Streaming joins are hard

117 points| danthelion | 1 year ago |estuary.dev

57 comments

ryzhyk|1 year ago

The correct way to think about the problem is in terms of evaluating joins (or any other queries) over changing datasets. And for that you need an engine designed for *incremental* processing from the ground up: algorithms, data structures, the storage layer, and of course the underlying theory. If you don't have such an engine, you're doomed to build a layer of hacks, and still fail to do it well.

We've been building such an engine at Feldera (https://www.feldera.com/), and it can compute joins, aggregates, window queries, and much more fully incrementally. All you have to do is write your queries in SQL, attach your data sources (stream or batch), and watch results get incrementally updated in real-time.

qazxcvbnm|1 year ago

Hi, I’ve read the DBSP paper and it’s a really well-thought-out framework; all the magic seemed so simple with the way the paper laid things out. However, the paper dealt with abelian Z-sets only, and mentioned that in your implementation, you also handle the non-abelian aspect of ordering. I was wondering if you guys have published anything about how you did that?

benreesman|1 year ago

I was with you and thinking Postgres over and over until the second paragraph. Which isn’t to say anything bad about your product, it sounds very cool.

But I’d work in “just like Postgres”.

crazygringo|1 year ago

Can someone explain what the use case is for streaming joins in the first place?

I've written my fair share of joins in SQL. They're indispensable.

But I've never come across a situation where I needed to join data from two streams in real time as they're both coming in. I'm not sure I even understand what that's supposed to mean conceptually.

It's easy enough to dump streams into a database and query the database but clearly this isn't about that.

So what's the use case for joins on raw stream data?

GeneralMayhem|1 year ago

Event correlations are a typical one. Think about ad tech: you want every click event to be hydrated with information about the impression or query that led to it. Both of those are high-volume log streams.

You want to end up with the results of:

```
select *
from clicks
left join impressions on (clicks.impression_id = impressions.id)
```

but you want to see incremental results - for instance, because you want to feed the joined rows into a streaming aggregator to keep counts as up to date as possible.
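A minimal sketch of what "incremental results" could look like for this query, assuming events from both streams are handed to one in-memory object. The class and method names are hypothetical. It buffers clicks whose impression has not arrived yet; a complete left join would also have to emit a null-padded row for clicks that never find a match, which this sketch leaves out.

```python
from collections import defaultdict


class StreamingClickJoin:
    """Incrementally joins clicks to impressions on impression_id."""

    def __init__(self):
        self.impressions = {}                    # impression id -> impression row
        self.pending_clicks = defaultdict(list)  # impression id -> clicks still unmatched

    def on_impression(self, imp):
        """Store the impression and emit joined rows for clicks that arrived first."""
        self.impressions[imp["id"]] = imp
        waiting = self.pending_clicks.pop(imp["id"], [])
        return [{"click": c, "impression": imp} for c in waiting]

    def on_click(self, click):
        """Emit a joined row right away if the impression is already known."""
        imp = self.impressions.get(click["impression_id"])
        if imp is None:
            # Impression not seen yet: hold the click until it shows up.
            self.pending_clicks[click["impression_id"]].append(click)
            return []
        return [{"click": click, "impression": imp}]
```

Each call returns only the newly joined rows, so the output can be fed straight into a downstream aggregator. Note that both dictionaries grow without bound here; a real system needs some expiry policy.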

ryzhyk|1 year ago

The computational complexity of running an analytical query on a database is, at best, O(N), where N is the size of the database. The computational complexity of evaluating queries incrementally over streaming data with a well-designed query engine is O(delta), where delta is the size of the *new* data. If your use case is well served by a database (i.e., can tolerate the latency), then you're certainly better off relying on the more mature technology. But if you need to do some heavy-weight queries and get fresh results in real-time, no DB I can think of can pull that off (including "real-time" databases).
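To illustrate the O(delta) idea for a join, here is a rough sketch using weighted collections, in the spirit of the Z-sets mentioned upthread: each row carries an integer weight, +1 for an insert and -1 for a delete. The function names are made up for this example, and it is not Feldera's implementation. For brevity it rebuilds an index on every call; an actual engine would keep persistent indexes so that each term costs work proportional to the delta.

```python
from collections import Counter, defaultdict


def join(left, right):
    """Join two weighted collections {(key, value): weight} on key; weights multiply."""
    by_key = defaultdict(list)
    for (k, v), w in right.items():
        by_key[k].append((v, w))
    out = Counter()
    for (k, lv), lw in left.items():
        for rv, rw in by_key.get(k, []):
            out[(k, lv, rv)] += lw * rw
    return out


def add(a, b):
    """Add two weighted collections, dropping rows whose weight becomes zero."""
    out = Counter(a)
    for row, w in b.items():
        out[row] += w
    return Counter({row: w for row, w in out.items() if w != 0})


def join_delta(a, b, da, db):
    """Change to join(a, b) caused by deltas da and db.

    Uses the identity: (da join b) + (a join db) + (da join db).
    """
    return add(add(join(da, b), join(a, db)), join(da, db))
```

Adding `join_delta(a, b, da, db)` to the old result gives the same answer as recomputing the join over the updated inputs, without touching rows the deltas cannot affect.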

BeefWellington|1 year ago

I'll use a contrived example here to explain what the value of streaming the data itself is.

Let's say you run a large installation that has a variety of very important gauges and sensors. Due to the size and complexity of this installation, these gauges and sensors need to be fed back to a console somewhere so that an overseer role of sorts can get that big picture view to ensure the installation is fully healthy.

For that scenario, if you look at your data in the sense of a typical RDBMS / Data Warehouse, you would probably want to save as much over the wire traffic as possible to ensure there's no delays in getting the sensor information fed into the system reliably on time. So you trim down things to just a station ID and some readings coming into your "fact" table (it could be more transactionally modeled but mostly it'll fit the same bill).

Basically the streaming is useful so that in near-realtime you can live scroll the recordset as data comes in. Your SQL query becomes more of an infinite Cursor.

Older ways of doing this did exist on SQL databases just fine; typically you'd have some kind of record marker, whether it was ROWID, DateTime, etc., and you'd just reissue an identical query to get the newer records. That introduces some overhead though, and the streaming approach kind of minimizes/eliminates that.
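The "record marker" polling approach described above can be sketched roughly as follows, using Python's built-in `sqlite3` module. The `readings` table, its columns, and the helper name are assumptions made for illustration.

```python
import sqlite3


def fetch_new_readings(conn, last_seen_id):
    """Return rows added after last_seen_id, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT id, station_id, reading FROM readings WHERE id > ? ORDER BY id",
        (last_seen_id,),
    ).fetchall()
    new_mark = rows[-1][0] if rows else last_seen_id
    return rows, new_mark


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE readings (id INTEGER PRIMARY KEY, station_id TEXT, reading REAL)"
)
conn.executemany(
    "INSERT INTO readings (station_id, reading) VALUES (?, ?)",
    [("s1", 20.5), ("s2", 19.0)],
)

# First poll picks up everything; later polls reissue the same query with the marker.
batch1, mark = fetch_new_readings(conn, 0)
conn.execute("INSERT INTO readings (station_id, reading) VALUES (?, ?)", ("s1", 21.0))
batch2, mark = fetch_new_readings(conn, mark)
```

Every poll pays for a query round trip even when nothing has changed, which is the overhead a push-based stream avoids.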

AtlasBarfed|1 year ago

Probably related to the fundamental problem of joining distributed data within CAP constraints. Virtually all distributed databases offering full SQL are CP (that is, they assume no nodes will be down otherwise the data won't return).

If you have distributed data, the join will get calculated by SOME node in the network, and the data will have to be streamed in and joined by the central processor. Even with modern meganodes, for BigData marketing you have to handle arbitrarily sized datasets, and that means streaming data into the processing nodes' working memory.

Of course there are ways to distribute join calculation (sometimes) as well, but you're still talking merging streams of data coming into processing nodes.

Now, if you have to handle AP/eventually consistent models, then it REALLY gets complicated, and ultimately your huge massive join (I'm assuming a join of tables of data, not just a denormalization join of a single row/primary key and child foreign keys) is a big eventually consistent approximation view, even without the issue of incoming updates/transactions mutating the underlying datasets as you stream and merge/filter them.

psfried|1 year ago

The main benefit isn't necessarily that it's _streaming_ per se, but that it's _incremental_. We typically see people start by just incrementally materializing their data to a destination in more or less the same set tables that exist in the source system. Then they develop downstream applications on top of the destination tables, and they start to identify queries that could be sped up by pre-computing some portion of it incrementally before materializing it.

There are also cases where you just want real-time results. For example, if you want to take action based on a joined result set, then in the RDBMS world you might periodically run a query that joins the tables and see if you need to take action. But polling becomes increasingly inefficient at lower polling intervals. So it can work better to incrementally compute the join results, so you can take action immediately upon seeing something appear in the output. Think use cases like monitoring, fraud detection, etc.

closeparen|1 year ago

Anything you can do with stateful streaming technology, you can do with a database and a message handler. It’s just a question of programming model and scaling characteristics. You typically get an in-process embedded DB per shard, with an API that makes it seem closer to managing state in memory.

matlin|1 year ago

We apply incremental, streamable "joins" (relational queries) for real-time syncing between application client and server. I think much of the initial research in this space was around data pipelines but the killer app (no pun intended) is actually in app development

tshaddox|1 year ago

Isn't the use case just any time you want a client to essentially subscribe to an SQL query and receive a message every time the result of that SQL query changes?

fnordpiglet|1 year ago

This is extremely common in trading systems where real time data is joined against reference data and grouped, etc for a variety of purposes including consumption by algorithms and display.

10000truths|1 year ago

Streams are conceptually infinite, yes, but many streaming use cases are dealing with a finite amount of data that's larger than memory but fits on disk. In those cases, you can typically get away with materializing your inputs to a temporary file in order to implement joins, sorts, percentile aggregations, etc.
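One way to sketch the "materialize to a temporary file" approach is to spill both finite inputs into an on-disk SQLite database and let it do the join. The function and table names are illustrative assumptions, and the final `fetchall()` assumes the join result itself fits in memory.

```python
import os
import sqlite3
import tempfile


def join_via_temp_db(left_rows, right_rows):
    """Spill two (key, value) iterables to an on-disk SQLite file, then join by key."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(os.path.join(tmp, "join.db"))
        try:
            conn.execute("CREATE TABLE l (k, v)")
            conn.execute("CREATE TABLE r (k, v)")
            # executemany consumes the iterables lazily, so inputs need not fit in RAM.
            conn.executemany("INSERT INTO l VALUES (?, ?)", left_rows)
            conn.executemany("INSERT INTO r VALUES (?, ?)", right_rows)
            conn.execute("CREATE INDEX r_k ON r (k)")
            conn.commit()
            return conn.execute(
                "SELECT l.k, l.v, r.v FROM l JOIN r ON l.k = r.k "
                "ORDER BY l.k, l.v, r.v"
            ).fetchall()
        finally:
            conn.close()
```

The temporary directory, and the database file in it, is removed once the function returns.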

psfried|1 year ago

Yes, and this is an important point! This is the reason for our current approach for sqlite derivations. You can absolutely just store all the data in the sqlite database, as long as it actually fits. And there are cases where people actually do this on our platform, though I don't think we have an example in our docs.

A lot of people just learning about streaming systems don't come in with useful intuitions about when they can and can't use that approach, or even that it's an option. We're hoping to build up to some documentation that can help new people learn what their options are, and when to use each one.

tombert|1 year ago

A large part of my job in the last few months has been figuring out how to optimize joins in Kafka Streams.

Kafka Streams, by default, uses either RocksDB or an in-memory system for the join buffer, which is fine but completely devours your RAM, and so I have been writing something more tuned for our work that actually uses Postgres as the state store.

It works, but optimizing JOINs is almost as much of an art as it is a science. Trying to optimize caches and predict stuff so you can minimize the cost of latency ends up being a lot of “guess and check” work, particularly if you want to keep memory usage reasonable.

lfmunoz4|1 year ago

Can you explain why streaming joins are necessary? All examples I've seen are bad. For example, joining books and authors as a stream seems ridiculous. Why couldn't the author come up with a better, more realistic example?

mattxxx|1 year ago

JOINs are just hard period. When you're operating at a large scale, you need to be thinking about exactly how to partition + index your data for the types of queries that you want to write with JOINs.

Streaming joins are so hard that they're an anti-pattern. If you're using external storage to make it work, then your architecture has probably gone really wrong or you're using streams for something that you shouldn't.

jdelman|1 year ago

The ability to express joins in terms of SQL with Estuary is pretty cool. Flink can do a lot of what is described in this post, but you have to set up a lot of intermediate structures, write a lot of Java/Scala, and store your state as protos to support backwards compatibility. Abstracting all of that away would be a huge time saver, but I imagine not having fine grained control over the results and join methods could be frustrating.

fiddlerwoaroof|1 year ago

Flink does have a SQL join now that you can make work. Streaming joins remain a hard problem, though, and, imo, SQL doesn’t map nicely onto streaming systems.

neeleshs|1 year ago

"Unlike batch tables, streams are infinite. You can't "just wait" for all the rows to arrive before performing a join."

I view batch tables as simply a given state of some set of streams at a point in time. Running the same query against "batch" tables at different points in time yields different results (assuming the table is churning over time).

manx|1 year ago

I think it should be possible to create a compiler which transforms arbitrary sql queries into a set of triggers and temporary tables to get incremental materialized views which are just normal tables. Those can be indexed, joined etc. no extra services needed. Such an approach should in theory work for multiple relational database systems if it's all adhering to standards.
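As a hand-written illustration of what such a compiler might emit for a single inner join, here is a sketch using SQLite triggers through Python's `sqlite3` module. The `orders`, `customers`, and `order_customers` tables are hypothetical. Only inserts on both tables and deletes on `orders` are covered; updates, customer deletions, and aggregates would each need further triggers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, item TEXT);
-- The "materialized view" is an ordinary table kept current by triggers.
CREATE TABLE order_customers (order_id INTEGER, item TEXT, name TEXT);

-- New order: join it against the customers already present.
CREATE TRIGGER orders_ins AFTER INSERT ON orders BEGIN
  INSERT INTO order_customers
  SELECT NEW.id, NEW.item, c.name FROM customers c WHERE c.id = NEW.customer_id;
END;

-- New customer: join it against the orders that arrived earlier.
CREATE TRIGGER customers_ins AFTER INSERT ON customers BEGIN
  INSERT INTO order_customers
  SELECT o.id, o.item, NEW.name FROM orders o WHERE o.customer_id = NEW.id;
END;

-- Deleted order: retract its joined rows.
CREATE TRIGGER orders_del AFTER DELETE ON orders BEGIN
  DELETE FROM order_customers WHERE order_id = OLD.id;
END;
""")

# Rows can arrive in either order; the view converges to the same contents.
conn.execute("INSERT INTO orders VALUES (1, 10, 'widget')")
conn.execute("INSERT INTO customers VALUES (10, 'Ada')")
conn.execute("INSERT INTO orders VALUES (2, 10, 'gadget')")
view = conn.execute("SELECT * FROM order_customers ORDER BY order_id").fetchall()
```

Because `order_customers` is a normal table, it can be indexed and joined like any other.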

ctenb|1 year ago

If both inputs are ordered by a subset of the join key, you can stream the join operation. It depends on your domain whether this can be made the case, of course. If one of the two join operands is much smaller than the other, you can make the join operation streaming for the larger operand.
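For the ordered-input case, a merge join can be sketched as below. This assumes the simpler situation where both inputs are sorted by the full join key and arrive as `(key, value)` tuples; the function name is illustrative. Only the run of right-side rows for the current key is held in memory.

```python
def merge_join(left, right):
    """Inner-join two iterables of (key, value) pairs, each sorted by key."""
    right = iter(right)
    r = next(right, None)        # current unread right row, or None when exhausted
    run_key, run = object(), []  # right values buffered for the most recent left key
    for lk, lv in left:
        if lk != run_key:
            # Skip right rows with smaller keys, then buffer the run equal to lk.
            while r is not None and r[0] < lk:
                r = next(right, None)
            run_key, run = lk, []
            while r is not None and r[0] == lk:
                run.append(r[1])
                r = next(right, None)
        for rv in run:
            yield (lk, lv, rv)
```

Since it is a generator, joined rows are produced as the inputs are consumed, and neither side is ever fully materialized.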

bob1029|1 year ago

> Streaming data isn't static like tables in databases—it's unbounded, constantly updating, and poses significant challenges in managing state.

I don't really see the difference between tables & streams. Data in tables changes over time too. You can model a stream as a table with any degree of fidelity you desire. In fact, I believe this could be considered a common approach for implementing streaming abstractions.

pgwhalen|1 year ago

When one queries a table though, it's only a query at one point in time. Querying a stream implies that your result set is a stream as well, which introduces a whole separate set of complexities to worry about both as an implementor of the query engine and a client.

hamandcheese|1 year ago

It seems intuitive to me that a correct streaming join is impossible without an infinite buffer and strong guarantees on how events are ordered. The number of real world systems offering both of those guarantees is zero. Anyone espousing streaming joins as a general solution should be avoided at all costs, particularly if they have a title that contains "architect" or "enterprise" (god forbid both in the same title).

At best, it is a trick to be applied in very specific circumstances.

ryzhyk|1 year ago

A streaming join indeed requires an unbounded buffer in the most general case when inputs keep growing and any input record on one side of the join can match any record on the other side. However, it does not require inputs to be ordered. An incremental query engine such as Feldera or Materialize can handle out-of-order data and offer strong consistency guarantees (disclaimer: I am a developer of Feldera). In practice, unbounded buffers can often be avoided as well. This may require a specialized join such as as-of join (https://www.feldera.com/blog/asof-join) and some GC machinery.
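To show why an as-of join lets state be garbage collected, here is a toy sketch, not Feldera's implementation. The trade and quote naming, the class, and the caller-supplied watermark are all assumptions. It tolerates out-of-order quotes and assumes no trade will ever arrive with a timestamp earlier than the watermark passed to `gc`.

```python
import bisect
from collections import defaultdict


class AsOfJoin:
    """Match each trade with the most recent quote for the same symbol."""

    def __init__(self):
        self.quotes = defaultdict(list)  # symbol -> list of (ts, price), kept sorted

    def on_quote(self, symbol, ts, price):
        """Insert a quote in timestamp order, even if it arrives late."""
        bisect.insort(self.quotes[symbol], (ts, price))

    def on_trade(self, symbol, ts):
        """Return the price of the latest quote with timestamp <= ts, or None."""
        history = self.quotes.get(symbol, [])
        i = bisect.bisect_right(history, (ts, float("inf")))
        return history[i - 1][1] if i else None

    def gc(self, watermark):
        """Drop quotes that no trade with ts >= watermark can ever match."""
        for history in self.quotes.values():
            i = bisect.bisect_right(history, (watermark, float("inf")))
            # Keep the newest quote at or before the watermark; it may still match.
            del history[: max(i - 1, 0)]
```

After `gc`, each symbol retains at most one quote older than the watermark, so the buffer stays bounded as long as the watermark keeps advancing.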