Joins

About this document

This is a design document describing the mechanics of joins in Druid. It is related to the initial join support proposal (apache/druid#8728) but covers different things: the proposal is more about motivation, design considerations, scope, and direction. This document is more about implementation.

Virtual join segments

Mechanics

The lowest-level component of joins is the virtual join segment, HashJoinSegment. A HashJoinSegment has two fields:

Segment baseSegment;          // left-hand side: a "real" segment
List<JoinableClause> clauses; // right-hand sides, applied in order

This represents a left-deep join of a left-hand side baseSegment (a "real" segment, like a QueryableIndexSegment or IncrementalIndexSegment) onto a series of right-hand side clauses. Logically, the join tree must look something like the following, where the right-hand side of every join pair is a joinable table (e.g. T0, T1) and the bottom-leftmost leaf is a real datasource (D):

    J
   / \
  J   T1
 / \
D   T0

It is the responsibility of higher-level things (e.g. the SQL planner or the user) to transform joins into this form before sending them down to data servers.
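
Concretely, the tree above maps onto HashJoinSegment's two fields with baseSegment = D and clauses = [clause for T0, clause for T1]. A hypothetical construction (the real constructor may take additional parameters):

HashJoinSegment joined = new HashJoinSegment(
    d,                            // the "real" segment D
    List.of(clauseT0, clauseT1)   // right-hand sides, in bottom-up join order
);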

Just like real segments, virtual join segments have storage adapters. In this case, it's HashJoinSegmentStorageAdapter. The most interesting method here is makeCursors, which creates a cursor tree matching the join tree.

Each pairwise join in the cursor tree is implemented by HashJoinEngine. Given a Cursor and a JoinableClause the engine will create a new cursor that implements a join.
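
In other words, makeCursors conceptually folds the clause list over the base segment's cursor, along the lines of this sketch (method names are illustrative, not the real signatures):

// Sketch: build the cursor tree by wrapping one pairwise join at a time.
Cursor makeJoinCursorTree(Cursor baseCursor, List<JoinableClause> clauses)
{
  Cursor cursor = baseCursor;
  for (JoinableClause clause : clauses) {
    // Each step joins everything built so far (left) onto one clause (right).
    cursor = HashJoinEngine.makeJoinCursor(cursor, clause);
  }
  return cursor;
}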

To facilitate this, the JoinableClause includes a Joinable object, which can be used to create a JoinMatcher. Join matchers support operations like matching a join condition, iterating through matches, and remembering the remainder (rows that never matched; useful for right/full joins). You can think of Joinable (and the associated JoinMatcher) as the extension points needed to implement a new kind of table that can slot in as the right-hand side of a join.
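
A simplified view of these extension points (method names and signatures are illustrative; the real interfaces carry more context):

// Illustrative shape only; not the real Druid interfaces.
interface Joinable
{
  // Expose the right-hand side's columns to the join cursor.
  ColumnSelectorFactory makeColumnSelectorFactory();

  // Create a matcher for a particular (equi-join) condition.
  JoinMatcher makeJoinMatcher(JoinConditionAnalysis condition);
}

interface JoinMatcher
{
  void matchCondition();   // find right-hand rows matching the current left row
  boolean hasMatch();      // true if currently positioned on a match
  void nextMatch();        // advance through multiple matching rows
  void matchRemainder();   // iterate rows that never matched (right/full joins)
  void reset();            // prepare for the next left-hand row
}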

Today, there are two kinds of joinables: lookups (LookupJoinable) and indexed tables (IndexedTableJoinable). Lookups are the Druid lookups you know and love. Indexed tables are more flexible: they support all native Druid types as well as wider tables with multiple key columns.

Column selectors

HashJoinEngine's JoinColumnSelectorFactory is the selector factory for a join cursor.

When accessing columns from the left-hand side:

  • For left/inner joins, the underlying selector is passed through without decoration, meaning efficiency is the same as for a non-joined datasource.
  • For right/full joins, the underlying selector is decorated with a PossiblyNullDimensionSelector, which allows generation of the null left-hand columns required by right/full joins. This is potentially less efficient than a non-joined datasource.

When accessing columns from the right-hand side, the Joinable has total control over how it generates a selector. For example, IndexedTableDimensionSelector uses the table row number as its dictionary code, allowing optimizations where query engines operate on the row number rather than the value.
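
Putting the left-hand-side rules together, the decoration decision looks roughly like this (a sketch; the actual wiring in JoinColumnSelectorFactory is more involved):

// Sketch of how left-hand-side selectors are decorated, per join type.
DimensionSelector decorateLeftSelector(
    DimensionSelector underlying,
    JoinType joinType,
    BooleanSupplier leftIsNull  // true while emitting unmatched right-hand rows
)
{
  if (joinType == JoinType.RIGHT || joinType == JoinType.FULL) {
    // Right/full joins must be able to emit null left-hand values, so the
    // selector is decorated; this is potentially less efficient.
    return new PossiblyNullDimensionSelector(underlying, leftIsNull);
  }
  // Left/inner joins: pass the selector through undecorated, at no extra cost.
  return underlying;
}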

Filtering

Currently, filters cannot be applied directly to the baseSegment in any case, which means join segments cannot benefit from indexes. This must be improved in the future to achieve good performance. One possible approach is to add a baseFilter parameter to HashJoinSegment that would be used to filter the baseSegment; in this case, we'd rely on the SQL layer to push filters into baseFilter as appropriate.
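
A sketch of what that might look like (nothing below is implemented yet):

Segment baseSegment;
@Nullable Filter baseFilter;   // applied, with index support, when scanning baseSegment
List<JoinableClause> clauses;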

Limitations

Today, the system has the following limitations:

  • The Joinables (right-hand sides) involved in a query must fit in memory.
  • The join condition must be an equi-join (equality condition) based on a left-hand-side expression and a right-hand-side direct column access, like f(left_side) = right_side_column. (An AND of multiple equality conditions is also okay; see the examples below.)

Both limitations stem from the fact that the only algorithm currently implemented is single-pass over the left-hand side and assumes that the right-hand side supports fast row retrieval keyed on the join condition. These assumptions are only valid for in-memory hash joins.
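
For illustration, assuming a left-hand table t and a right-hand lookup l (names made up), conditions would be accepted or rejected like this:

UPPER(t.country) = l.k                -- okay: left-side expression, right-side column
t.a = l.k AND t.b = l.k2              -- okay: AND of equality conditions
t.country = LOWER(l.k)                -- not okay: right side is an expression
t.added > l.threshold                 -- not okay: not an equality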

TODO

Next steps (each should be roughly one PR's worth of work):

  1. The rest of "Data server behavior" from apache/druid#8728 (this patch is number 3, the virtual join Segment). After this step, data servers will be able to handle join queries.
  2. Enough Broker logic to send down unmodified join queries to data servers (i.e., just enough to determine which 'table' datasource is the one being queried). After this step, Brokers will be able to handle join queries that do not require rewrites.
  3. SQL layer logic to translate table JOIN lookup ON table.col = lookup.k into native join queries. After this step, the JOIN keyword in SQL works end to end in at least one case.
  4. Subquery inlining on the Broker: the ability to evaluate subqueries and rewrite them as inline datasources. After this step, Brokers will be able to handle joins against subqueries.
  5. SQL layer logic to translate joins against subqueries into native queries. Remove semijoin-specific logic like DruidSemiJoinRule; it should be handled by a more generic rule.
  6. Broker and data server ability to build hash tables from subqueries on lookups. This will involve being able to do queries on lookups in the first place, implying the need for a new LookupSegment and LookupStorageAdapter.
  7. Tweak Broker logic to send down subqueries on lookups, rather than inlining them. The combination of (6) and (7) should make joins on subqueries of lookups more efficient.
  8. Broker ability to run queries on 100% global datasources (no 'table' datasources) on its own. This will need to leverage the LookupSegment and LookupStorageAdapter from (6). At this point, everything from the proposal apache/druid#8728 will have been implemented.

After these steps are complete, follow ups should include:

  • Various performance optimizations: filter push-down, deferred lookupName during condition matching / row retrieval, vectorized joins. A good target to shoot for is that simple joins should be just as fast as lookups.
  • Proper handling of RIGHT OUTER joins; see comment in HashJoinEngine: "Warning! The way this engine handles 'righty' joins is flawed: it generates the 'remainder' rows per-segment, but this should really be done globally. This should be improved in the future." The original proposal did not discuss how to handle this (I hadn't thought of it when I wrote the proposal...)
@drcrallen

This is good info! Where/how are Shuffles for joins taken into account? Is that a responsibility of the cursor, matcher, joinable, dont-know-yet? Are there no shuffles and each join stage has to materialize its results as a joinable distributed to all nodes?

gianm commented Dec 31, 2019

This is good info! Where/how are Shuffles for joins taken into account? Is that a responsibility of the cursor, matcher, joinable, dont-know-yet? Are there no shuffles and each join stage has to materialize its results as a joinable distributed to all nodes?

They aren't taken into account at all. IMO, this lower-level part of the system documented so far (the virtual join segment) shouldn't handle shuffles; they should be handled by some other component that shuffles data before the virtual segments are created.

I was imagining that the first thing built on top of this would be broadcast-based rather than shuffle-based. For example, the overall system would support joining a datasource to a subquery by first executing the subquery, materializing the results, and sending them to all data servers. Then those data servers would build an IndexedTableJoinable from those results and do a hash join against it.

For shuffle-based joins I think the way to do them would be to have it be the responsibility of some system that doesn't exist yet. Maybe we can reuse some code from the IntermediaryDataManager used by parallel indexing tasks. Then after shuffle we could do either a hash join or a sort-merge join. That means that the hash-join-based virtual segments described here may or may not be used (they wouldn't be useful for a sort-merge join, we'd need something else…).

@drcrallen

Sounds like a reasonable progression.

Shuffling mechanisms seem like they are going to need their own proposal when the time comes.

gianm commented Dec 31, 2019

Shuffling mechanisms seem like they are going to need their own proposal when the time comes.

Yes, I agree.

jnaous commented Jan 10, 2020

It would be good to start design documents with some history or additional context for the changes: basically, how things work now, so you can build on top of it, and so that newbies who want to understand the doc/design don't have to hunt for the prerequisite information they need to contribute effectively. (BTW, I hate the commenting system on GitHub. Unthreaded, no inline comments, yuck.)

jnaous commented Jan 10, 2020

Is there a reason to keep lookups around when you have IndexedTableJoinable?

jnaous commented Jan 10, 2020

Currently, filters cannot be applied directly to the baseSegment in any case

Why not?

jnaous commented Jan 10, 2020

IMHO, we should make PRs as small as a couple of classes at a time to make it easier for people to thoroughly review PRs.

@kishoreduraisam

Can we have some examples of joins using SQL? I am looking through the documentation and am not sure whether Druid SQL supports joining one table datasource to another table datasource. My confusion is specifically around whether Druid SQL supports this but native queries do not.

gianm commented Apr 22, 2020

Here's an example from the docs: https://druid.apache.org/docs/latest/querying/datasource.html#join

The main use cases for join support in 0.18.0 are joining tables to lookups, or to smallish subquery results (fewer than 100,000 rows), or to inline data.
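
For example, a table-to-lookup join in Druid SQL looks like this (table and column names are made up; lookups appear in the lookup schema with key column k and value column v):

SELECT w.channel, c.v AS country_name
FROM wikipedia w
LEFT JOIN lookup.countries c ON w.countryIsoCode = c.k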

You can join a table to a table, but as the docs mention, that will require issuing a subquery for one or both of the tables. Subqueries have performance and memory usage implications as noted in https://druid.apache.org/docs/latest/querying/datasource.html#query. So this isn't a good solution for direct table-to-table joins unless they are small.
