This part of the documentation aims to provide a comprehensive understanding of how Citus handles distributed query planning. We will use a set of realistic tables to demonstrate various queries. Through these examples, we hope to offer a step-by-step guide on how Citus chooses to plan queries.
Non-Goal: This document does not aim to describe the high-level design of the distributed planner; instead, it goes into the lowest level of detail possible. For the high-level flow, see https://postgresconf.org/system/events/document/000/000/233/Distributing_Queries_the_Citus_Way.pdf
Tables Included:
- Users Table (Distributed)
- Orders Table (Distributed)
- Products Table (Distributed)
- Country Codes (Reference)
- Order Status (Reference)
- Product Categories (Reference)
-- Distributed Table: Users Table
CREATE TABLE users_table (
user_id bigserial PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(50),
date_of_birth DATE,
country_code VARCHAR(3)
);
SELECT create_distributed_table('users_table', 'user_id');
-- Distributed Table: Orders Table
CREATE TABLE orders_table (
order_id bigserial,
user_id BIGINT REFERENCES users_table(user_id),
product_id BIGINT,
order_date TIMESTAMPTZ,
status VARCHAR(20)
);
SELECT create_distributed_table('orders_table', 'user_id');
-- Distributed Table: Products Table
CREATE TABLE products_table (
product_id bigserial PRIMARY KEY,
product_name VARCHAR(100),
category_id INT,
price NUMERIC(10, 2)
);
SELECT create_distributed_table('products_table', 'product_id');
-- Reference Table: Country Codes
CREATE TABLE country_codes (
country_code VARCHAR(3) PRIMARY KEY,
country_name VARCHAR(50)
);
SELECT create_reference_table('country_codes');
-- Reference Table: Order Status
CREATE TABLE order_status (
status VARCHAR(20) PRIMARY KEY,
description TEXT
);
SELECT create_reference_table('order_status');
-- Reference Table: Product Categories
CREATE TABLE product_categories (
category_id INT PRIMARY KEY,
category_name VARCHAR(50)
);
SELECT create_reference_table('product_categories');
The Fast Path Router Planner is specialized in optimizing queries that are both simple in structure and certain to touch a single shard. Importantly, it targets single-shard queries on distributed, Citus local, or reference tables. This does not mean the planner is restricted to trivial queries; it can handle complex SQL constructs like GROUP BY, HAVING, DISTINCT, etc., as long as these operate on a single table and involve an equality condition on the distribution key (distribution_key = X). The main SQL limitation for fast path distributed query planning is subquery/CTE support; those are left to the next planner: the Router planner.
The aim of this planner is to avoid relying on PostgreSQL's standard_planner() for planning, which performs computations, like cost estimation, that are irrelevant for distributed planning. Skipping standard_planner() yields significant performance gains for OLTP workloads. By focusing on "shard-reachable" queries, the Fast Path Router Planner is able to bypass the need for more computationally expensive planning processes, thereby accelerating query execution.
- FastPathRouterPlan(): The primary function for creating the fast-path query plan.
- FastPathRouterQuery(): Validates if a query is eligible for fast-path routing by checking its structure and the WHERE clause.
With set client_min_messages to debug4; you should see the following in the DEBUG messages: "DEBUG: Distributed planning for a fast-path router query"
-- Fetches the count of users born in the same year, but only
-- for a single country
SELECT EXTRACT(YEAR FROM date_of_birth) as birth_year, COUNT(*)
FROM users_table
WHERE country_code = 'USA' AND user_id = 15
GROUP BY birth_year
HAVING COUNT(*) > 10;
-- all INSERT commands are by definition fast path
-- router queries in the sense that they do not
-- need any information from Postgres' standard_planner()
INSERT INTO orders_table (user_id, product_id, order_date, status)
VALUES (42, 555, now(), 'NEW');
-- UPDATE/DELETEs can also be qualified as fast path router
-- queries
UPDATE products_table SET price = price * 1.1 WHERE product_id = 555;
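A DELETE filtered on the distribution key follows the same fast path. The query below is a hypothetical example written against the schema above:
-- delete all orders of a single user; fast path because of the
-- equality filter on the distribution key (user_id)
DELETE FROM orders_table WHERE user_id = 42;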
Fast path queries have another important characteristic named "deferredPruning."
For regular queries, Citus does the shard pruning during the planning phase, meaning that the shards the query touches are calculated at planning time. Ideally, however, the shard pruning should happen during execution, and for a certain class of queries we support that. In the code, that is denoted by the "Job->deferredPruning" field.
Given that fast path queries are performance critical, they can be planned with prepared statements. When this is done, "Job->deferredPruning" becomes "true", which means Citus can support PREPARED statements as expected: the first 6 executions of the plan do distributed planning, the rest are cached similar to Postgres' plan caching, and the shard pruning is done during the execution phase. If you attach a debugger, you'd see that for the first 6 executions the debugger stops in the distributed_planner() function, but for the rest it will not. The shard pruning for the cached command happens in the CitusBeginScan() function.
To see that in action, check out the DEBUG messages:
set client_min_messages to debug4;
PREPARE p1 (bigint) AS SELECT * FROM users_table WHERE user_id = $1;
-- 1st execute
execute p1(1);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
....
(0 rows)
-- 2nd execute
execute p1(1);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
....
(0 rows)
...
execute p1(1);
execute p1(1);
execute p1(1);
...
-- 6th execute
execute p1(1);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
....
(0 rows)
-- now, on the 7th execute, you would **NOT** see the fast-path
-- planning anymore, because the query comes from Postgres'
-- query cache
execute p1(1);
DEBUG: constraint value: '1'::bigint
DEBUG: shard count after pruning for users_table: 1
DEBUG: opening 1 new connections to localhost:9702
DEBUG: established connection to localhost:9702 for session 8 in 9499 microseconds
DEBUG: task execution (0) for placement (46) on anchor shard (102049) finished in 1281 microseconds on worker node localhost:9702
DEBUG: Total number of commands sent over the session 8: 1 to node localhost:9702
(0 rows)
The Router Planner plays a key role in Citus' query optimization landscape. While sharing some common traits with the Fast Path Router Planner, it offers unique capabilities as well. Router (and fast path router) planners are the bedrock for multi-tenant use cases.
- Single Node Routing: Both planners send queries to a single node. Unlike the Fast Path Planner, the Router Planner can work with multiple colocated tables, provided they have filters on their distribution columns.
- Query Routing Mechanics: The Router Planner takes the query, verifies if it can be routed, and if so, it replaces original table names with their corresponding shard names, directing the query to the appropriate nodes.
- Subqueries and CTEs: The Router Planner can manage subqueries and Common Table Expressions (CTEs), routing the entire query to a single node as long as all involved tables have filters on their distribution columns.
- Standard Planner Reliance: The Router Planner relies on PostgreSQL's standard_planner() to learn the necessary filter restrictions on the tables.
- PlanRouterQuery(): Responsible for creating the router plan.
- TargetShardIntervalsForRestrictInfo(): Retrieves the shard intervals based on restrictions provided by PostgreSQL's standard_planner().
-- Fetch user data and their respective orders for a given user_id
SELECT u.username, o.order_id
FROM users_table u, orders_table o
WHERE u.user_id = o.user_id AND u.user_id = 42;
-- With Subqueries:
-- Fetch the username and their total order amount
-- for a specific user
SELECT u.username,
(SELECT MAX(o.product_id) FROM orders_table o
WHERE o.user_id = 42 AND
o.user_id = u.user_id)
FROM users_table u
WHERE u.user_id = 42;
-- Router planner works with CTEs (and UPDATE/DELETE Query):
-- Update the status of the most recent order for a specific user
WITH RecentOrder AS (
SELECT MAX(order_id) as last_order_id
FROM orders_table
WHERE user_id = 42
)
UPDATE orders_table
SET status = 'COMPLETED'
FROM RecentOrder
WHERE orders_table.user_id = 42 AND
orders_table.order_id = RecentOrder.last_order_id;
While Router and Fast-Path Router Planners are proficient at dealing with single-shard commands—making them ideal for multi-tenant and OLTP applications—Citus also excels in analytical use-cases. In these scenarios, a single query is broken down into multiple parallel sub-queries, which are run on various shards across multiple machines, thereby speeding up query execution times significantly.
Query Pushdown Planning is an extension of the Router Planning paradigm. Unlike the latter, which deals with single-shard, single-node queries, Query Pushdown can route a query to multiple shards across multiple nodes. Instead of verifying that all tables have the same filters, as in Router Planning, Query Pushdown ascertains that all tables are joined on their distribution keys.
The core C function responsible for this check is RestrictionEquivalenceForPartitionKeys(), which ensures that tables in the query are joined based on their distribution keys. Initially intended for subqueries, Query Pushdown has been extended to include other cases as well. The decision to utilize Query Pushdown is determined by the ShouldUseSubqueryPushDown() function.
Understanding Query Pushdown Planning and how it extends the simpler Router Planning can help you fully utilize Citus for your analytical workloads.
- High Parallelism: The query is broken down into multiple sub-queries, leading to parallel execution on multiple shards and nodes.
- Worker Subquery: You will typically notice the alias worker_subquery in the SQL queries sent to the shards, indicating a pushdown operation.
-- Count of distinct product_ids where user_ids from two different tables match
SELECT count(DISTINCT product_id)
FROM (
SELECT DISTINCT user_id as distinct_user_id
FROM users_table
) foo, orders_table
WHERE orders_table.user_id = distinct_user_id;
-- Sum of max price per product category, filtered by a subquery in the target list
SELECT
(SELECT MAX(price) FROM products_table p WHERE p.category = o.category),
COUNT(DISTINCT o.product_id)
FROM orders_table o, users_table u
WHERE o.user_id = u.user_id AND u.user_id IN
(SELECT user_id FROM special_users_table)
GROUP BY o.category;
-- Number of distinct users who have placed an order
SELECT COUNT(DISTINCT u.user_id)
FROM users_table u
WHERE u.user_id IN (
SELECT o.user_id
FROM orders_table o
);
-- Count of distinct products per user, with maximum order date from orders
-- as a subquery in the target list
SELECT
(SELECT MAX(o.order_date) FROM orders_table o WHERE o.user_id = u.user_id),
COUNT(DISTINCT o.product_id)
FROM orders_table o, users_table u
WHERE o.user_id = u.user_id
GROUP BY u.user_id;
-- Update status in orders_table for users whose email ends with '@example.com'
UPDATE orders_table o
SET status = 'DISCOUNTED'
FROM users_table u
WHERE o.user_id = u.user_id AND u.email LIKE '%@example.com';
-- Delete orders for users who were born before '2000-01-01'
DELETE FROM orders_table o
USING users_table u
WHERE o.user_id = u.user_id AND u.date_of_birth < '2000-01-01';
Central to understanding Citus' approach to distributed query planning are two closely interrelated concepts: "Query Pushdown Planning" and "Recursive Planning." These dual strategies lay the foundation for Citus' capacity to manage complex query structures across multiple shards and nodes effectively.
While Query Pushdown Planning optimizes queries by breaking them into smaller components that can run in parallel across multiple shards, Recursive Planning takes a more nuanced approach. It works its way through the query tree from the deepest level upwards, scrutinizing each subquery to determine its suitability for pushdown.
The essence of recursive planning lies in treating each recursively planned query in isolation. This means correlated subqueries can't take advantage of recursive planning. However, (sub)queries on local tables can still be handled via recursive planning.
This process is primarily executed in the RecursivelyPlanSubqueryWalker()
C function. In this function, the engine goes to the innermost subquery and assesses whether it can safely be pushed down as a stand-alone query. If it can, the query engine simply moves on. However, if the subquery isn't suitable for pushdown, Citus generates a separate "sub-plan" for that subquery, substituting it with a read_intermediate_result()
function call. These sub-plans are later executed as independent queries, a task overseen by the ExecuteSubPlans()
function.
The engine continues this way, moving upward through each level of subqueries, evaluating and, if needed, creating sub-plans until it reaches the top-level query.
One of the key aspects of Recursive Planning is the use of "intermediate results." These are essentially the outcomes of subqueries that have been recursively planned and executed on worker nodes. Once these intermediate results are obtained, they are treated much like reference tables in the subsequent stages of query planning and execution. The key advantage here is that, like reference tables, these intermediate results can be joined with distributed tables on any column, not just the distribution key.
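To illustrate, here is a hypothetical query against the example schema: the subquery is recursively planned because of the LIMIT, and its intermediate result is then joined with orders_table on product_id, even though orders_table is distributed on user_id.
-- the subquery is recursively planned due to the LIMIT; its intermediate
-- result behaves like a reference table, so joining it on product_id
-- (a non-distribution column of orders_table) is supported
SELECT count(*)
FROM (SELECT product_id FROM products_table ORDER BY price DESC LIMIT 10) AS top_products
JOIN orders_table USING (product_id);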
The practice of recursively creating sub-plans and generating intermediate results offers a workaround for achieving full SQL coverage in Citus. If each subquery in a complex SQL query can be replaced with an intermediate result, then the entire query essentially becomes a query on a reference table. This feature is a crucial aspect for many users who require comprehensive SQL support in their distributed systems.
While Recursive Planning brings a lot to the table, it's not without its drawbacks. First, the method inherently adds more network round-trips, as each recursively planned query is executed separately, and its results are pushed back to all worker nodes. Secondly, when functions like read_intermediate_results
are used to fetch data from these intermediate results, it can confound the Postgres planner, particularly in the context of complex joins. As a result, query estimations may be inaccurate, leading to suboptimal execution plans.
Understanding these facets of Recursive Planning can provide you with a comprehensive view of how Citus approaches distributed query planning, allowing you to better optimize your database operations.
This may seem complex at first glance, but it's a bit like a step-by-step puzzle-solving process that the Citus query engine performs to optimize your database queries effectively. To help clarify these intricate mechanics, we'll present a series of examples.
In the simplest example, we'll have a single subquery which is NOT pushdown-safe due to LIMIT 1, hence creating a subplan
SET client_min_messages TO DEBUG1;
SELECT count(*) FROM (SELECT * FROM users_table LIMIT 1) as foo;
SET
Time: 0.765 ms
DEBUG: push down of limit count: 1
DEBUG: generating subplan 7_1 for subquery SELECT user_id, username, email, date_of_birth, country_code FROM public.users_table LIMIT 1
DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.username, intermediate_result.email, intermediate_result.date_of_birth, intermediate_result.country_code FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, username character varying(50), email character varying(50), date_of_birth date, country_code character varying(3))) foo
Now, we have multiple subqueries at the same level which are NOT pushdown-safe, due to LIMIT 1 and GROUP BY on a non-distribution key respectively, hence creating subplans:
SELECT count(*) FROM
(SELECT * FROM users_table LIMIT 1) as foo,
(SELECT count(*) FROM users_table GROUP BY country_code) as bar;
DEBUG: push down of limit count: 1
DEBUG: generating subplan 9_1 for subquery SELECT user_id, username, email, date_of_birth, country_code FROM public.users_table LIMIT 1
DEBUG: generating subplan 9_2 for subquery SELECT count(*) AS count FROM public.users_table GROUP BY country_code
DEBUG: Plan 9 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.username, intermediate_result.email, intermediate_result.date_of_birth, intermediate_result.country_code FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, username character varying(50), email character varying(50), date_of_birth date, country_code character varying(3))) foo, (SELECT intermediate_result.count FROM read_intermediate_result('9_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) bar
We have a subquery foo that is NOT safe to pushdown, but once that subquery is replaced with an intermediate result, the rest of the query becomes safe to pushdown:
SELECT count(*) FROM
(SELECT 1 FROM (SELECT user_id FROM users_table LIMIT 1) as foo,
(SELECT * FROM orders_table) as o1,
(SELECT * FROM users_table) as u2
WHERE
foo.user_id = o1.user_id AND
o1.user_id = u2.user_id) as top_level_subquery;
DEBUG: push down of limit count: 1
DEBUG: generating subplan 1_1 for subquery SELECT user_id FROM public.users_table LIMIT 1
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT 1 AS "?column?" FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint)) foo, (SELECT orders_table.order_id, orders_table.user_id, orders_table.product_id, orders_table.order_date, orders_table.status FROM public.orders_table) o1, (SELECT users_table.user_id, users_table.username, users_table.email, users_table.date_of_birth, users_table.country_code FROM public.users_table) u2 WHERE ((foo.user_id OPERATOR(pg_catalog.=) o1.user_id) AND (o1.user_id OPERATOR(pg_catalog.=) u2.user_id))) top_level_subquery
In the previous parts of the recursive planning examples, we only dealt with a subquery at a time. However, recursive planning is capable of considering multiple subqueries in the same query level or converting tables to subqueries in the same level. In this part of the document, let's discuss these advanced query planning capabilities.
Set operations like UNION, UNION ALL, and EXCEPT are essentially two subqueries in the same query level.
Note: The rules for set operation planning in Citus can be confusing and should be considered carefully.
Citus is capable of "pushing down" certain set operations: UNION and UNION ALL. To allow this, two rules must be met, which are defined in the SafeToPushdownUnionSubquery()
C code.
- The set operation cannot be on the top level; it should be wrapped into a subquery. This is purely an implementation limitation that can and should be eased.
- For all subqueries, each leaf query should have a "distribution key" on the target list, and the ordinal positions of these "distribution keys" should match across all set operations. This second limitation is required to preserve correctness.
-- safe to pushdown
SELECT * FROM (SELECT * FROM users_table UNION SELECT * FROM users_table) as foo;
-- not safe to pushdown because the set operation is NOT wrapped into a subquery.
-- Each leaf query is recursively planned.
SELECT * FROM users_table UNION SELECT * FROM users_table;
-- not safe to pushdown because the distribution columns do NOT match (here, the distribution column is not even on the target list)
SELECT * FROM (SELECT username FROM users_table UNION SELECT username FROM users_table) as foo;
-- not safe to pushdown because the distribution columns do NOT match.
SELECT * FROM (SELECT user_id + 1 FROM users_table UNION SELECT user_id - 1 FROM users_table) as foo;
-- EXCEPT is never safe to pushdown
SELECT * FROM (SELECT * FROM users_table EXCEPT SELECT * FROM users_table) as foo;
Although not very common, some users might have joins along with set operations. Example queries might look like:
(SELECT .. t1 JOIN t2) UNION (SELECT t2 JOIN t3)
(SELECT .. t1 UNION SELECT t2) JOIN t3 ..
((SELECT .. t1 JOIN t2) UNION (SELECT t2 JOIN t3)) JOIN t4
For all these cases, similar rules apply:
- JOINs should be made on the distribution keys.
- SET operations should satisfy the
SafeToPushdownUnionSubquery()
conditions.
When combined, all conditions should match.
-- All joins are on the distribution key and all the unions have the distribution key in the same ordinal position.
SELECT * FROM (
(SELECT user_id FROM users_table u1 JOIN users_table u2 USING (user_id))
UNION
(SELECT user_id FROM users_table u1 JOIN users_table u2 USING (user_id))
) as foo;
-- All joins are on the distribution key and all the unions have the distribution key in the same ordinal position.
SELECT * FROM
(SELECT user_id FROM users_table u1 UNION
SELECT user_id FROM users_table u2) as foo
JOIN
users_table u2
USING (user_id);
Postgres allows the HAVING clause to contain subqueries. If the subqueries in the HAVING clause don't reference the outer query (i.e., not correlated), then it's possible to recursively plan the subquery in the HAVING clause. This involves using the RecursivelyPlanAllSubqueries()
function specifically for the HAVING clause.
-- Find user_ids who have placed more orders than the average number of orders per user.
SELECT
u.user_id,
COUNT(o.order_id) AS total_orders
FROM
users_table u
JOIN
orders_table o ON u.user_id = o.user_id
GROUP BY
u.user_id
HAVING
COUNT(o.order_id) > (SELECT AVG(order_count) FROM (
SELECT
user_id,
COUNT(order_id) AS order_count
FROM
orders_table
GROUP BY
user_id) AS subquery);
Assume that there are two subqueries; each subquery is individually joined on their distribution keys. However, when the two subqueries are joined on arbitrary keys, the non-colocated subquery join logic kicks in, as described in RecursivelyPlanNonColocatedSubqueries()
.
-- Find users who do not have orders with status 'shipped' and 'pending'
-- Sub1 and Sub2 are individually safe to pushdown.
-- The join condition between them is: sub1.user_id != sub2.user_id, which does not preserve distribution key equality.
-- Citus qualifies sub1 as the anchor subquery and checks whether all other subqueries are joined on the distribution key.
-- In this case, sub2 is not joined on the distribution key, so Citus decides to recursively plan the whole sub2.
SELECT a.user_id, b.user_id
FROM (
SELECT u.user_id
FROM users_table u
JOIN orders_table o ON u.user_id = o.user_id
WHERE o.status = 'shipped'
GROUP BY u.user_id
) AS sub1
JOIN (
SELECT u.user_id
FROM users_table u
JOIN orders_table o ON u.user_id = o.user_id
WHERE o.status = 'pending'
GROUP BY u.user_id
) AS sub2 ON sub1.user_id != sub2.user_id;
-- Similar logic also applies for subqueries in the WHERE clause.
-- Both the query in the FROM clause and the subquery in the WHERE clause are individually safe to pushdown.
-- However, as a whole, the query is not safe to pushdown.
-- Therefore, Citus decides to recursively plan the subquery in the WHERE clause.
SELECT o1.order_id, o1.order_date
FROM orders_table o1, users_table u1
WHERE o1.user_id = u1.user_id
AND o1.order_date IN (
SELECT o2.order_date
FROM orders_table o2, users_table u2
WHERE o2.user_id = u2.user_id AND o2.status = 'shipped'
);
In Citus, joins between a local table and a distributed table require special handling. The local table data resides on the Citus coordinator node, while the distributed table data is across multiple worker nodes. The RecursivelyPlanLocalTableJoins()
C function handles this.
Local and distributed table joins have specific performance traits. They push down filters and projections, meaning only relevant data is pulled to the coordinator. See the RequiredAttrNumbersForRelation()
and ReplaceRTERelationWithRteSubquery()
functions for more details.
- Citus scans the query tree to find joins between local and distributed tables.
- Upon finding such a join, Citus forms a sub-plan for the local table.
- This sub-plan retrieves relevant data from the local table into an intermediate result and distributes it across worker nodes.
- The original query is then rewritten, replacing the local table with these intermediate results.
- Finally, this new query, now only involving distributed tables, is executed using Citus's standard query execution engine.
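The examples below use a plain Postgres table named local_users that lives only on the coordinator. A minimal, hypothetical definition could look like this:
-- a regular (non-distributed) table on the coordinator,
-- used only to illustrate local-distributed joins
CREATE TABLE local_users (
    user_id bigint PRIMARY KEY,
    username text
);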
For example, consider the local table local_users and the distributed table orders_table. A query like this:
SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id;
Would be internally transformed by Citus as follows:
-- Create a temporary reference table and populate it with local table data
CREATE TEMP TABLE temp_local_users AS SELECT * FROM local_users;
SELECT create_reference_table('temp_local_users');
-- Replace the local table with the temporary reference table in the original query
SELECT *
FROM temp_local_users t, orders_table o
WHERE t.user_id = o.user_id;
By tweaking citus.local_table_join_policy, you can control how Citus behaves for queries involving local and distributed tables. The default behavior is to pull local table data to the coordinator, with exceptions for distributed tables filtered on primary key or unique index.
For instance, when the distributed table is guaranteed to return at most one row, Citus chooses to recursively plan the distributed table:
SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id AND o.primary_key = 55;
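As a sketch of how this behavior could be steered the other way (assuming citus.local_table_join_policy accepts the value 'prefer-distributed', as documented for recent Citus versions), you could ask Citus to recursively plan the distributed side instead:
-- assumption: 'prefer-distributed' makes Citus wrap the distributed table
-- in a subquery and read it as an intermediate result, instead of pulling
-- the local table data
SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT *
FROM local_users l, orders_table o
WHERE l.user_id = o.user_id;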
Very much like local-distributed table joins, Citus can't push down queries formatted as:
"... ref_table LEFT JOIN distributed_table ..."
This is the case when the outer side is a recurring tuple (e.g., reference table, intermediate results, or set returning functions).
In these situations, Citus recursively plans the "distributed" part of the join. Even though it may seem excessive to recursively plan a distributed table, remember that Citus pushes down the filters and projections. Functions involved here include RequiredAttrNumbersForRelation()
and ReplaceRTERelationWithRteSubquery()
.
The core function handling this logic is RecursivelyPlanRecurringTupleOuterJoinWalker()
. There are likely numerous optimizations possible (e.g., first pushing down an inner JOIN then an outer join), but these have not been implemented due to their complexity.
Here's an example that counts the number of orders for each status, including only statuses that also appear in the reference table:
SELECT os.status, COUNT(o.order_id)
FROM order_status os
LEFT JOIN orders_table o ON os.status = o.status
GROUP BY os.status;
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "orders_table" "o" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "orders_table" "o" to a subquery
DEBUG: generating subplan 45_1 for subquery SELECT order_id, status FROM public.orders_table o WHERE true
This section discusses a specific scenario in Citus's recursive query planning: handling queries where the main query's FROM
clause is recurring, but there are subqueries in the SELECT
or WHERE
clauses involving distributed tables.
- Recurring: Here, "recurring" implies that the FROM clause doesn't contain any distributed tables. Instead, it may have reference tables, local tables, or set-returning functions.
- Subqueries in SELECT and WHERE: When the main query's FROM clause is recurring, no distributed tables should be present in the SELECT and WHERE subqueries.
Citus solves this by recursively planning these problematic subqueries, effectively replacing them with calls to read_intermediate_result()
.
For the WHERE
clause, the function RecursivelyPlanAllSubqueries
is called, transforming all subqueries within it.
-- Main query FROM clause is recurring, but
-- WHERE clause contains a pushdownable subquery from
-- orders_table (distributed table)
SELECT country_name
FROM country_codes
WHERE country_code IN
(SELECT country_code FROM users_table WHERE user_id IN (SELECT user_id FROM orders_table));
Similarly, RecursivelyPlanAllSubqueries
is called for the SELECT
clause to replace any existing subqueries.
-- Main query FROM clause is recurring, but SELECT clause contains a subquery from orders_table (distributed table)
SELECT
(SELECT COUNT(*) FROM orders_table WHERE status = 'shipped') AS shipped_orders, country_name
FROM country_codes;
In both examples, since the main query's FROM
clause is recurring and involves subqueries on distributed tables in WHERE
or SELECT
, Citus uses RecursivelyPlanAllSubqueries
to manage these subqueries.
At the high level, multi-task queries go through the logical planner. However, when it comes to query pushdown or the recursive planner, the logical planner does very little. Most of its complexity deals with multi-shard queries that don't fall into these categories.
The simplest example of a query processed by the logical planner would be:
SELECT * FROM users_table;
The logical planner implements the concepts from the paper: "Correctness of query execution strategies in distributed databases." The paper is available here.
If you find the paper hard to read, Marco provides a good introduction to the same concepts in the following presentation:
We assume you have either watched the video or read the paper. The core C functions involved are MultiLogicalPlanCreate()
, MultiNodeTree()
, and MultiLogicalPlanOptimize()
.
The core function MultiLogicalPlanCreate()
maps the SQL query to a C structure (e.g., MultiNode
). Then MultiLogicalPlanOptimize()
applies available optimizations to the MultiNode
.
For instance, one simple optimization pushes the "filter" operation below the "MultiCollect". Such rules are defined in the Commutative() function in multi_logical_optimizer.c.
Overall, the algorithm aims to move as much computation as possible closer to the data. This code path has not been updated for a while, so interested readers should be prepared to step through the code themselves.
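As a simple illustration of a query that takes this path (a hypothetical example against the schema above), consider a multi-shard aggregate with a filter on a non-distribution column; conceptually, the filter and a partial count are pushed below the collect so that each shard does the work locally:
-- no distribution key filter, so this is a multi-shard query planned by
-- the logical planner; the WHERE clause and a per-shard count(*) run on
-- every shard, and only the partial counts are collected and summed
SELECT count(*)
FROM users_table
WHERE country_code = 'TR';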
Context and Use Case:
This query planning mechanism is primarily geared towards data-warehouse-style query planning. It's worth noting that the Citus team has not actively pursued optimizations in this direction, resulting in some non-optimized code paths.
Join Order Optimization:
In Citus' logical planner, the JoinOrderList()
function serves to choose the most efficient join order possible. However, its primary focus has been on joins that require repartitioning, as well as some specific non-repartition joins. For example, joins on distribution keys that are not eligible for pushdown planning may pass through this code path, although no optimizations are made in those cases.
Algorithm Simplicity:
The current algorithm, encapsulated in the BestJoinOrder()
function, is relatively naive. While it aims to minimize the number of repartition joins, it does not provide a performance evaluation for each of them. This function provides room for performance optimizations, especially when dealing with complex joins that necessitate repartitioning.
Control via GUCs:
Two GUCs control the behavior of repartitioning in Citus: citus.enable_single_hash_repartition_joins
and citus.repartition_join_bucket_count_per_node
.
- citus.enable_single_hash_repartition_joins: The default value is "off". When "off", both tables involved in the join are repartitioned. When "on", if one table is already joined on its distribution key, only the other table is repartitioned.
- citus.repartition_join_bucket_count_per_node: This setting defines the level of parallelism during repartitioning. The reason the previous GUC defaults to "off" is tied to this one: opting for a fixed bucket count, rather than dynamically adjusting based on shard count, provides more stability and safety. If you ever consider changing these defaults, be cautious of the potential performance implications.
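As a sketch of a query that could exercise this code path (assuming citus.enable_repartition_joins is switched on, since repartition joins are disabled by default), consider a join that is not on the distribution key of one side:
-- orders_table is distributed on user_id and products_table on product_id,
-- so joining them on product_id cannot be pushed down as-is and requires
-- repartitioning the orders_table side
SET citus.enable_repartition_joins TO on;
SELECT p.product_name, count(*)
FROM orders_table o
JOIN products_table p USING (product_id)
GROUP BY p.product_name;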
- Overview: Multi-task SELECT queries pull results to the coordinator, and the tuples returned always go through the "combine query".
- Structure and Source: The combineQuery can be traced back through the DistributedPlan->combineQuery struct. This query is essentially constructed in the CreatePhysicalDistributedPlan function. However, the actual source comes from MasterExtendedOpNode() within the logical optimizer. For deeper insights into this logic, you can refer to the paper and video links shared under the "Logical Planner & Optimizer" section.
- Example: The simplest example is the following, where Citus sends count(*) to the shards and needs to do a sum() on top of the results collected from the workers.
SET client_min_messages TO DEBUG4;
SELECT count(*) FROM users_table;
DEBUG: generated sql query for task 1
DETAIL: query string: "SELECT count(*) AS count FROM public.users_table_102008 users_table WHERE true"
....
DEBUG: combine query: SELECT COALESCE((pg_catalog.sum(count))::bigint, '0'::bigint) AS count FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cstring(0), '(i 1)'::cstring(0)) remote_scan(count bigint)
- In Postgres 13 and Later Versions: In Postgres 13 and later versions, CTEs (Common Table Expressions) are almost like subqueries. Usually, these CTEs are transformed into subqueries during standard planning. Citus follows the same approach via RecursivelyInlineCtesInQueryTree().
- Additional Consideration in Citus: For Citus, there's an additional consideration. CTEs that aren't inlined get materialized. In the Citus context, materialization converts these CTEs into intermediate results. Some users leverage this for achieving full-SQL coverage.
- Extra Check in Citus: Citus includes an extra check before inlining CTEs, conducted by the function TryCreateDistributedPlannedStmt. Here, the planner first tries to inline all CTEs and then checks whether Citus can still plan the query. If not, the CTEs remain as they are, leading to their materialization. If all CTEs are materialized (i.e., replaced by read_intermediate_result), the query becomes equivalent to a query on a reference table, hence full SQL coverage.
Examples for Better Understanding:
The logic might seem complex at first, so the following simple examples should help.
-- a CTE that is inlined as subquery, and does a query-pushdown
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1;
So, from Citus' query planning perspective, the above CTE is equivalent to the following subquery:
SELECT * FROM
(SELECT DISTINCT user_id FROM orders_table) cte_1;
Once a CTE is inlined, the rest of the query planning logic kicks in. For example, below, the CTE is inlined, and then, because the subquery is NOT safe to pushdown, it is recursively planned:
WITH cte_1 AS (SELECT DISTINCT product_id FROM orders_table)
SELECT * FROM cte_1;
..
DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: generating subplan 81_1 for subquery SELECT DISTINCT product_id FROM public.orders_table
DEBUG: Plan 81 query after replacing subqueries and CTEs: SELECT product_id FROM (SELECT intermediate_result.product_id FROM read_intermediate_result('81_1'::text, 'binary'::citus_copy_format) intermediate_result(product_id bigint)) cte_1;
- Which CTEs Are Materialized:
Citus follows the same rules as Postgres. See Postgres documentation.
-- the same query as the first query
-- but due to MATERIALIZED keyword
-- Citus converts the CTE to intermediate result
WITH cte_1 AS MATERIALIZED (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1;
-- the same query as the first query
-- but since the same CTE is used twice
-- Citus converts the CTE to an intermediate result
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1 as c1 JOIN
cte_1 as c2 USING (user_id);
- Citus Specific Materialization:
Citus first tries to inline the CTEs, but if it decides that the query cannot be supported after inlining due to Citus' SQL limitations, it lets the CTE be materialized.
As of writing this document, Citus does NOT support GROUPING SETS on distributed tables/subqueries. So, when the CTE below is inlined, Citus tries to plan a query with GROUPING SETS on a distributed table, which fails. Citus then materializes the CTE, and the final query becomes a GROUPING SETS query on an intermediate result, which can be supported:
WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id))
SELECT
max(date_of_birth)
FROM users_that_have_orders
GROUP BY GROUPING SETS (user_id, email);
...
DEBUG: CTE users_that_have_orders is going to be inlined via distributed planning
...
DEBUG: Planning after CTEs inlined failed with
message: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP
hint: Consider using an equality filter on the distributed table''s partition column.
...
DEBUG: generating subplan 98_1 for CTE users_that_have_orders: SELECT users_table.user_id, users_table.username, users_table.email, users_table.date_of_birth, users_table.country_code FROM (public.users_table JOIN public.orders_table USING (user_id))
At a High Level:
- There are roughly 4 different ways that an INSERT command can be planned in Citus. The first one is the INSERT ... SELECT command, which will be discussed separately.
INSERT with Sublink (Not Supported):
INSERT INTO users_table (user_id) VALUES ((SELECT count(8) FROM orders_table));
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO users_table (user_id) VALUES (1) RETURNING (SELECT count(*) FROM users_table);
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
Simple Inserts with a Single VALUES Clause:
As noted in the "fast-path router planner" section, these INSERT commands are planned with fast-path planning. This does not require calling into standard_planner(), and the distribution key should be extracted from the query itself.
INSERT INTO users_table VALUES (1, 'onder', '[email protected]', now() - '5 years'::interval, 'TR');
Main Functions:
The main functions involved in this path are RegenerateTaskListForInsert()
, FastPathRouterQuery()
, and RouterInsertTaskList
. For single-row INSERT tasks, Job->deferredPruning=true
, meaning we can always do the shard pruning during execution.
Multi-row INSERTs:
For multi-row INSERTs, RouterInsertTaskList()
becomes slightly more interesting. Citus groups rows by target shard.
INSERT INTO orders_table (order_id, user_id) VALUES
(1, 1), (2, 2), (3, 1), (4, 3), (5, 2);
Debug Info:
Debug information shows how the query is rebuilt for different user_ids.
-- for user_id: 1
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint)
-- for user_id: 3
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102055 AS citus_table_alias (order_id, user_id) VALUES ('4'::bigint,'3'::bigint)
-- for user_id: 2
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102064 AS citus_table_alias (order_id, user_id) VALUES ('2'::bigint,'2'::bigint), ('5'::bigint,'2'::bigint)
Overview:
This section discusses INSERT .. SELECT and MERGE commands, which share almost identical planning logic.
Planning Methods:
Broadly, there are three methods to plan these commands:
- Pushdown
- Pull-to-coordinator
- Repartition
Performance Considerations:
When it comes to performance and resource utilization, pushdown is generally the most efficient. For handling large data sizes, the repartition method scales better than the pull-to-coordinator method.
Further Reading:
For more detailed information on pushdown and repartition methods, refer to this blog post. The post focuses on the MERGE
command but is also applicable to INSERT .. SELECT
.
Examples:
The following section will delve into examples, starting with simple ones and moving to more complex scenarios.
Overview:
The INSERT .. SELECT pushdown logic builds upon the pushdown planning for SELECT commands. The key requirements include colocated tables and matching distribution columns. Relevant C functions are CreateDistributedInsertSelectPlan, DistributedInsertSelectSupported(), and AllDistributionKeysInQueryAreEqual.
Additional Conditions for INSERT .. SELECT pushdown:
- The destination table's distribution key should match the source query's distribution column.
Simplest INSERT .. SELECT Pushdown Example:
INSERT INTO users_table SELECT * FROM users_table;
INSERT .. SELECT with Subqueries/Joins:
Provided subqueries can be pushed down, additional checks such as matching distribution columns are performed.
INSERT INTO users_table
SELECT users_table.* FROM users_table,
(SELECT user_id FROM users_table JOIN orders_table USING (user_id)) as foo
WHERE foo.user_id = users_table.user_id;
Non-pushdownable Scenarios:
Due to Distribution Key Mismatch:
Citus opts for repartitioning since no "merge step" is needed for the SELECT
query. The deciding function is IsRedistributablePlan()
.
INSERT INTO users_table (user_id) SELECT user_id + 1 FROM users_table;
Due to LIMIT:
The SELECT
query requires a "merge step" for the LIMIT
clause. Citus uses the pull-to-coordinator strategy.
INSERT INTO users_table SELECT * FROM users_table LIMIT 5;
Pull-to-Coordinator Details:
Citus typically pulls SELECT
results and initiates a COPY
command to the destination table. See NonPushableInsertSelectExecScan()
.
Special Cases:
ON CONFLICT or RETURNING:
In these cases, a simple COPY
is insufficient. Citus pushes results as "colocated intermediate files" on the workers, which are colocated with the target table's shards. Then, Citus performs an INSERT .. SELECT
on these colocated intermediate results. See ExecutePlanIntoColocatedIntermediateResults()
and GenerateTaskListWithColocatedIntermediateResults()
.
Example: Pull-to-coordinator with COPY back to shards:
INSERT INTO users_table SELECT * FROM users_table LIMIT 5;
Example: Pull-to-coordinator with push as colocated intermediate results:
INSERT INTO users_table SELECT * FROM users_table LIMIT 5 ON CONFLICT(user_id) DO NOTHING;
Overview:
The MERGE
command planning is similar to INSERT .. SELECT
. The key difference is in the pull-to-coordinator strategy. MERGE
always uses "colocated intermediate result" files, as the final executed command must be a MERGE
command, not a COPY
. The entry function in the code is CreateMergePlan()
.
Further Reading:
For more insights, check out this blog post.
Pushdown MERGE Example:
The join is based on the distribution key.
MERGE INTO users_table u
USING orders_table o
ON (u.user_id = o.user_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;
Pull-to-Coordinator MERGE Example:
The source query requires a "merge step" on the coordinator.
MERGE INTO users_table u
USING (SELECT * FROM orders_table ORDER BY order_date LIMIT 50) o
ON (u.user_id = o.user_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;
Repartition MERGE Example:
The join is NOT on the distribution key, and the source query doesn't require a "merge step" on the coordinator. Note that this example is mostly hypothetical to illustrate the case.
MERGE INTO users_table u
USING (SELECT * FROM orders_table ORDER BY order_date) o
ON (u.user_id = o.product_id)
WHEN MATCHED AND o.status = 'DONE' THEN DELETE;
Overview:
The planning logic for UPDATE/DELETE queries is quite similar to what we've discussed for INSERT and MERGE commands. There are essentially four primary methods of planning:
1) Fast-Path Router Planning:
Targets a single shard and filters on the distribution key in the WHERE clause.
UPDATE users_table SET email = '[email protected]' WHERE user_id = 5;
2) Router Planning:
Targets a single shard per table: all involved shards are colocated and reside on a single node.
UPDATE users_table u
SET email = ''
FROM orders_table o
WHERE o.user_id = u.user_id AND
u.user_id = 5 AND
o.status = 'done';
3) Pushdown Planning:
The query can be pushed down to worker nodes, targeting multiple shards. Joins are also possible if they are on distribution keys.
UPDATE users_table SET email = '[email protected]'
WHERE user_id IN (SELECT user_id FROM orders_table WHERE status = 'in progress');
Additional Example for Pushdown with Materialized CTE:
WITH high_value_users AS (
SELECT user_id FROM orders_table WHERE status = 'done' ORDER BY order_date LIMIT 50
)
UPDATE users_table SET username = 'High Value'
WHERE user_id IN (SELECT user_id FROM high_value_users);
4) Recursive Planning:
Used for more complex queries, like those with subqueries or joins that can't be pushed down. The queries are planned recursively.
DELETE FROM users_table WHERE user_id
IN (SELECT user_id FROM orders_table WHERE total > 100 ORDER BY total DESC LIMIT 5);
Overview:
Correlated or LATERAL subqueries have special behavior in Citus. They can often be pushed down, especially when the join is on the distribution key. There are limitations for joins not on the distribution key.
Key Code Details:
For more information on the code, check the following functions: DeferErrorIfCannotPushdownSubquery() -> ContainsReferencesToOuterQuery() -> DeferErrorIfSubqueryRequiresMerge().
Example 1: Using LATERAL, where the join is on the distribution key.
SELECT u.*, o_sum.total
FROM users_table u,
LATERAL (SELECT count(DISTINCT status) as total FROM orders_table o WHERE o.user_id = u.user_id) o_sum;
Example 2: Complex LATERAL with GROUP BY on a non-distribution key. It's pushdownable because the join is on the distribution key.
SELECT u.*, o_info.product, o_info.total
FROM users_table u,
LATERAL (
SELECT o.product_id as product, count(DISTINCT o.status) as total
FROM orders_table o WHERE o.user_id = u.user_id
GROUP BY o.product_id
) o_info;
Debug and Error Messages:
When it's not possible to push down correlated subqueries, recursive planning also can't be used.
SELECT u.*
FROM users_table u,
LATERAL (
SELECT o.product_id as product
FROM orders_table o WHERE o.user_id != u.user_id
) o_info;
DEBUG: skipping recursive planning for the subquery since it contains references to outer queries
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator