A comprehensive guide on Citus' distributed planning with examples

Distributed Query Planning with Examples in Citus (as of Citus 12.1)

This part of the documentation aims to provide a comprehensive understanding of how Citus handles distributed query planning. We will use a set of realistic tables to demonstrate various queries. Through these examples, we hope to offer a step-by-step guide to how Citus chooses to plan queries.

Non-Goal: This document does not aim to provide the high-level design of the distributed planner, but rather the lowest level of detail possible. For the high-level flow, see https://postgresconf.org/system/events/document/000/000/233/Distributing_Queries_the_Citus_Way.pdf

Tables Included:

  • Users Table (Distributed)
  • Orders Table (Distributed)
  • Products Table (Distributed)
  • Country Codes (Reference)
  • Order Status (Reference)
  • Product Categories (Reference)

Table definitions used in this section

-- Distributed Table: Users Table
CREATE TABLE users_table (
    user_id bigserial PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(50),
    date_of_birth DATE,
    country_code VARCHAR(3)
);
SELECT create_distributed_table('users_table', 'user_id');

-- Distributed Table: Orders Table
CREATE TABLE orders_table (
    order_id bigserial,
    user_id BIGINT REFERENCES users_table(user_id),
    product_id BIGINT,
    order_date TIMESTAMPTZ,
    status VARCHAR(20)
);
SELECT create_distributed_table('orders_table', 'user_id');


-- Distributed Table: Products Table
CREATE TABLE products_table (
    product_id bigserial PRIMARY KEY,
    product_name VARCHAR(100),
    category_id INT,
    price NUMERIC(10, 2)
);
SELECT create_distributed_table('products_table', 'product_id');

-- Reference Table: Country Codes
CREATE TABLE country_codes (
    country_code VARCHAR(3) PRIMARY KEY,
    country_name VARCHAR(50)
);
SELECT create_reference_table('country_codes');

-- Reference Table: Order Status
CREATE TABLE order_status (
    status VARCHAR(20) PRIMARY KEY,
    description TEXT
);
SELECT create_reference_table('order_status');

-- Reference Table: Product Categories
CREATE TABLE product_categories (
    category_id INT PRIMARY KEY,
    category_name VARCHAR(50)
);
SELECT create_reference_table('product_categories');

Fast Path Router Planner

The Fast Path Router Planner is specialized in optimizing queries that are both simple in structure and certain to touch a single shard. Importantly, it targets single-shard queries on distributed, Citus local, or reference tables. This does not mean the planner is restricted to trivial queries; it can handle complex SQL constructs like GROUP BY, HAVING, DISTINCT, etc., as long as these operate on a single table and involve an equality condition on the distribution key (distribution_key = X). The main SQL limitation for fast-path distributed query planning is the lack of subquery/CTE support; those are left to the next planner: the Router planner.

The aim of this planner is to avoid relying on PostgreSQL's standard_planner() for planning, which performs computations that are irrelevant for distributed planning, such as cost estimation. Skipping standard_planner() yields significant performance gains for OLTP workloads. By focusing on "shard-reachable" queries, the Fast Path Router Planner is able to bypass the need for more computationally expensive planning processes, thereby accelerating query execution.

Main C Functions Involved:

  • FastPathRouterPlan(): The primary function for creating the fast-path query plan.
  • FastPathRouterQuery(): Validates if a query is eligible for fast-path routing by checking its structure and the WHERE clause.

With SET client_min_messages TO debug4;, you should see the following in the DEBUG messages: "DEBUG: Distributed planning for a fast-path router query"

-- Fetches the count of users born in the same year, but only  
-- for a single country 
SELECT EXTRACT(YEAR FROM date_of_birth) as birth_year, COUNT(*)  
FROM users_table 
WHERE country_code = 'USA' AND user_id = 15 
GROUP BY birth_year 
HAVING COUNT(*) > 10; 
-- all INSERT commands are by definition fast path 
-- router queries in the sense that they do not 
-- need any information from Postgres' standard_planner() 
INSERT INTO orders_table (user_id, product_id, order_date, status)  
VALUES (42, 555, now(), 'NEW'); 
-- UPDATE/DELETEs can also be qualified as fast path router 
-- queries  
UPDATE products_table SET price = price * 1.1 WHERE product_id = 555; 

Fast path queries have another important characteristic named "deferredPruning."

For regular queries, Citus does the shard pruning during the planning phase, meaning that the shards the query touches are determined during planning. However, in an ideal world, the shard pruning should happen during execution and, for a certain class of queries, we support that. In the code, that is denoted by the "Job->deferredPruning" field.

Given that fast path queries are performance critical, they can be planned with prepared statements. When this is done, "Job->deferredPruning" becomes "true", which means Citus can support PREPARED statements as expected: the first 6 executions of the plan do distributed planning, the rest are cached similarly to Postgres' plan caching, and the shard pruning is done during the execution phase. If you attach a debugger, you'd see that during the first 6 executions the debugger stops in the distributed_planner() function, but on the rest it does not. The shard pruning for the cached command happens in the CitusBeginScan() function.

To see this in action, check out the DEBUG messages:

set client_min_messages to debug4;
PREPARE p1 (bigint) AS SELECT * FROM users_table WHERE user_id = $1;
 
-- 1st execute
execute p1(1);
DEBUG:  Deferred pruning for a fast-path router query
DEBUG:  Creating router plan
....
(0 rows)

-- 2nd execute
execute p1(1);
DEBUG:  Deferred pruning for a fast-path router query
DEBUG:  Creating router plan
....
(0 rows)
...
execute p1(1);
execute p1(1);
execute p1(1);
...

-- 6th execute
execute p1(1);
DEBUG:  Deferred pruning for a fast-path router query
DEBUG:  Creating router plan
....
(0 rows)

-- now, on the 7th execute, you would **NOT** see the fast-path
-- planning anymore, because the query comes from Postgres'
-- query cache
execute p1(1);
DEBUG:  constraint value: '1'::bigint
DEBUG:  shard count after pruning for users_table: 1
DEBUG:  opening 1 new connections to localhost:9702
DEBUG:  established connection to localhost:9702 for session 8 in 9499 microseconds
DEBUG:  task execution (0) for placement (46) on anchor shard (102049) finished in 1281 microseconds on worker node localhost:9702
DEBUG:  Total number of commands sent over the session 8: 1 to node localhost:9702
(0 rows)

Router Planner in Citus

Overview

The Router Planner plays a key role in Citus' query optimization landscape. While sharing some common traits with the Fast Path Router Planner, it offers unique capabilities as well. Router (and fast path router) planners are the bedrock for the multi-tenant use cases.

Similarities with Fast Path Router Planner

  • Single Node Routing: Both planners send queries to a single node. Unlike the Fast Path Planner, the Router Planner can work with multiple colocated tables, provided they have filters on their distribution columns.

  • Query Routing Mechanics: Router Planner takes the query, verifies if it can be routed, and if so, it replaces original table names with their corresponding shard names, directing the query to the appropriate nodes.

Differences

  • Subqueries and CTEs: The Router Planner can manage subqueries and Common Table Expressions (CTEs), routing the entire query to a single node as long as all involved tables have filters on their distribution columns.

  • Standard Planner Reliance: Router Planner relies on PostgreSQL's standard_planner() to learn the necessary filter restrictions on the tables.

Main C Functions Involved

  • PlanRouterQuery(): Responsible for creating the router plan.
  • TargetShardIntervalsForRestrictInfo(): Retrieves the shard intervals based on restrictions provided by PostgreSQL's standard_planner().

Example Router Planner Queries

-- Fetch user data and their respective orders for a given user_id
SELECT u.username, o.order_id
FROM users_table u, orders_table o
WHERE u.user_id = o.user_id AND u.user_id = 42;

-- With Subqueries:

-- Fetch the username and their total order amount 
-- for a specific user
SELECT u.username, 
       (SELECT MAX(o.product_id) FROM orders_table o 
        WHERE o.user_id = 42 AND 
        o.user_id = u.user_id)
FROM users_table u
WHERE u.user_id = 42;

-- Router planner works with CTEs (and UPDATE/DELETE Query):
-- Update the status of the most recent order for a specific user
WITH RecentOrder AS (
  SELECT MAX(order_id) as last_order_id
  FROM orders_table
  WHERE user_id = 42
)
UPDATE orders_table
SET status = 'COMPLETED'
FROM RecentOrder
WHERE orders_table.user_id = 42 AND 
		orders_table.order_id = RecentOrder.last_order_id;


Query Pushdown Planning in Citus

Overview

While Router and Fast-Path Router Planners are proficient at dealing with single-shard commands—making them ideal for multi-tenant and OLTP applications—Citus also excels in analytical use-cases. In these scenarios, a single query is broken down into multiple parallel sub-queries, which are run on various shards across multiple machines, thereby speeding up query execution times significantly.

What is Query Pushdown Planning?

Query Pushdown Planning is an extension of the Router Planning paradigm. Unlike the latter, which deals with single-shard, single-node queries, Query Pushdown can route a query to multiple shards across multiple nodes. Instead of verifying that all tables have the same filters, as in Router Planning, Query Pushdown ascertains that all tables are joined on their distribution keys.

Core Functions

The core C function responsible for this check is RestrictionEquivalenceForPartitionKeys(), which ensures that tables in the query are joined based on their distribution keys. Initially intended for subqueries, Query Pushdown has been extended to include other cases as well. The decision to utilize Query Pushdown is determined by the ShouldUseSubqueryPushDown() function.

Understanding Query Pushdown

Understanding Query Pushdown Planning and how it extends the simpler Router Planning can help you fully utilize Citus for your analytical workloads.

Key Characteristics of Query Pushdown

  • High Parallelism: The query is broken down into multiple sub-queries, leading to parallel execution on multiple shards and nodes.
  • Worker Subquery: You will typically notice the alias worker_subquery in the SQL queries sent to the shards, indicating a pushdown operation.

Examples of query pushdown

Basic Example

-- Count of distinct product_ids where user_ids from two different tables match
SELECT count(DISTINCT product_id) 
FROM (
  SELECT DISTINCT user_id as distinct_user_id 
  FROM users_table
) foo, orders_table 
WHERE orders_table.user_id = distinct_user_id;

Subquery in Target List

-- Sum of max price per product category, filtered by a subquery in the target list
-- (note: special_users_table and the "category" column are illustrative and are
--  not part of the sample schema defined above)
SELECT 
  (SELECT MAX(price) FROM products_table p WHERE p.category = o.category),
  COUNT(DISTINCT o.product_id)
FROM orders_table o, users_table u
WHERE o.user_id = u.user_id AND u.user_id IN 
(SELECT user_id FROM special_users_table)
GROUP BY o.category;

Subquery in WHERE Clause

-- Number of distinct users who have placed an order
SELECT COUNT(DISTINCT u.user_id)
FROM users_table u
WHERE u.user_id IN (
  SELECT o.user_id
  FROM orders_table o
);

More Examples

-- Count of distinct products per user, with maximum order date from orders
--  as a subquery in the target list
 SELECT
  (SELECT MAX(o.order_date) FROM orders_table o WHERE o.user_id = u.user_id),
  COUNT(DISTINCT o.product_id)
FROM orders_table o, users_table u
WHERE o.user_id = u.user_id
GROUP BY u.user_id;

UPDATE and DELETE with Query Pushdown

-- Update status in orders_table for users whose email ends with '@example.com'
UPDATE orders_table o
SET status = 'DISCOUNTED'
FROM users_table u
WHERE o.user_id = u.user_id AND u.email LIKE '%@example.com';
-- Delete orders for users who were born before '2000-01-01'
DELETE FROM orders_table o
USING users_table u
WHERE o.user_id = u.user_id AND u.date_of_birth < '2000-01-01';

Recursive Planning

Central to understanding Citus' approach to distributed query planning are two closely interrelated concepts: "Query Pushdown Planning" and "Recursive Planning." These dual strategies lay the foundation for Citus' capacity to manage complex query structures across multiple shards and nodes effectively.

While Query Pushdown Planning optimizes queries by breaking them into smaller components that can run in parallel across multiple shards, Recursive Planning takes a more nuanced approach. It works its way through the query tree from the deepest level upwards, scrutinizing each subquery to determine its suitability for pushdown.

The essence of recursive planning lies in treating each recursively planned query in isolation. This means correlated subqueries can't take advantage of recursive planning. However, (sub)queries on local tables can be handled via recursive planning.

This process is primarily executed in the RecursivelyPlanSubqueryWalker() C function. In this function, the engine goes to the innermost subquery and assesses whether it can safely be pushed down as a stand-alone query. If it can, the query engine simply moves on. However, if the subquery isn't suitable for pushdown, Citus generates a separate "sub-plan" for that subquery, substituting it with a read_intermediate_result() function call. These sub-plans are later executed as independent queries, a task overseen by the ExecuteSubPlans() function.

The engine continues this way, moving upward through each level of subqueries, evaluating and, if needed, creating sub-plans until it reaches the top-level query.

Intermediate Results as Reference Tables

One of the key aspects of Recursive Planning is the use of "intermediate results." These are essentially the outcomes of subqueries that have been recursively planned and executed on worker nodes. Once these intermediate results are obtained, they are treated much like reference tables in the subsequent stages of query planning and execution. The key advantage here is that, like reference tables, these intermediate results can be joined with distributed tables on any column, not just the distribution key.
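
For instance, in the sketch below (reusing the sample tables), the inner subquery is recursively planned because of its ORDER BY ... LIMIT, and its intermediate result can then be joined with orders_table on product_id, even though product_id is not the distribution key of orders_table:

-- The subquery needs a merge step (ORDER BY .. LIMIT), so Citus plans it
-- recursively; the resulting intermediate result behaves like a reference
-- table and can be joined on a non-distribution column.
SELECT count(*)
FROM (SELECT product_id FROM products_table ORDER BY price DESC LIMIT 10) AS top_products,
     orders_table
WHERE top_products.product_id = orders_table.product_id;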

Full SQL Coverage via Recursive Planning

The practice of recursively creating sub-plans and generating intermediate results offers a workaround for achieving full SQL coverage in Citus. If each subquery in a complex SQL query can be replaced with an intermediate result, then the entire query essentially becomes a query on a reference table. This feature is a crucial aspect for many users who require comprehensive SQL support in their distributed systems.
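
As a small sketch of this idea (the set operation rules are covered later in this document): EXCEPT is never safe to push down, so both leaf queries below are recursively planned and the EXCEPT itself runs on their intermediate results, much like a query on reference tables:

-- Both leaf queries become sub-plans; the EXCEPT operates on two
-- intermediate results rather than on the distributed tables directly.
SELECT user_id FROM users_table
EXCEPT
SELECT user_id FROM orders_table;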

Trade-offs of using recursive planning

While Recursive Planning brings a lot to the table, it's not without its drawbacks. First, the method inherently adds more network round-trips, as each recursively planned query is executed separately, and its results are pushed back to all worker nodes. Second, when functions like read_intermediate_result() are used to fetch data from these intermediate results, they can confound the Postgres planner, particularly in the context of complex joins. As a result, query estimations may be inaccurate, leading to suboptimal execution plans.

Understanding these facets of Recursive Planning can provide you with a comprehensive view of how Citus approaches distributed query planning, allowing you to better optimize your database operations.

This may seem complex at first glance, but it's a bit like a step-by-step puzzle-solving process that the Citus query engine performs to optimize your database queries effectively. To help clarify these intricate mechanics, we'll present a series of examples.

Recursive Plan Example 1:

In the simplest example, we have a single subquery that is NOT pushdown-safe due to LIMIT 1, so Citus creates a subplan for it:

SET client_min_messages TO DEBUG1;
SELECT count(*) FROM (SELECT * FROM users_table LIMIT 1) as foo;
DEBUG:  push down of limit count: 1
DEBUG:  generating subplan 7_1 for subquery SELECT user_id, username, email, date_of_birth, country_code FROM public.users_table LIMIT 1
DEBUG:  Plan 7 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.username, intermediate_result.email, intermediate_result.date_of_birth, intermediate_result.country_code FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, username character varying(50), email character varying(50), date_of_birth date, country_code character varying(3))) foo

Recursive Plan Example 2:

Now we have multiple subqueries at the same query level that are NOT pushdown-safe, due to LIMIT 1 and a GROUP BY on a non-distribution key, so Citus creates a subplan for each:

SELECT count(*) FROM 
	(SELECT * FROM users_table LIMIT 1) as foo,
	(SELECT count(*) FROM users_table GROUP BY country_code) as bar;
DEBUG:  push down of limit count: 1
DEBUG:  generating subplan 9_1 for subquery SELECT user_id, username, email, date_of_birth, country_code FROM public.users_table LIMIT 1
DEBUG:  generating subplan 9_2 for subquery SELECT count(*) AS count FROM public.users_table GROUP BY country_code
DEBUG:  Plan 9 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.username, intermediate_result.email, intermediate_result.date_of_birth, intermediate_result.country_code FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, username character varying(50), email character varying(50), date_of_birth date, country_code character varying(3))) foo, (SELECT intermediate_result.count FROM read_intermediate_result('9_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) bar

Recursive Plan Example 3:

We have a subquery foo that is NOT safe to push down, but once that subquery is replaced with an intermediate result, the rest of the query becomes safe to push down:

SELECT count(*) FROM
  (SELECT 1 FROM (SELECT user_id FROM users_table LIMIT 1) as foo,
                           (SELECT * FROM orders_table) as o1,
                           (SELECT * FROM users_table) as u2
                           WHERE
                                  foo.user_id = o1.user_id AND
                                  o1.user_id = u2.user_id) as top_level_subquery;
DEBUG:  push down of limit count: 1
DEBUG:  generating subplan 1_1 for subquery SELECT user_id FROM public.users_table LIMIT 1
DEBUG:  Plan 1 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT 1 AS "?column?" FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint)) foo, (SELECT orders_table.order_id, orders_table.user_id, orders_table.product_id, orders_table.order_date, orders_table.status FROM public.orders_table) o1, (SELECT users_table.user_id, users_table.username, users_table.email, users_table.date_of_birth, users_table.country_code FROM public.users_table) u2 WHERE ((foo.user_id OPERATOR(pg_catalog.=) o1.user_id) AND (o1.user_id OPERATOR(pg_catalog.=) u2.user_id))) top_level_subquery

More advanced recursive planning constructs

In the previous recursive planning examples, we only dealt with one subquery at a time. However, recursive planning is capable of considering multiple subqueries at the same query level, or of converting tables to subqueries at the same level. In this part of the document, let's discuss these advanced query planning capabilities.

Set operations via recursive planning (and query pushdown)

Set operations like UNION, UNION ALL, and EXCEPT essentially combine two subqueries at the same query level.

Note: The rules for set operation planning in Citus can be confusing and warrant careful reading.

Citus is capable of "pushing down" certain set operations: UNION and UNION ALL. To allow this, two rules must be met, which are defined in the SafeToPushdownUnionSubquery() C function.

  1. The set operation cannot be on the top level; it should be wrapped into a subquery. This is purely an implementation limitation that can and should be eased.
  2. For all subqueries, each leaf query should have a "distribution key" on the target list, and the ordinal positions of these "distribution keys" should match across all set operations. This second limitation is required to preserve correctness.

Set operation query examples:

-- safe to pushdown
SELECT * FROM (SELECT * FROM users_table UNION SELECT * FROM users_table) as foo;
-- not safe to pushdown because the set operation is NOT wrapped into a subquery.
-- Each leaf query is recursively planned.
SELECT * FROM users_table UNION SELECT * FROM users_table;
-- not safe to pushdown because the distribution columns do NOT match (here, the distribution key is missing from the target list)
SELECT * FROM (SELECT username FROM users_table UNION SELECT username FROM users_table) as foo;
-- not safe to pushdown because the distribution columns do NOT match.
SELECT * FROM (SELECT user_id + 1 FROM users_table UNION SELECT user_id - 1 FROM users_table) as foo;
-- EXCEPT is never safe to pushdown
SELECT * FROM (SELECT * FROM users_table EXCEPT SELECT * FROM users_table) as foo;

Set operations and joins

Although not very common, some users might have joins along with set operations. Example queries might look like:

  • (SELECT .. t1 JOIN t2) UNION (SELECT t2 JOIN t3)
  • (SELECT .. t1 UNION SELECT t2) JOIN t3 ..
  • ((SELECT .. t1 JOIN t2) UNION (SELECT t2 JOIN t3)) JOIN t4

For all these cases, similar rules apply:

  • JOINs should be made on the distribution keys.
  • SET operations should satisfy the SafeToPushdownUnionSubquery() conditions.

When combined, all conditions should match.

Safe to Pushdown Examples:

-- All joins are on the distribution key and all the unions have the distribution key in the same ordinal position.
SELECT * FROM (
	(SELECT user_id FROM users_table u1 JOIN users_table u2 USING (user_id)) 
	UNION 
	(SELECT user_id FROM users_table u1 JOIN users_table u2 USING (user_id)) 
) as foo;
-- All joins are on the distribution key and all the unions have the distribution key in the same ordinal position.
SELECT * FROM 
	(SELECT user_id FROM users_table u1 UNION 
	 SELECT user_id FROM users_table u2) as foo
		JOIN
	users_table u2
	USING (user_id);

HAVING subqueries via recursive planning

Postgres allows the HAVING clause to contain subqueries. If the subqueries in the HAVING clause don't reference the outer query (i.e., not correlated), then it's possible to recursively plan the subquery in the HAVING clause. This involves using the RecursivelyPlanAllSubqueries() function specifically for the HAVING clause.

Example:

-- Find user_ids who have placed more orders than the average number of orders per user.
SELECT 
    u.user_id, 
    COUNT(o.order_id) AS total_orders
FROM 
    users_table u 
JOIN 
    orders_table o ON u.user_id = o.user_id
GROUP BY 
    u.user_id
HAVING 
    COUNT(o.order_id) > (SELECT AVG(order_count) FROM (
                            SELECT 
                                user_id, 
                                COUNT(order_id) AS order_count
                            FROM 
                                orders_table
                            GROUP BY 
                                user_id) AS subquery);

Non-colocated subqueries via recursive planning

Assume that there are two subqueries; each subquery is individually joined on their distribution keys. However, when the two subqueries are joined on arbitrary keys, the non-colocated subquery join logic kicks in, as described in RecursivelyPlanNonColocatedSubqueries().

Non-colocated subquery Example 1:

-- Find users who do not have orders with status 'shipped' and 'pending'
-- Sub1 and Sub2 are individually safe to pushdown.
-- The join condition between them is: sub1.user_id != sub2.user_id, which does not preserve distribution key equality.
-- Citus qualifies sub1 as the anchor subquery and checks whether all other subqueries are joined on the distribution key.
-- In this case, sub2 is not joined on the distribution key, so Citus decides to recursively plan the whole sub2.
SELECT a.user_id, b.user_id
FROM (
    SELECT u.user_id
    FROM users_table u
    JOIN orders_table o ON u.user_id = o.user_id
    WHERE o.status = 'shipped'
    GROUP BY u.user_id
) AS sub1
JOIN (
    SELECT u.user_id
    FROM users_table u
    JOIN orders_table o ON u.user_id = o.user_id
    WHERE o.status = 'pending'
    GROUP BY u.user_id
) AS sub2 ON sub1.user_id != sub2.user_id;

Non-colocated subquery Example 2:

-- Similar logic also applies for subqueries in the WHERE clause.
-- Both the query in the FROM clause and the subquery in the WHERE clause are individually safe to pushdown.
-- However, as a whole, the query is not safe to pushdown.
-- Therefore, Citus decides to recursively plan the subquery in the WHERE clause.
SELECT o1.order_id, o1.order_date
FROM orders_table o1, users_table u1
WHERE o1.user_id = u1.user_id
AND o1.order_date IN (
    SELECT o2.order_date
    FROM orders_table o2, users_table u2
    WHERE o2.user_id = u2.user_id AND o2.status = 'shipped'
);

Local table - distributed table JOINs via recursive planning

In Citus, joins between a local table and a distributed table require special handling. The local table data resides on the Citus coordinator node, while the distributed table data is across multiple worker nodes. The RecursivelyPlanLocalTableJoins() C function handles this.

Performance Characteristics

Local and distributed table joins have specific performance traits. They push down filters and projections, meaning only relevant data is pulled to the coordinator. See the RequiredAttrNumbersForRelation() and ReplaceRTERelationWithRteSubquery() functions for more details.

How It Works

  1. Citus scans the query tree to find joins between local and distributed tables.
  2. Upon finding such a join, Citus forms a sub-plan for the local table.
  3. This sub-plan retrieves relevant data from the local table into an intermediate result and distributes it across worker nodes.
  4. The original query is then rewritten, replacing the local table with these intermediate results.
  5. Finally, this new query, now only involving distributed tables, is executed using Citus's standard query execution engine.

Example 1

For example, consider a local table local_users and a distributed table orders_table. A query like this:

SELECT * 
FROM local_users l, orders_table o 
WHERE l.user_id = o.user_id;

Would conceptually be transformed by Citus as follows (illustrative; internally, Citus materializes the local table into an intermediate result rather than creating an actual temporary reference table):

-- Create a temporary reference table and populate it with local table data
CREATE TEMP TABLE temp_local_users AS SELECT * FROM local_users;
SELECT create_reference_table('temp_local_users');

-- Replace the local table with the temporary distributed table in the original query
SELECT * 
FROM temp_local_users t, orders_table o 
WHERE t.user_id = o.user_id;

Configuration Option

By tweaking citus.local_table_join_policy, you can control how Citus handles queries that join local and distributed tables. The default behavior is to convert the local table by pulling its data into an intermediate result, with an exception when the distributed table is filtered on its primary key or a unique index, in which case the distributed table side is converted instead.
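
A minimal sketch of tuning this behavior is shown below; the specific setting values ('auto', 'prefer-local', 'prefer-distributed') are taken from the GUC's documented options and are worth double-checking against your Citus version:

-- 'auto' (the default) lets Citus decide which side to convert;
-- 'prefer-local' converts the local table into an intermediate result;
-- 'prefer-distributed' converts the distributed table instead, which can be
-- expensive for large distributed tables.
SET citus.local_table_join_policy TO 'prefer-distributed';

SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id;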

Example 2

For instance, when the distributed table is guaranteed to return at most one row, Citus chooses to recursively plan the distributed table:

-- primary_key stands in for a column covered by a primary key or unique index on orders_table
SELECT * 
FROM local_users l, orders_table o 
WHERE l.user_id = o.user_id AND o.primary_key = 55;

Ref table LEFT JOIN distributed table JOINs via recursive planning

Very much like local-distributed table joins, Citus can't push down queries formatted as:

"... ref_table LEFT JOIN distributed_table ..."

This is the case when the outer side is a recurring tuple (e.g., reference table, intermediate results, or set returning functions).

In these situations, Citus recursively plans the "distributed" part of the join. Even though it may seem excessive to recursively plan a distributed table, remember that Citus pushes down the filters and projections. Functions involved here include RequiredAttrNumbersForRelation() and ReplaceRTERelationWithRteSubquery().

The core function handling this logic is RecursivelyPlanRecurringTupleOuterJoinWalker(). There are likely numerous optimizations possible (e.g., first pushing down an inner JOIN then an outer join), but these have not been implemented due to their complexity.

Example Query

Here's an example that counts the number of orders for each status, including only statuses that also appear in the reference table:

SELECT os.status, COUNT(o.order_id)
FROM order_status os
LEFT JOIN orders_table o ON os.status = o.status
GROUP BY os.status;

Debug Messages

DEBUG:  recursively planning right side of the left join since the outer side is a recurring rel
DEBUG:  recursively planning distributed relation "orders_table" "o" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG:  Wrapping relation "orders_table" "o" to a subquery
DEBUG:  generating subplan 45_1 for subquery SELECT order_id, status FROM public.orders_table o WHERE true

Recursive Planning When FROM Clause has Reference Table (or Recurring Tuples)

This section discusses a specific scenario in Citus's recursive query planning: handling queries where the main query's FROM clause is recurring, but there are subqueries in the SELECT or WHERE clauses involving distributed tables.

Definitions

  • Recurring: Here, "recurring" implies that the FROM clause doesn't contain any distributed tables. Instead, it may have reference tables, local tables, or set-returning functions.

  • Subqueries in SELECT and WHERE: When the main query's FROM clause is recurring, no distributed tables may remain in the SELECT and WHERE subqueries at execution time; any such subqueries must first be recursively planned.

Citus's Approach

Citus solves this by recursively planning these problematic subqueries, effectively replacing them with calls to read_intermediate_result().

Handling the WHERE Clause

For the WHERE clause, the function RecursivelyPlanAllSubqueries is called, transforming all subqueries within it.

-- Main query FROM clause is recurring, but 
-- WHERE clause contains a pushdownable subquery from 
-- orders_table (distributed table)
SELECT country_name 
FROM country_codes 
WHERE country_code IN 
	(SELECT country_code FROM users_table WHERE user_id IN (SELECT user_id FROM orders_table));

Handling the SELECT Clause

Similarly, RecursivelyPlanAllSubqueries is called for the SELECT clause to replace any existing subqueries.

-- Main query FROM clause is recurring, but SELECT clause contains a subquery from orders_table (distributed table)
SELECT 
	(SELECT COUNT(*) FROM orders_table WHERE status = 'shipped') AS shipped_orders, country_name 
FROM country_codes;

In both examples, since the main query's FROM clause is recurring and involves subqueries on distributed tables in WHERE or SELECT, Citus uses RecursivelyPlanAllSubqueries to manage these subqueries.

Logical Planner & Optimizer

At a high level, multi-task queries go through the logical planner. However, when it comes to query pushdown or recursively planned queries, the logical planner does very little. Most of its complexity deals with multi-shard queries that don't fall into those categories.

Simple Example

The simplest example of a query processed by the logical planner would be:

SELECT * FROM users_table;

Academic Background

The logical planner implements the concepts from the paper: "Correctness of query execution strategies in distributed databases." The paper is available here.

If you find the paper hard to read, Marco provides a good introduction to the same concepts in the following presentation:

Core Functions

We assume you have either watched the video or read the paper. The core C functions involved are MultiLogicalPlanCreate(), MultiNodeTree(), and MultiLogicalPlanOptimize().

The core function MultiLogicalPlanCreate() maps the SQL query to a C structure (e.g., MultiNode). Then MultiLogicalPlanOptimize() applies available optimizations to the MultiNode.

For instance, one simple optimization pushes the "filter" operation below the "MultiCollect." Such rules are defined in the function Commutative() in multi_logical_optimizer.c.
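
As a minimal illustration of that rule (using the sample tables), in the following multi-shard aggregate the filter and the partial count run on the shards, and the coordinator only combines one partial result per shard (see the "Combine Query" section below):

-- The WHERE filter and the per-shard count(*) are evaluated on the workers;
-- the coordinator merely sums the per-shard partial counts.
SELECT count(*)
FROM users_table
WHERE country_code = 'TR';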

Final Notes

Overall, the algorithm aims to move as much computation as possible closer to the data. This code path has not been updated for a while, so readers should debug the code themselves.

Multi Join Order

Context and Use Case:
This query planning mechanism is primarily geared towards data-warehouse-style queries. It's worth noting that the Citus team has not actively pursued optimizations in this direction, resulting in some non-optimized code paths.

Join Order Optimization:
In Citus' logical planner, the JoinOrderList() function serves to choose the most efficient join order possible. However, its primary focus has been on joins that require repartitioning, as well as some specific non-repartition joins. For example, joins on distribution keys that are not eligible for pushdown planning may pass through this code path, although no optimizations are made in those cases.

Algorithm Simplicity:
The current algorithm, encapsulated in the BestJoinOrder() function, is relatively naive. While it aims to minimize the number of repartition joins, it does not provide a performance evaluation for each of them. This function provides room for performance optimizations, especially when dealing with complex joins that necessitate repartitioning.

Control via GUCs:
Two GUCs control the behavior of repartitioning in Citus: citus.enable_single_hash_repartition_joins and citus.repartition_join_bucket_count_per_node.

  • citus.enable_single_hash_repartition_joins:
    The default value is "off". When "off", both tables involved in the join are repartitioned. When "on", if one table is already joined on its distribution key, only the other table is repartitioned.

  • citus.repartition_join_bucket_count_per_node:
    This setting defines the level of parallelism during repartitioning. The "off" default of citus.enable_single_hash_repartition_joins is tied to this GUC: opting for a fixed bucket count, rather than dynamically adjusting based on shard count, provides more stability and safety. If you ever consider changing these defaults, be cautious of the potential performance implications.
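
To see a repartition join in practice, here is a minimal sketch using the sample tables. It assumes repartition joins are enabled through the citus.enable_repartition_joins setting (they are disabled by default, in which case such a join errors out):

SET citus.enable_repartition_joins TO on;

-- orders_table is distributed on user_id and products_table on product_id,
-- so this join is not on both distribution keys and cannot be pushed down;
-- Citus repartitions the data on product_id and joins on the workers.
SELECT count(*)
FROM orders_table o
JOIN products_table p ON o.product_id = p.product_id;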

Combine Query

  • Overview:
    Multi-task SELECT queries pull results to the coordinator, and the returned tuples always go through the "combine query".

  • Structure and Source:
    The combineQuery can be traced back through the DistributedPlan->combineQuery struct field. This query is essentially constructed in the CreatePhysicalDistributedPlan() function, but the actual source is MasterExtendedOpNode() within the logical optimizer. For deeper insights into this logic, you can refer to the paper and video links shared under the "Logical Planner & Optimizer" section.

  • Example:
    The simplest example is the following where Citus sends count(*) to the shards, and needs to do a sum() on top of the results collected from the workers.

    SET client_min_messages TO DEBUG4;
    SELECT count(*) FROM users_table;
    DEBUG:  generated sql query for task 1
    DETAIL:  query string: "SELECT count(*) AS count FROM public.users_table_102008 users_table WHERE true"
    ....
    DEBUG:  combine query: SELECT COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cstring(0), '(i 1)'::cstring(0)) remote_scan(count bigint)
    ....


CTE Processing

  • In Postgres 13 and Later Versions:
    In Postgres 13 and later versions, CTEs (Common Table Expressions) are almost like subqueries. Usually, these CTEs are transformed into subqueries during standard_planner(). Citus follows the same approach via RecursivelyInlineCtesInQueryTree().

  • Additional Consideration in Citus:
    For Citus, there's an additional consideration. CTEs that aren't inlined get materialized. In the Citus context, materialization converts these CTEs into intermediate results. Some users leverage this for achieving full-SQL coverage.

  • Extra Check in Citus:
    Citus includes an extra check before inlining CTEs, conducted by the function TryCreateDistributedPlannedStmt(). Here, the planner first tries to inline all CTEs and then checks whether Citus can still plan the query. If not, the CTEs remain as they are, leading to their materialization. If all CTEs are materialized (i.e., become read_intermediate_result() calls), then the query effectively becomes a query on reference tables, which gives full SQL coverage.

Examples for Better Understanding:
The logic might seem complex at first, so the following simple examples should help.

-- a CTE that is inlined as subquery, and does a query-pushdown
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1;

So, from Citus' query planning perspective, the above CTE is equivalent to the following subquery:

SELECT * FROM 
   (SELECT DISTINCT user_id FROM orders_table) cte_1;

Once a CTE is inlined, the rest of the query planning logic kicks in. For example, below, the CTE is inlined, and then, because the subquery is NOT safe to push down, it is recursively planned:

WITH cte_1 AS (SELECT DISTINCT product_id FROM orders_table)
SELECT * FROM cte_1;
..
DEBUG:  CTE cte_1 is going to be inlined via distributed planning
DEBUG: generating subplan 81_1 for subquery SELECT DISTINCT product_id FROM public.orders_table
DEBUG:  Plan 81 query after replacing subqueries and CTEs: SELECT product_id FROM (SELECT intermediate_result.product_id FROM read_intermediate_result('81_1'::text, 'binary'::citus_copy_format) intermediate_result(product_id bigint)) cte_1;
-- the same query as the first query
-- but due to MATERIALIZED keyword
-- Citus converts the CTE to intermediate result
WITH cte_1 AS MATERIALIZED (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1;

-- the same query as the first query
-- but as the same cte used twice
-- Citus converts the CTE to intermediate result
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1 as c1 JOIN 
 					cte_1 as c2 USING (user_id);
  • Citus Specific Materialization:
    Citus first tries to inline the CTEs, but if it decides that after inlining the query cannot be supported due to Citus' SQL limitations, it lets the CTE be materialized.

As of writing this document, Citus does NOT support GROUPING SETS on distributed tables/subqueries. So, when the CTE is inlined, Citus tries to plan a query with GROUPING SETS on a distributed table, which fails. Citus then materializes the CTE, and the final query becomes a GROUPING SETS query over an intermediate result, which is supported:

WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id))
SELECT 
	max(date_of_birth) 
FROM users_that_have_orders
GROUP BY GROUPING SETS (user_id, email);
...
DEBUG:  CTE users_that_have_orders is going to be inlined via distributed planning
...
DEBUG:  Planning after CTEs inlined failed with
message: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
hint: Consider using an equality filter on the distributed table''s partition column.
...
DEBUG:  generating subplan 98_1 for CTE users_that_have_orders: SELECT users_table.user_id, users_table.username, users_table.email, users_table.date_of_birth, users_table.country_code FROM (public.users_table JOIN public.orders_table USING (user_id))

INSERT Query Planning

At a High-Level Overview:

  • There are roughly four different ways that an INSERT command can be planned in Citus. The first one is the INSERT ... SELECT command, which is discussed separately.

INSERT with Sublink (Not Supported):

INSERT INTO users_table (user_id) VALUES ((SELECT count(8) FROM orders_table));
ERROR:  subqueries are not supported within INSERT queries
HINT:  Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.

INSERT INTO users_table (user_id) VALUES (1) RETURNING (SELECT count(*) FROM users_table);
ERROR:  subqueries are not supported within INSERT queries
HINT:  Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.

Simple Inserts with a Single VALUES Clause:
As noted in the "Fast Path Router Planner" section, these INSERT commands are planned with fast-path planning. This does not require calling into standard_planner(), and the distribution key is extracted from the query itself.

INSERT INTO users_table VALUES (1, 'onder', '[email protected]', now() - '5 years'::interval, 'TR');

Main Functions:
The main functions involved in this path are RegenerateTaskListForInsert(), FastPathRouterQuery(), and RouterInsertTaskList(). For single-row INSERT tasks, Job->deferredPruning=true, meaning the shard pruning can always be done during execution.

Multi-row INSERTs:
For multi-row INSERTs, RouterInsertTaskList() becomes slightly more interesting. Citus groups rows by target shard.

INSERT INTO orders_table (order_id, user_id) VALUES 
	(1, 1), (2, 2), (3, 1), (4, 3), (5, 2);

Debug Info:
Debug information shows how the query is rebuilt for different user_ids.

-- for user_id: 1
DEBUG:  query after rebuilding:  INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint)

-- for user_id: 3
DEBUG:  query after rebuilding:  INSERT INTO public.orders_table_102055 AS citus_table_alias (order_id, user_id) VALUES ('4'::bigint,'3'::bigint)

-- for user_id: 2
DEBUG:  query after rebuilding:  INSERT INTO public.orders_table_102064 AS citus_table_alias (order_id, user_id) VALUES ('2'::bigint,'2'::bigint), ('5'::bigint,'2'::bigint)

INSERT.. SELECT and MERGE Command Query Planning

Overview:
This section discusses INSERT .. SELECT and MERGE commands, which share almost identical planning logic.

Planning Methods:
Broadly, there are three methods to plan these commands:

  1. Pushdown
  2. Pull-to-coordinator
  3. Repartition

Performance Considerations:
When it comes to performance and resource utilization, pushdown is generally the most efficient. For handling large data sizes, the repartition method scales better than the pull-to-coordinator method.

Further Reading:
For more detailed information on pushdown and repartition methods, refer to this blog post. The post focuses on the MERGE command but is also applicable to INSERT .. SELECT.

Examples:
The following section will delve into examples, starting with simple ones and moving to more complex scenarios.


INSERT.. SELECT Advanced Scenarios

Overview:
The INSERT .. SELECT pushdown logic builds upon the pushdown planning for SELECT commands. The key requirements include colocated tables and matching distribution columns. Relevant C functions are CreateDistributedInsertSelectPlan(), DistributedInsertSelectSupported(), and AllDistributionKeysInQueryAreEqual().

Additional Conditions for INSERT .. SELECT pushdown:

  • The destination table's distribution key must match the distribution column of the source query.

Simplest INSERT .. SELECT Pushdown Example:

INSERT INTO users_table SELECT * FROM users_table;

INSERT .. SELECT with Subqueries/Joins:
Provided subqueries can be pushed down, additional checks such as matching distribution columns are performed.

INSERT INTO users_table 
  SELECT users_table.* FROM users_table, 
         (SELECT user_id FROM users_table JOIN orders_table USING (user_id)) as foo
  WHERE foo.user_id = users_table.user_id;

Non-pushdownable Scenarios:

Due to Distribution Key Mismatch:
Citus opts for repartitioning since no "merge step" is needed for the SELECT query. The deciding function is IsRedistributablePlan().

INSERT INTO users_table (user_id) SELECT user_id + 1 FROM users_table;

Due to LIMIT:
The SELECT query requires a "merge step" for the LIMIT clause. Citus uses the pull-to-coordinator strategy.

INSERT INTO users_table SELECT * FROM users_table LIMIT 5;

Pull-to-Coordinator Details:
Citus typically pulls SELECT results and initiates a COPY command to the destination table. See NonPushableInsertSelectExecScan().

Special Cases:
ON CONFLICT or RETURNING:
In these cases, a simple COPY is insufficient. Citus pushes results as "colocated intermediate files" on the workers, which are colocated with the target table's shards. Then, Citus performs an INSERT .. SELECT on these colocated intermediate results. See ExecutePlanIntoColocatedIntermediateResults() and GenerateTaskListWithColocatedIntermediateResults().

Example: Pull-to-coordinator with COPY back to shards:

INSERT INTO users_table SELECT * FROM users_table LIMIT 5;

Example: Pull-to-coordinator with push as colocated intermediate results:

INSERT INTO users_table SELECT * FROM users_table LIMIT 5 ON CONFLICT(user_id) DO NOTHING;


MERGE Command Query Planning

Overview:
The MERGE command planning is similar to INSERT .. SELECT. The key difference is in the pull-to-coordinator strategy. MERGE always uses "colocated intermediate result" files, as the final executed command must be a MERGE command, not a COPY. The entry function in the code is CreateMergePlan().

Further Reading:
For more insights, check out this blog post.

Pushdown MERGE Example:
The join is based on the distribution key.

MERGE INTO users_table u
USING orders_table o
ON (u.user_id = o.user_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;

Pull-to-Coordinator MERGE Example:
The source query requires a "merge step" on the coordinator.

MERGE INTO users_table u
USING (SELECT * FROM orders_table ORDER BY order_date LIMIT 50) o
ON (u.user_id = o.user_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;

Repartition MERGE Example:
The join is NOT on the distribution key, and the source query doesn't require a "merge step" on the coordinator. Note that this example is mostly hypothetical to illustrate the case.

MERGE INTO users_table u
USING (SELECT * FROM orders_table ORDER BY order_date) o
ON (u.user_id = o.product_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;


UPDATE / DELETE Planning

Overview:
The planning logic for UPDATE/DELETE queries is quite similar to what we've discussed for INSERT and MERGE commands. There are essentially four primary methods of planning:

1) Fast-Path Router Planning:
Targets a single shard and filters on the distribution key in the WHERE clause.

UPDATE users_table SET email = '[email protected]' WHERE user_id = 5;

2) Router Planning:
Targets a single node: the involved tables are colocated and, thanks to the distribution key filters, all matching shards reside on that node.

UPDATE users_table u 
	SET email = ''
	FROM orders_table o 
	WHERE o.user_id = u.user_id AND 
			u.user_id = 5 AND 
			o.status = 'done';

3) Pushdown Planning:
The query can be pushed down to worker nodes, targeting multiple shards. Joins are also possible if they are on distribution keys.

UPDATE users_table SET email = '[email protected]' 
WHERE user_id IN (SELECT user_id FROM orders_table WHERE status = 'in progress');

Additional Example for Pushdown with Materialized CTE:

WITH high_value_users AS (
  SELECT user_id FROM orders_table WHERE status = 'done' ORDER BY order_date LIMIT 50
)
UPDATE users_table SET username = 'High Value'
WHERE user_id IN (SELECT user_id FROM high_value_users);

4) Recursive Planning:
Used for more complex queries, like those with subqueries or joins that can't be pushed down. The queries are planned recursively.

-- ("total" is an illustrative column; it is not part of the sample orders_table schema)
DELETE FROM users_table WHERE user_id 
IN (SELECT user_id FROM orders_table WHERE total > 100 ORDER BY total DESC LIMIT 5);


Correlated/Lateral Subqueries in Planning

Overview:
Correlated or LATERAL subqueries have special behavior in Citus. They can often be pushed down, especially when the join is on the distribution key. There are limitations for joins not on the distribution key.

Key Code Details:
For more information on the code, check the following functions:
DeferErrorIfCannotPushdownSubquery() ->
ContainsReferencesToOuterQuery() ->
DeferErrorIfSubqueryRequiresMerge().

Example 1: Using LATERAL, where the join is on the distribution key.

SELECT u.*, o_sum.total
FROM users_table u, 
LATERAL (SELECT count(DISTINCT status) as total FROM orders_table o WHERE o.user_id = u.user_id) o_sum;

Example 2: Complex LATERAL with GROUP BY on a non-distribution key. It's pushdownable because the join is on the distribution key.

SELECT u.*, o_info.product, o_info.total
FROM users_table u,
LATERAL (
  SELECT o.product_id as product, count(DISTINCT o.status) as total
  FROM orders_table o WHERE o.user_id = u.user_id
  GROUP BY o.product_id
) o_info;

Debug and Error Messages:
When a correlated subquery cannot be pushed down, recursive planning can't be used either, so Citus errors out:

SELECT u.*
FROM users_table u,
LATERAL (
  SELECT o.product_id as product
  FROM orders_table o WHERE o.user_id != u.user_id
) o_info;

DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
ERROR:  complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
o.user_id = u.user_id)
FROM users_table u
WHERE u.user_id = 42;
-- Router planner works with CTEs (and UPDATE/DELETE Query):
-- Update the status of the most recent order for a specific user
WITH RecentOrder AS (
SELECT MAX(order_id) as last_order_id
FROM orders_table
WHERE user_id = 42
)
UPDATE orders_table
SET status = 'COMPLETED'
FROM RecentOrder
WHERE orders_table.user_id = 42 AND
orders_table.order_id = RecentOrder.last_order_id;
----------------------------------------
--- Query Pushdown Planning in Citus
----------------------------------------
-- While Router and Fast-Path Router Planners are proficient
-- at dealing with single-shard commands—making them ideal for
-- multi-tenant and OLTP applications—Citus also excels in
-- analytical use-cases. In these scenarios, a single query
-- is broken down into multiple parallel sub-queries, which
-- are run on various shards across multiple machines, thereby
-- speeding up query execution times significantly.
-- What is Query Pushdown Planning?
-- Query Pushdown Planning is an extension of the Router Planning
-- paradigm. Unlike the latter, which deals with single-shard,
-- single-node queries, Query Pushdown can route a query to
-- multiple shards across multiple nodes. Instead of verifying that
-- all tables have the same filters, as in Router Planning, Query
-- Pushdown ascertains that all tables are joined on their
-- distribution keys.
-- The core C function responsible for this check is
-- `RestrictionEquivalenceForPartitionKeys()`, which ensures that
-- tables in the query are joined based on their distribution keys.
-- Initially intended for subqueries, Query Pushdown has been extended
-- to include other cases as well. The decision to utilize
-- Query Pushdown is determined by the `ShouldUseSubqueryPushDown()` function.
-- Understanding Query Pushdown Planning and how it extends the
-- simpler Router Planning can help you fully utilize Citus
-- for your analytical workloads.
-- Key Characteristics of Query Pushdown:
-- **High Parallelism**: The query is broken down into multiple
-- sub-queries, leading to parallel execution on multiple shards and nodes.
-- **Worker Subquery**: You will typically notice the alias
-- `worker_subquery` in the SQL queries sent to the shards, indicating a pushdown operation.
-- To give a simple example of query pushdown, see below where
-- all the tables involved are transitively joined on the
-- distribution keys. This allows the planner to understand that
-- it is safe to pushdown the join to the worker nodes almost as-is
-- Count of distinct product_ids where user_ids from two different tables match
SELECT count(DISTINCT product_id)
FROM (
SELECT DISTINCT user_id as distinct_user_id
FROM users_table
) foo, orders_table
WHERE orders_table.user_id = distinct_user_id;
-- Subquery in Target List:
-- Number of 'done' orders per user, via a correlated subquery in the
-- target list, filtered by an IN subquery in the WHERE clause
SELECT
    (SELECT COUNT(*) FROM orders_table o2
     WHERE o2.user_id = u.user_id AND o2.status = 'done'),
    COUNT(DISTINCT o.product_id)
FROM orders_table o, users_table u
WHERE o.user_id = u.user_id AND u.user_id IN
    (SELECT user_id FROM orders_table WHERE status = 'shipped')
GROUP BY u.user_id;
-- Subquery in WHERE Clause:
-- Number of distinct users who have placed an order
SELECT COUNT(DISTINCT u.user_id)
FROM users_table u
WHERE u.user_id IN (
SELECT o.user_id
FROM orders_table o
);
-- Count of distinct products per user, with maximum order date from orders
-- as a subquery in the target list
SELECT
(SELECT MAX(o.order_date) FROM orders_table o WHERE o.user_id = u.user_id),
COUNT(DISTINCT o.product_id)
FROM orders_table o, users_table u
WHERE o.user_id = u.user_id
GROUP BY u.user_id;
-- In contrast to SELECT queries, UPDATE and DELETE queries
-- that are pushdown-eligible do not have the "worker_subquery"
-- alias in the queries that are sent to the shards.
-- UPDATE with Query Pushdown
-- Update status in orders_table for users whose email ends
-- with '@example.com'
UPDATE orders_table o
SET status = 'DISCOUNTED'
FROM users_table u
WHERE o.user_id = u.user_id AND u.email LIKE '%@example.com';
-- DELETE with Query Pushdown
-- Delete orders for users who were born before '2000-01-01'
DELETE FROM orders_table o
USING users_table u
WHERE o.user_id = u.user_id AND u.date_of_birth < '2000-01-01';
--------------------------------------
-- Recursive Planning
-------------------------------------
-- Central to understanding Citus' approach to distributed query
-- planning are two closely interrelated concepts:
-- "Query Pushdown Planning" and "Recursive Planning."
-- These dual strategies lay the foundation for Citus'
-- capacity to manage complex query structures across
-- multiple shards and nodes effectively.
-- While Query Pushdown Planning optimizes queries by breaking
-- them into smaller components that can run in parallel across
-- multiple shards, Recursive Planning takes a more nuanced approach.
-- It works its way through the query tree from the deepest
-- level upwards, scrutinizing each subquery to determine its
-- suitability for pushdown.
-- The essence of recursive planning lies in treating
-- each recursively planned query in isolation. This
-- means correlated subqueries can't take advantage
-- of recursive planning. However, (sub)queries on local
-- tables can be done via recursive planning.
-- This process is primarily executed in
-- the `RecursivelyPlanSubqueryWalker()` C function.
-- In this function, the engine goes to the innermost subquery
-- and assesses whether it can safely be pushed down as a
-- stand-alone query. If it can, the query engine simply moves on.
-- However, if the subquery isn't suitable for pushdown, Citus
-- generates a separate "sub-plan" for that subquery, substituting
-- it with a `read_intermediate_result()` function call. These
-- sub-plans are later executed as independent queries, a task
-- overseen by the `ExecuteSubPlans()` function.
-- The engine continues this way, moving upward through each level
-- of subqueries, evaluating and, if needed, creating sub-plans until
-- it reaches the top-level query.
-- Intermediate Results as Reference Tables
-- One of the key aspects of Recursive Planning is the use of
-- "intermediate results." These are essentially the outcomes
-- of subqueries that have been recursively planned and executed
-- on worker nodes. Once these intermediate results are obtained,
-- they are treated much like reference tables in the subsequent
-- stages of query planning and execution. The key advantage here
-- is that, like reference tables, these intermediate results can
-- be joined with distributed tables on any column, not just the
-- distribution key.
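-- A minimal illustration of that property, using the tables defined
-- above: the LIMIT below forces the subquery to be recursively planned
-- into an intermediate result, and that intermediate result can then
-- be joined with orders_table on product_id, even though product_id
-- is not the distribution key of orders_table
SELECT count(*)
FROM (SELECT product_id FROM products_table ORDER BY price DESC LIMIT 10) AS top_products,
     orders_table
WHERE orders_table.product_id = top_products.product_id;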
-- Full SQL Coverage via Recursive Planning
-- The practice of recursively creating sub-plans and generating
-- intermediate results offers a workaround for achieving full
-- SQL coverage in Citus. If each subquery in a complex SQL query
-- can be replaced with an intermediate result, then the entire
-- query essentially becomes a query on a reference table.
-- This feature is a crucial aspect for many users who require
-- comprehensive SQL support in their distributed systems.
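-- As a small sketch of this idea: both subqueries below require a
-- merge step due to LIMIT, so each one is expected to be replaced
-- with an intermediate result; the final query then joins the two
-- intermediate results, much like joining two reference tables
SELECT count(*)
FROM (SELECT product_id FROM products_table ORDER BY price DESC LIMIT 10) AS expensive_products
JOIN (SELECT product_id FROM orders_table ORDER BY order_date DESC LIMIT 10) AS recent_orders
    USING (product_id);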
-- Trade-offs
-- While Recursive Planning brings a lot to the table, it's not
-- without its drawbacks. First, the method inherently adds more
-- network round-trips, as each recursively planned query is
-- executed separately, and its results are pushed back to all
-- worker nodes. Secondly, when functions like
-- `read_intermediate_results` are used to fetch data from these
-- intermediate results, it can confound the Postgres planner,
-- particularly in the context of complex joins. As a result,
-- query estimations may be inaccurate, leading to suboptimal
-- execution plans.
-- Understanding these facets of Recursive Planning can provide
-- you with a comprehensive view of how Citus approaches
-- distributed query planning, allowing you to better optimize your
-- database operations.
-- This may seem complex at first glance, but it's a bit
-- like a step-by-step puzzle-solving process that the Citus
-- query engine performs to optimize your database queries
-- effectively. To help clarify these intricate mechanics,
-- we'll present a series of examples.
-- Recursive Plan Example 1:
-- in the simplest example, we'll have a single
-- subquery which is NOT pushdown-safe due to LIMIT 1,
-- hence creating a subplan
SET client_min_messages TO DEBUG1;
SELECT count(*) FROM (SELECT * FROM users_table LIMIT 1) as foo;
DEBUG: push down of limit count: 1
DEBUG: generating subplan 7_1 for subquery SELECT user_id, username, email, date_of_birth, country_code FROM public.users_table LIMIT 1
DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.username, intermediate_result.email, intermediate_result.date_of_birth, intermediate_result.country_code FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, username character varying(50), email character varying(50), date_of_birth date, country_code character varying(3))) foo
┌───────┐
│ count │
├───────┤
│ 0 │
└───────┘
(1 row)
-- Recursive Plan Example 2:
-- now, we have multiple subqueries in the same level
-- which are NOT pushdown-safe due to LIMIT 1 and GROUP BY
-- non distribution keys, hence creating a subplan
SELECT count(*) FROM
(SELECT * FROM users_table LIMIT 1) as foo,
(SELECT count(*) FROM users_table GROUP BY country_code) as bar;
DEBUG: push down of limit count: 1
DEBUG: generating subplan 9_1 for subquery SELECT user_id, username, email, date_of_birth, country_code FROM public.users_table LIMIT 1
DEBUG: generating subplan 9_2 for subquery SELECT count(*) AS count FROM public.users_table GROUP BY country_code
DEBUG: Plan 9 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.username, intermediate_result.email, intermediate_result.date_of_birth, intermediate_result.country_code FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, username character varying(50), email character varying(50), date_of_birth date, country_code character varying(3))) foo, (SELECT intermediate_result.count FROM read_intermediate_result('9_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) bar
┌───────┐
│ count │
├───────┤
│ 0 │
└───────┘
(1 row)
-- Recursive Plan Example 3:
-- we have a subquery foo that is NOT safe-to-pushdown
-- but once that subquery is replaced with an intermediate
-- result, the rest of the query becomes safe-to-pushdown
SELECT count(*) FROM
(SELECT 1 FROM (SELECT user_id FROM users_table LIMIT 1) as foo,
(SELECT * FROM orders_table) as o1,
(SELECT * FROM users_table) as u2
WHERE
foo.user_id = o1.user_id AND
o1.user_id = u2.user_id) as top_level_subquery;
DEBUG: push down of limit count: 1
DEBUG: generating subplan 1_1 for subquery SELECT user_id FROM public.users_table LIMIT 1
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT 1 AS "?column?" FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint)) foo, (SELECT orders_table.order_id, orders_table.user_id, orders_table.product_id, orders_table.order_date, orders_table.status FROM public.orders_table) o1, (SELECT users_table.user_id, users_table.username, users_table.email, users_table.date_of_birth, users_table.country_code FROM public.users_table) u2 WHERE ((foo.user_id OPERATOR(pg_catalog.=) o1.user_id) AND (o1.user_id OPERATOR(pg_catalog.=) u2.user_id))) top_level_subquery("?column?")
-------------------------------------------------
--- More advanced recursive planning constructs
-------------------------------------------------
-- In the previous parts of the recursive planning examples,
-- we only dealt with one subquery at a time.
-- However, recursive planning is capable of considering multiple
-- subqueries in the same query level or converting tables to
-- subqueries in the same level. In this part of the document,
-- let's discuss these advanced query planning capabilities.
-------------------------------------------------
--- Set operations via recursive planning (and query pushdown)
-------------------------------------------------
-- If you think about set operations (UNION/UNION ALL/EXCEPT),
-- they are essentially two subqueries in the same query level.
-- Note that the rules for set operation planning in Citus can be
-- confusing, so read them carefully.
-- Citus is capable of "pushing down" certain set operations: UNION and UNION ALL.
-- There are two rules that allow pushing down set operations;
-- see the `SafeToPushdownUnionSubquery()` C code.
-- First, the set operation cannot be at the top level. It should be
-- wrapped into a subquery. This is purely an implementation limitation
-- that can/should be eased. Second, each
-- leaf query should have the "distribution key" on its target list,
-- and the ordinal position of the "distribution keys" should match
-- across all the set operations. The second limitation is required
-- to preserve correctness: we cannot return correct results
-- unless the distribution keys match in the target list
-- Set operation pushdown example:
-- safe to pushdown
SELECT * FROM (SELECT * FROM users_table UNION SELECT * FROM users_table) as foo;
-- not safe to pushdown because the
-- set operation is NOT wrapped into a subquery;
-- each leaf query is recursively planned
SELECT * FROM users_table UNION SELECT * FROM users_table;
-- not safe to pushdown because the
-- distribution columns do NOT match
-- (e.g., not existing)
SELECT * FROM (SELECT username FROM users_table UNION
SELECT username FROM users_table) as foo;
-- not safe to pushdown because the
-- distribution columns do NOT match
SELECT * FROM (SELECT user_id + 1 FROM users_table UNION
SELECT user_id - 1 FROM users_table) as foo;
-- EXCEPT is never safe to pushdown
SELECT * FROM (SELECT * FROM users_table EXCEPT SELECT * FROM users_table) as foo;
-------------------------------------------------
--- Set operations and joins
-------------------------------------------------
-- although not very common, some users might have
-- joins along with set operations so we might have
-- queries like the following:
-- (SELECT .. t1 JOIN t2) UNION (SELECT t2 JOIN t3)
-- (SELECT .. t1 UNION SELECT t2) JOIN t3 ..
-- ((SELECT .. t1 JOIN t2) UNION (SELECT t2 JOIN t3)) JOIN t4
-- in all these cases, similar rules apply:
-- JOINs should happen on the distribution keys,
-- and set operations should satisfy `SafeToPushdownUnionSubquery()`;
-- when combined, all conditions should hold
-- safe to pushdown
-- all joins are on the distribution key
-- and all the unions have distribution key
-- in the same ordinal position
SELECT * FROM (
(SELECT user_id FROM users_table u1 JOIN users_table u2 USING (user_id))
UNION
(SELECT user_id FROM users_table u1 JOIN users_table u2 USING (user_id))
) as foo;
-- safe to pushdown
-- all joins are on the distribution key
-- and all the unions have distribution key
-- in the same ordinal position
SELECT * FROM
(SELECT user_id FROM users_table u1 UNION
SELECT user_id FROM users_table u2) as foo
JOIN
users_table u2
USING (user_id);
-------------------------------------------------
--- HAVING subqueries via recursive planning
-------------------------------------------------
-- Postgres allows the HAVING clause to contain subqueries.
-- When the subqueries in the HAVING clause don't have any
-- references to the outer query (i.e., they are not correlated),
-- Citus can recursively plan the subqueries in the HAVING clause;
-- see the `RecursivelyPlanAllSubqueries()` function and its usage
-- for the HAVING clause
-- Find the user_ids who have placed more orders than the average number of orders per user.
SELECT
u.user_id,
COUNT(o.order_id) AS total_orders
FROM
users_table u
JOIN
orders_table o ON u.user_id = o.user_id
GROUP BY
u.user_id
HAVING
COUNT(o.order_id) > (SELECT AVG(order_count) FROM (
SELECT
user_id,
COUNT(order_id) AS order_count
FROM
orders_table
GROUP BY
user_id) AS subquery);
-------------------------------------------------
--- Non-colocated subqueries via recursive planning
-------------------------------------------------
-- Assume that there are two subqueries; each subquery is individually
-- joined on its distribution key. However, the two subqueries
-- are joined on an arbitrary key. In that case, the non-colocated
-- subquery join logic kicks in; see `RecursivelyPlanNonColocatedSubqueries()`.
-- Example: pair users who have 'shipped' orders with other users who have 'pending' orders.
-- sub1 and sub2 are individually safe-to-pushdown subqueries,
-- such that if you run them separately, you'll see that
-- they are pushed down to the worker nodes.
-- However, sub1 and sub2 have the following join condition: sub1.user_id != sub2.user_id,
-- meaning that they do not preserve the distribution key equality across
-- the subqueries.
-- In that case, Citus qualifies sub1 as the anchor subquery, see `CreateColocatedJoinChecker()`,
-- then checks whether every other subquery is joined on the
-- distribution key with sub1.
-- In this specific example, sub2 is not joined on the distribution
-- key, hence Citus decides to recursively plan the whole sub2
-- and thereby supports the SQL command.
SELECT a.user_id, b.user_id
FROM (
SELECT u.user_id
FROM users_table u
JOIN orders_table o ON u.user_id = o.user_id
WHERE o.status = 'shipped'
GROUP BY u.user_id
) AS sub1
JOIN (
SELECT u.user_id
FROM users_table u
JOIN orders_table o ON u.user_id = o.user_id
WHERE o.status = 'pending'
GROUP BY u.user_id
) AS sub2 ON sub1.user_id != sub2.user_id;
-- Similar logic also works for subqueries
-- in the WHERE clause.
-- Both the query in the FROM clause and the subquery
-- in the WHERE clause are individually safe to pushdown.
-- However, as a whole, the query is not safe to pushdown,
-- hence Citus decides to recursively plan the subquery in the
-- WHERE clause via the `RecursivelyPlanNonColocatedSubqueries()` logic.
-- Find orders that were placed on the same date as orders with status 'shipped'
SELECT o1.order_id, o1.order_date
FROM orders_table o1, users_table u1
WHERE o1.user_id = u1.user_id
AND o1.order_date IN (
SELECT o2.order_date
FROM orders_table o2, users_table u2
WHERE o2.user_id = u2.user_id AND o2.status = 'shipped'
);
-------------------------------------------------
--- Local table - distributed table JOINs via recursive planning
-------------------------------------------------
-- In Citus, when a query involves a join between a local table
-- and a distributed table, special handling is required
-- because one part of the data resides on the Citus coordinator
-- node (local table), while the other part is distributed across
-- multiple worker nodes (distributed table).
-- This is where "Recursive Planning for Local Table and Distributed Table Joins"
-- comes into play. At the core of this mechanism is the
-- `RecursivelyPlanLocalTableJoins()` C function.
-- Local & distributed table joins have an important performance
-- characteristic: Citus pushes down the filters and projections,
-- meaning that it only pulls the relevant data to the coordinator,
-- not the whole table. See the `RequiredAttrNumbersForRelation()` and
-- `ReplaceRTERelationWithRteSubquery()` functions
-- How It Works:
-- 1. Citus traverses the query tree to identify joins between local tables and distributed tables.
-- 2. Once such a join is identified, Citus creates a "sub-plan" for the local table.
-- 3. This sub-plan effectively pulls the relevant data from the local table into an intermediate result that is distributed across the worker nodes.
-- 4. The original query is then rewritten to replace the local table with the intermediate result.
-- 5. Finally, this new query, which now involves only distributed tables, is executed using Citus's standard distributed query execution engine.
-- For example, consider a local table `local_users` and
-- a distributed table `orders_table`. A query like:
SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id;
-- Would be internally transformed by Citus as if you had
-- written something like:
-- Create a temporary reference table and populate it with data from the local table
CREATE TEMP TABLE temp_local_users AS SELECT * FROM local_users;
SELECT create_reference_table('temp_local_users');
-- Then execute the original query but replace the local table with the temporary distributed table
SELECT *
FROM temp_local_users t, orders_table o
WHERE t.user_id = o.user_id;
-- This ensures that Citus can execute the query in
-- a parallel manner across multiple nodes, improving performance.
-- Also, by tweaking citus.local_table_join_policy, you
-- can fine-tune how Citus behaves when it encounters a
-- query involving local and distributed tables, giving you
-- more control over query performance. The default behavior
-- is to pull the data from the local table to the coordinator,
-- with the exception that when the distributed table has a filter
-- on its primary key / a unique index, we know that
-- at most 1 row is expected from the distributed table.
-- In the query below, Citus prefers to pull the distributed table
-- as it is guaranteed to return at most one row
-- (a brief sketch of tweaking the GUC follows this example)
SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id AND o.primary_key = 55;
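-- A brief sketch of tweaking that GUC; the value names below are an
-- assumption about its accepted settings ('auto' is the default):
SET citus.local_table_join_policy TO 'prefer-distributed';
-- now Citus prefers to convert the distributed table side (still
-- pushing down its filters and projections) instead of pulling
-- the local table
SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id AND o.status = 'NEW';
RESET citus.local_table_join_policy;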
-------------------------------------------------
--- Ref table LEFT JOIN distributed table JOINs via recursive planning
-------------------------------------------------
-- Much like local - distributed table joins,
-- Citus cannot pushdown queries of the form:
-- "... ref_table LEFT JOIN distributed_table"
-- In fact, this is true whenever the outer side is a recurring
-- tuple (e.g., a reference table, intermediate results,
-- set-returning functions, etc.).
-- In such cases, Citus recursively plans the "distributed"
-- part of the join. Although it might sound excessive to
-- recursively plan a distributed table, remember that
-- Citus pushes down the filters and projections. See the
-- `RequiredAttrNumbersForRelation()` and
-- `ReplaceRTERelationWithRteSubquery()` functions.
-- The main function responsible for this logic
-- is RecursivelyPlanRecurringTupleOuterJoinWalker().
-- Note that there are probably a good number of optimizations
-- we could do (e.g., first push down an inner JOIN, then the outer join),
-- but those were complex to implement, hence we have not
-- done so.
-- Count the number of orders for each status, but only include statuses that also appear in the reference table
-- see the debug messages, we only pulled the relevant columns
SELECT os.status, COUNT(o.order_id)
FROM order_status os
LEFT JOIN orders_table o ON os.status = o.status
GROUP BY os.status;
...
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "orders_table" "o" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "orders_table" "o" to a subquery
DEBUG: generating subplan 45_1 for subquery SELECT order_id, status FROM public.orders_table o WHERE true
...
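-- For contrast, when the recurring rel is on the inner side of the
-- outer join (distributed table LEFT JOIN reference table), the join
-- itself can be pushed down to the shards, because every node holds
-- a copy of the reference table; only the final count is combined
-- on the coordinator
SELECT count(o.order_id)
FROM orders_table o
LEFT JOIN order_status os ON o.status = os.status;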
-------------------------------------------------
--- Recursive planning when the FROM clause has a reference table (or recurring tuples)
-------------------------------------------------
-- The code handles a particular case in Citus's recursive
-- query planning logic, dealing with queries where the main
-- query's `FROM` clause is recurring but subqueries in
-- the `SELECT` or `WHERE` clauses include distributed tables.
-- **Recurring**: Here, "recurring" means that the `FROM` clause
-- doesn't include any distributed tables; it may contain
-- reference tables, local tables, or set-returning functions.
-- **Subqueries in SELECT and WHERE**: If the main query's
-- `FROM` clause is recurring, then no distributed tables can
-- remain in the `SELECT` and `WHERE` subqueries after planning.
-- Citus addresses this situation by recursively planning these problematic subqueries, effectively replacing
-- them with calls to `read_intermediate_result()`.
-- Handling the WHERE Clause:
-- The function `RecursivelyPlanAllSubqueries` is called for the
-- `WHERE` clause (if it exists), transforming all
-- subqueries within it.
-- Main query FROM clause is recurring, but
-- WHERE clause contains a pushdownable subquery from
-- orders_table (distributed table)
SELECT country_name
FROM country_codes
WHERE country_code IN
(SELECT country_code FROM users_table WHERE user_id IN (SELECT user_id FROM orders_table));
-- Handling the SELECT Clause:
-- Likewise, `RecursivelyPlanAllSubqueries` is called for the
-- `SELECT` clause, replacing any subqueries that are there.
-- Main query FROM clause is recurring, but SELECT clause contains a subquery from orders_table (distributed table)
SELECT
(SELECT COUNT(*) FROM orders_table WHERE status = 'shipped') AS shipped_orders, country_name
FROM country_codes;
-- In both examples, since the main query's `FROM` clause
-- is recurring and involves subqueries on distributed
-- tables in `WHERE` or `SELECT`, Citus would call
-- `RecursivelyPlanAllSubqueries` to handle these subqueries.
--------------------------------------------------------
---- Logical Planner & Optimizer
--------------------------------------------------------
-- At a high level, all multi-task queries go through the
-- logical planner. However, for query pushdown / recursive
-- planning, the logical planner does very little. The complexity
-- in the logical planner deals mostly with queries that
-- fall into the other multi-shard cases.
-- the simplest example of a query that goes through
-- logical planner would be as simple as the following query
SELECT * FROM users_table;
-- At a high level, logical planner tries to implement
-- the paper: Correctness of query execution strategies in distributed databases.
-- https://dl.acm.org/doi/pdf/10.1145/319996.320009
-- Reading the paper is hard, right? Thanks to Marco, there is
-- a very good introduction to the same concepts on the following
-- presentation: https://www.youtube.com/watch?v=xJghcPs0ibQ
-- https://speakerdeck.com/marcocitus/scaling-out-postgre-sql
-- Now, we assume you either watched the video or
-- read the paper :) The C functions `MultiLogicalPlanCreate()`,
-- `MultiNodeTree()` and `MultiLogicalPlanOptimize()` constitute
-- the core of the logical planning.
-- The core of `MultiLogicalPlanCreate()` is to map the
-- SQL query to a C structure (e.g., `MultiNode`) to which
-- `MultiLogicalPlanOptimize()` can then apply the available
-- optimizations for the given `MultiNode`. For example,
-- one of the simplest optimizations is to push the "filter"
-- operation below the "MultiCollect". Such rules are defined
-- in the `Commutative()` function in
-- `multi_logical_optimizer.c`.
-- Overall, the algorithm aims to push as much computation
-- as possible close to the data. This code path has not been
-- worked on for a while, so the readers should debug the code
-- themselves.
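-- A small example of the kind of optimization described above,
-- using the tables defined in this guide: for the query below,
-- the logical optimizer pushes the WHERE filter and a partial
-- count(*) below the collect, so each shard returns one row per
-- country_code it stores, and the coordinator only sums up the
-- partial counts per country_code
SELECT country_code, count(*)
FROM users_table
WHERE date_of_birth > '1990-01-01'
GROUP BY country_code;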
--------------------------------------------------------
---- Multi Join Order
--------------------------------------------------------
-- **Context and Use Case**: This query planning mechanism
-- is primarily geared towards data warehouse type of query
-- planning. It's worth noting that the Citus team has not
-- actively pursued optimizations in this direction,
-- resulting in some non-optimized code paths.
-- **Join Order Optimization**: In Citus' logical planner,
-- the `JoinOrderList()` function serves to choose the most
-- efficient join order possible. However, its primary focus
-- has been on joins that require repartitioning, as well as
-- some specific non-repartition joins. For example, joins
-- on distribution keys that are not eligible for pushdown
-- planning may pass through this code path, although no
-- optimizations are made in those cases.
-- **Algorithm Simplicity**: The current algorithm,
-- encapsulated in the `BestJoinOrder()` function, is
-- relatively naive. While it aims to minimize the
-- number of repartition joins, it does not provide a
-- performance evaluation for each of them. This function
-- provides room for performance optimizations, especially
-- when dealing with complex joins that necessitate
-- repartitioning.
-- Two GUCs control the behavior of repartitioning in Citus:
-- `citus.enable_single_hash_repartition_joins` and
-- `citus.repartition_join_bucket_count_per_node`.
-- The default value for `citus.enable_single_hash_repartition_joins`
-- is "off". When "off", both tables involved in the join are
-- repartitioned. When "on", if one table is already joined on its
-- distribution key, only the other table is repartitioned.
-- The reason for the "off" default is tied to the second GUC:
-- `citus.repartition_join_bucket_count_per_node`. This setting
-- defines the level of parallelism during repartitioning.
-- Opting for a fixed bucket count, rather than dynamically adjusting
-- based on shard count, provides more stability and safety. If you
-- ever consider changing these defaults, be cautious of the potential
-- performance implications.
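-- A sketch of a join that goes through this code path: the join
-- below is not on the distribution key of orders_table, so it
-- cannot be pushed down and needs a repartition join, which has to
-- be enabled explicitly via the citus.enable_repartition_joins GUC
SET citus.enable_repartition_joins TO on;
SELECT count(*)
FROM orders_table o
JOIN products_table p ON (o.product_id = p.product_id);
RESET citus.enable_repartition_joins;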
--------------------------------------------------------
---- Combine query
--------------------------------------------------------
-- Multi-task SELECT queries pull results to the coordinator,
-- and the returned tuples always go through the "combine query".
-- The `combineQuery` can be traced back through the
-- `DistributedPlan->combineQuery` struct. This query is essentially
-- constructed in the `CreatePhysicalDistributedPlan` function.
-- However, the actual source comes from `MasterExtendedOpNode()`
-- within the logical optimizer. For deeper insights into this logic,
-- you can refer to the paper and video links shared under the
-- "Logical Planner & Optimizer" section.
-- the simplest example is the following where Citus sends count(*) to the
-- shards, and needs to do a sum() on top of the results collected
-- from the workers
SET client_min_messages TO DEBUG4;
SELECT count(*) FROM users_table;
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT count(*) AS count FROM public.users_table_102008 users_table WHERE true"
....
DEBUG: combine query: SELECT COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cstring(0), '(i 1)'::cstring(0)) remote_scan(count bigint)
----------------------------------------------
---- CTE Processing
----------------------------------------------
-- In Postgres 13 and later versions, CTEs (Common Table Expressions)
-- are almost like subqueries. Usually, these CTEs are transformed
-- into subqueries during `standard_planner()`. Citus follows the
-- same approach via `RecursivelyInlineCtesInQueryTree()`.
-- For Citus, there's an additional consideration. CTEs that
-- aren't inlined get materialized. In the Citus context,
-- materialization converts these CTEs into intermediate results.
-- Some users leverage this for achieving full-SQL coverage.
-- Citus includes an extra check before inlining CTEs, conducted by
-- the function `TryCreateDistributedPlannedStmt`. Here, the planner
-- first tries to inline all CTEs and then checks whether Citus can
-- still plan the query. If not, the CTEs remain as is, leading to
-- their materialization. If all CTEs are materialized
-- (e.g., read_intermediate_result), then the query becomes
-- equivalent of a query on reference table, hence full SQL.
-- I understand the logic might seem complex at first. Simple
-- examples will be provided for better understanding.
-- a CTE that is inlined as subquery, and
-- does a query-pushdown
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1;
-- So, from Citus' query planning perspective
-- the above CTE is equivalent to the following subquery
SELECT * FROM
(SELECT DISTINCT user_id FROM orders_table) cte_1;
-- Once a CTE is inlined, the rest of the query
-- planning logic kicks in.
-- For example, below, the CTE is inlined and then,
-- because the subquery is NOT safe to pushdown,
-- it is recursively planned
WITH cte_1 AS (SELECT DISTINCT product_id FROM orders_table)
SELECT * FROM cte_1;
..
DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: generating subplan 81_1 for subquery SELECT DISTINCT product_id FROM public.orders_table
DEBUG: Plan 81 query after replacing subqueries and CTEs: SELECT product_id FROM (SELECT intermediate_result.product_id FROM read_intermediate_result('81_1'::text, 'binary'::citus_copy_format) intermediate_result(product_id bigint)) cte_1
...
-- ok, so which CTEs are materialized
-- Citus follows the same rules as Postgres
-- See here: https://www.postgresql.org/docs/current/queries-with.html#id-1.5.6.12.7
-- the same query as the first query
-- but due to MATERIALIZED keyword
-- Citus converts the CTE to intermediate result
WITH cte_1 AS MATERIALIZED (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1;
-- the same query as the first query,
-- but since the same CTE is used twice,
-- Citus converts the CTE to an intermediate result
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1 as c1 JOIN
cte_1 as c2 USING (user_id);
-- Now, getting into Citus-specific materialization
-- of CTEs. Remember that Citus first tries to inline
-- the CTEs, but if it decides that after inlining
-- the query cannot be supported due to Citus' SQL
-- limitations, it lets the CTE be materialized.
-- As of writing this document, Citus does NOT support
-- GROUPING SETS on distributed tables/subqueries. So,
-- when we inline the CTE, Citus would try to plan
-- a query with GROUPING SETS on a distributed table, which
-- would fail. Then, Citus materializes the CTE,
-- and the final query becomes a GROUPING SETS query on an
-- intermediate result, which can be supported
WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id))
SELECT
max(date_of_birth)
FROM users_that_have_orders
GROUP BY GROUPING SETS (user_id, email);
...
DEBUG: CTE users_that_have_orders is going to be inlined via distributed planning
...
DEBUG: Planning after CTEs inlined failed with
message: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
hint: Consider using an equality filter on the distributed table''s partition column.
...
DEBUG: generating subplan 98_1 for CTE users_that_have_orders: SELECT users_table.user_id, users_table.username, users_table.email, users_table.date_of_birth, users_table.country_code FROM (public.users_table JOIN public.orders_table USING (user_id))
---------------------------------------------
--- INSERT query planning
---------------------------------------------
-- At a high level, there are ~4 different ways that
-- an INSERT command can be planned in Citus.
-- The first one is the INSERT ... SELECT command, which
-- we'll discuss separately.
-- The second is INSERT with a sublink, which Citus currently
-- does not support:
INSERT INTO users_table (user_id) VALUES ((SELECT count(8) FROM orders_table));
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO users_table (user_id) VALUES (1) RETURNING (SELECT count(*) FROM users_table);
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
-- The third class of INSERT is simple inserts with a single VALUES clause.
-- As noted in the "fast-path router planner" section, such INSERT commands
-- are planned with fast-path planning. Remember that fast-path planning
-- doesn't require calling into standard_planner() and that the distribution
-- key should be extractable from the query itself; both conditions
-- are satisfied.
-- The main functions to follow in the insert code
-- path are `RegenerateTaskListForInsert()`,
-- `FastPathRouterQuery()` and `RouterInsertTaskList()`.
-- For single-row INSERT tasks we always have Job->deferredPruning=true,
-- meaning that we can always do the shard pruning during
-- execution -- which is ideal
INSERT INTO users_table VALUES (1, 'onder', '[email protected]', now() - '5 years'::interval, 'TR');
-- For multi-row INSERTs, `RouterInsertTaskList()` becomes slightly
-- more interesting. Citus handles multi-row INSERTs by grouping
-- their rows by target shard.
-- Remember that orders_table is distributed on the user_id column,
-- so Citus groups the rows that fall into the same shard into a single command
INSERT INTO orders_table (order_id, user_id) VALUES
(1, 1), (2, 2), (3, 1), (4, 3), (5, 2);
..
-- for user_id: 1
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint)
..
-- for user_id: 3
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102055 AS citus_table_alias (order_id, user_id) VALUES ('4'::bigint,'3'::bigint)
..
-- for user_id: 2
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102064 AS citus_table_alias (order_id, user_id) VALUES ('2'::bigint,'2'::bigint), ('5'::bigint,'2'::bigint)
---------------------------------------------
--- INSERT.. SELECT and MERGE Command Query Planning
---------------------------------------------
-- In this section, we discuss INSERT .. SELECT and MERGE
-- commands together. This is because they share almost
-- identical planning logic.
-- Broadly speaking, there are three methods to plan these
-- commands:
-- 1) Pushdown
-- 2) Pull-to-coordinator
-- 3) Repartition
-- When considering performance and resource utilization,
-- pushdown is generally the most efficient. For handling
-- large data sizes, the repartition method scales better
-- than the pull-to-coordinator approach.
-- For a deeper dive into pushdown and repartition methods,
-- refer to this blog post. It focuses on the MERGE command
-- but is also applicable to INSERT .. SELECT:
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge/
-- Let's now delve into examples, starting with simple ones
-- and gradually moving to more complex scenarios.
-------------------------------------------------
--- INSERT.. SELECT Advanced Scenarios
-------------------------------------------------
-- The INSERT .. SELECT pushdown logic extends upon the
-- pushdown-planning for SELECT commands. Key requirements
-- include colocated tables and matching distribution columns.
-- Relevant C functions are `CreateDistributedInsertSelectPlan`,
-- `DistributedInsertSelectSupported()`, and
-- `AllDistributionKeysInQueryAreEqual`.
-- Additional Conditions for INSERT .. SELECT:
-- The destination table's distribution keys should match
-- the source query's distribution column.
-- Simplest INSERT .. SELECT Pushdown Example:
INSERT INTO users_table SELECT * FROM users_table;
-- INSERT .. SELECT with Subqueries/Joins:
-- Provided subqueries can be pushed down, additional checks,
-- such as matching distribution columns, are performed.
INSERT INTO users_table
SELECT users_table.* FROM users_table,
(SELECT user_id FROM users_table JOIN orders_table USING (user_id)) as foo
WHERE foo.user_id = users_table.user_id;
-- Non-pushdownable Scenario Due to Distribution Key Mismatch:
-- Citus opts for repartitioning since no "merge step" is needed
-- for the SELECT query. The deciding function is `IsRedistributablePlan()`.
INSERT INTO users_table (user_id) SELECT user_id + 1 FROM users_table;
-- Non-pushdownable Scenario Due to LIMIT:
-- The SELECT query requires a "merge step" for the LIMIT clause.
-- Citus uses the pull-to-coordinator strategy.
INSERT INTO users_table SELECT * FROM users_table LIMIT 5;
-- Pull-to-Coordinator Details:
-- Typically, Citus pulls SELECT results and initiates a COPY
-- command to the destination table. See `NonPushableInsertSelectExecScan()`.
-- Special Cases: ON CONFLICT or RETURNING:
-- In these cases, a simple COPY is insufficient. Citus pushes
-- results as "colocated intermediate files" on the workers,
-- which are colocated with the target table's shards. Then,
-- Citus performs an INSERT .. SELECT on these colocated
-- intermediate results. See `ExecutePlanIntoColocatedIntermediateResults()`
-- and `GenerateTaskListWithColocatedIntermediateResults()`.
-- Example: Pull-to-coordinator with COPY back
INSERT INTO users_table SELECT * FROM users_table LIMIT 5;
-- Example: Pull-to-coordinator with push as colocated intermediate results
INSERT INTO users_table SELECT * FROM users_table LIMIT 5 ON CONFLICT(user_id) DO NOTHING;
-------------------------------------------------
--- MERGE Command Query Planning
-------------------------------------------------
-- MERGE command closely resembles INSERT .. SELECT in terms
-- of planning. The key difference lies in the pull-to-coordinator
-- strategy. MERGE always uses "colocated intermediate result"
-- files, as the final executed command must be a MERGE command.
-- A COPY command cannot be used in this case. The entry function
-- in the code is `CreateMergePlan()`.
-- For additional insights, consult the blog:
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge/
-- Pushdown MERGE Example:
-- The join occurs on the distribution key.
MERGE INTO users_table u
USING orders_table o
ON (u.user_id = o.user_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;
-- Pull-to-Coordinator MERGE Example:
-- The source query requires a "merge step" on the coordinator.
MERGE INTO users_table u
USING (SELECT * FROM orders_table ORDER BY order_date LIMIT 50) o
ON (u.user_id = o.user_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;
-- Repartition MERGE Example:
-- The join is NOT on the distribution key, and the source query
-- does not require a "merge step" on the coordinator.
-- Note: This is mostly a hypothetical query to illustrate the case.
MERGE INTO users_table u
USING (SELECT * FROM orders_table ORDER BY order_date) o
ON (u.user_id = o.product_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;
-------------------------------------------------
--- UPDATE / DELETE Planning
-------------------------------------------------
-- You'll notice a pattern here. Planning for UPDATE/DELETE
-- is similar to what we've discussed for other query types.
-- Essentially, there are 4 primary methods for planning, all
-- of which should be familiar to you at this point:
-- 1) Fast-Path Router Planning:
-- In this case, the query targets a single shard and the
-- WHERE clause filters on the distribution key.
-- Example:
UPDATE users_table SET email = '[email protected]' WHERE user_id = 5;
-- 2) Router Planning:
-- The query may involve multiple colocated tables, but all the
-- shards it touches belong to a single colocated shard group on
-- a single node
-- Example:
UPDATE users_table u
SET email = ''
FROM orders_table o
WHERE o.user_id = u.user_id AND
u.user_id = 5 and
o.status = 'done';
-- 3) Pushdown Planning:
-- Here, the query can be pushed down to worker nodes to multiple
-- shards. It can even involve joins if they are on distribution
-- keys.
-- Example:
UPDATE users_table SET email = '[email protected]'
WHERE user_id IN (SELECT user_id FROM orders_table WHERE status = 'in progress');
-- Additional Example for Pushdown Planning with a materialized CTE:
-- In this example, we use a CTE that is materialized, and then
-- the rest of the UPDATE is pushed down.
WITH high_value_users AS (
SELECT user_id FROM orders_table WHERE status = 'done' ORDER BY order_date LIMIT 50
)
UPDATE users_table SET username = 'High Value'
WHERE user_id IN (SELECT user_id FROM high_value_users);
-- 4) Recursive Planning:
-- This type is used for more complex queries, like those
-- involving subqueries or joins that can't be pushed down.
-- These queries are planned recursively.
-- Example:
DELETE FROM users_table WHERE user_id
    IN (SELECT user_id FROM orders_table WHERE status = 'done' ORDER BY order_date DESC LIMIT 5);
-----------------------------------------------------
--- Correlated/Lateral Subqueries in Planning
-----------------------------------------------------
-- Correlated or LATERAL subqueries are special in Citus.
-- They are generally only pushdownable, and only when
-- the correlation happens on the distribution key. Correlated
-- subqueries on non-distribution keys are one of the key SQL
-- limitations left in Citus.
-- Interesting flexibility comes when the correlation
-- is on the distribution key. Citus can push down these
-- subqueries even if they contain SQL constructs like
-- GROUP BY on non-distribution keys, which usually
-- require a merge step on the coordinator.
-- For code details, check:
-- `DeferErrorIfCannotPushdownSubquery()` ->
-- `ContainsReferencesToOuterQuery()` ->
-- `DeferErrorIfSubqueryRequiresMerge()`.
-- Example 1: Using LATERAL, join is on the distribution key.
SELECT u.*, o_sum.total
FROM users_table u,
LATERAL (SELECT count(DISTINCT status) as total FROM orders_table o WHERE o.user_id = u.user_id) o_sum;
-- Example 2: Complex LATERAL with GROUP BY on non-distribution key.
-- Since the join is on the distribution key, it's pushdownable.
SELECT u.*, o_info.product, o_info.total
FROM users_table u,
LATERAL (
SELECT o.product_id as product, count(DISTINCT o.status) as total
FROM orders_table o WHERE o.user_id = u.user_id
GROUP BY o.product_id
) o_info;
-- As can be seen from the debug message below,
-- when it is not OK to pushdown the correlated
-- subquery, the recursive planner cannot kick in
-- either. The main reason is that recursive planning
-- needs to operate on completely isolated subqueries,
-- whereas correlation prevents that
SELECT u.*
FROM users_table u,
LATERAL (
SELECT o.product_id as product
FROM orders_table o WHERE o.user_id != u.user_id
) o_info;
..
DEBUG: skipping recursive planning for the subquery since it contains references to outer queries
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator