Skip to content

Instantly share code, notes, and snippets.

@Slach
Created May 9, 2025 05:40
Show Gist options
  • Select an option

  • Save Slach/7947982853408e8b21a0798e4dd21b8a to your computer and use it in GitHub Desktop.

Select an option

Save Slach/7947982853408e8b21a0798e4dd21b8a to your computer and use it in GitHub Desktop.
Как работает distributed ddl в clickhouse

Распределенные DDL-задания: хранение, выполнение и очистка

Структура хранения заданий в Keeper (ZooKeeper)

Когда в ClickHouse выполняется распределенный DDL-запрос (ON CLUSTER), он сохраняется в распределенной очереди задач в Keeper (аналог ZooKeeper). В конфигурации задается путь для этой очереди (например, /clickhouse/task_queue/ddl), и каждый новый DDL-запрос на весь кластер добавляется как новый узел (znode) в этом разделе. Узел создается с флагом persistent sequential – то есть получает уникальное имя вида query-<число> по возрастанию github.com. Таким образом, в Keeper хранится упорядоченный список задач.

Данные каждого узла (задачи) содержат сериализованную информацию о DDL-запросе. В коде это структура DDLLogEntry, которая при сохранении превращается в текстовые строки. В узле сохраняются поля: версия формата записи, сам текст SQL-запроса, список целевых хостов, инициатор запроса и прочие опциональные поля github.com github.com. Примерно это выглядит так:

  • version – версия формата (для совместимости).

  • query – текст DDL (CREATE TABLE ... ON CLUSTER ...) – специальным образом экранируется и сохраняется.

  • hosts – список целевых серверов кластера, на которых надо выполнить запрос (в виде массива строк "host:port" для каждого) github.com. Этот список формируется на стороне инициатора и включает все реплики целевого кластера.

  • initiator – идентификатор сервера, который инициировал запрос (его host:port) github.com.

  • settings, tracing, initial_query_id и др. – дополнительные поля (могут отсутствовать), например настройки запроса, контекст трасировки и пр., учитываемые в новых версиях формата.

Помимо данных задачи, для каждого узла-записи в очереди создаются специальные дочерние znode-директории: active и finished. Это служебные узлы для отслеживания состояния выполнения:

  • active/ – каталог для отметок о текущем выполнении. Когда сервер берет задачу в работу, он создает внутри active ephemeral-узел с именем своего host ID (обычно строка "host:port"). В коде это явный шаг: “Step 1: Create ephemeral node in active/ status dir” github.com. Этот ephemeral-узел автоматически удалится, если сервер потеряет сессию или завершится, и служит сигналом, что данный сервер сейчас выполняет эту DDL-задачу. Наличие active/<host> защищает запись от преждевременного удаления и позволяет видеть через системные таблицы, кто сейчас выполняет запрос github.com.

  • finished/ – каталог для результатов выполнения. Когда сервер завершает выполнение DDL (успешно или с ошибкой), он создает постоянный узел в finished с именем своего host ID. В этот узел записывается статус выполнения – код 0 при успехе либо текст ошибки github.com. Таким образом, по дочерним узлам в finished/ можно узнать, какие сервера уже выполнили запрос и с каким результатом.

Все задачи хранятся в одном разделе (queue) Keeper – по сути, не раздельно по именам кластеров. Группировка по кластерам происходит логически за счет списка хостов в каждой задаче. То есть запись в очереди сама содержит перечень узлов, для которых она предназначена, и другие сервера (не из списка) эту запись игнорируют. Например, если в кластере cluster_a 5 узлов, то запись будет содержать 5 адресов; другие кластеры (другие списки хостов) имеют свои записи. Так как префикс пути в ZK один (например, /clickhouse/task_queue/ddl), задачи разных кластеров стоят вперемешку, но каждый сервер фильтрует “чужие” задания, глядя на поле hosts github.com. (При желании можно настроить отдельный путь очереди на кластер через конфигурацию, но по умолчанию используется единый путь.)

Определение списка серверов для CREATE TABLE ... ON CLUSTER

При разборе запроса CREATE TABLE ... ON CLUSTER <cluster_name> ClickHouse должен определить, на каких узлах его выполнять. Этот список определяется конфигурацией кластера <cluster_name> в файле настроек (<remote_servers>). Алгоритм следующий:

  • Получение списка адресов кластера. Сервер-инициатор на основе имени кластера находит соответствующий объект Cluster, где описаны шарды и реплики. В коде вызывается метод, который собирает все адреса целевых серверов. Например, cluster->filterAddressesByShardOrReplica(...) вернёт все нужные Address (обычно — адрес каждого реплика каждой шарды, если не указано ограничение только на одну шарду) github.com. Для команды ON CLUSTER без ограничений берутся все реплики во всех шардах указанного кластера.

  • Формирование HostID для каждого сервера. Каждый адрес (структура с host, port и др.) преобразуется в простой идентификатор HostID (имя хоста + порт). Код итерирует по списку адресов и сохраняет их в вектор hosts github.com. Например, адрес node1:9000 станет строкой "node1:9000" в списке. Этот список затем помещается в структуру DDL-записи. Таким образом, в entry.hosts явно перечислены все узлы, где должен исполниться запрос github.com.

  • Идентификация инициатора. Текущий сервер добавляет себя как инициатора: entry.initiator = ... – обычно это тоже строка "host:port" текущего узла github.com. Хотя инициатор может и сам входить в список hosts (если он часть кластера), поле initiator просто указывает, кто запустил команду.

  • Подготовка запроса. Сам AST запрос (CREATE TABLE ...) клонируется и слегка модифицируется: убираются части вроде FORMAT/INTO OUTFILE, подставляются корректные имена БД (если на удалённых репликах разные default DB) через AddDefaultDatabaseVisitor github.com github.com. Однако важно, что текст запроса, который будет записан в ZK, всё ещё содержит кластер (часть ON CLUSTER остаётся внутри или, по крайней мере, имя кластера известно). В результате формируется финальная строка запроса (entry.query), готовая для удалённого запуска.

После этих шагов инициатор с помощью DDLWorker записывает задачу в ZooKeeper (как описано выше). Таким образом, основой для выбора узлов служит конфигурация <remote_servers> на сервере-инициаторе: именно там задан перечень всех реплик кластера. Все эти реплики попадут в список hosts задачи github.com и будут уведомлены о необходимости выполнить запрос. Если кластер настроен с internal_replication=true, то запросы типа CREATE TABLE всё равно должны выполняться на каждой реплике (так как каждая реплика создаёт свою копию таблицы). В некоторых случаях (например, ALTER ... ON CLUSTER для ReplicatedMergeTree) механизм DDL сам решает выполнять операцию только на лидере шарды, но это определяется уже при выполнении (см. ниже) – изначально в список hosts включаются все узлы, а далее возможна координация между репликами одной шарды.

Исполнение задач и очистка очереди distributed_ddl_task

Каждый сервер ClickHouse, подключенный к Keeper, запускает поток-работник DDL (DDLWorker), который следит за появлением новых узлов в очереди и выполняет их при необходимости. Когда в ZooKeeper появляется новая запись query-..., все серверы её увидят, но каждый проверит, есть ли его собственный адрес в поле hosts этой записи. Если нет – задача игнорируется. Если да – сервер должен выполнить этот SQL у себя.

Процесс выполнения на узле выглядит так:

  1. Захват задачи. Перед выполнением сервер помечает в ZooKeeper, что он приступил к задаче. Для этого создается ephemeral-узел в разделе active/ данной записи. Имя узла – идентификатор хоста. Например, если текущий сервер "node1:9000", он сделает пустой ephemeral .../active/node1:9000. В коде это происходит в функции обработки задачи: создаётся ephemeral-узел через create(... Ephemeral) github.com github.com. Пока этот ephemeral-узел существует, считается, что задача на данном хосте выполняется. (Если процесс упадёт, сессия в ZK закроется и узел пропадёт – другие смогут понять, что выполнение прервано.)

  2. Непосредственно выполнение DDL. После пометки “active” запускается выполнение самого DDL-запроса на данном сервере (создание таблицы, изменение и т.д.). Здесь ClickHouse использует обычные механизмы выполнения запросов, с той лишь разницей, что он может учитывать некоторые особенности распределённых запросов. Например, для Replicated-таблиц некоторые DDL (как TRUNCATE или OPTIMIZE) выполняются только на одной реплике шарды – код проверяет, не должен ли запрос исполняться только на лидере реплики (taskShouldBeExecutedOnLeader и логика с getShardID() для уникальности шарда) и при необходимости ждет блокировку на уровне шарды github.com github.com. Но в случае CREATE TABLE ... ON CLUSTER такой особый режим не требуется – все узлы создают таблицу локально.

  3. Фиксация результата (завершение). После попытки выполнения сервер должен отразить результат в ZooKeeper. Перед этим ephemeral-узел в active/ удаляется (это происходит либо явно, либо автоматически при сессии multi-операцией). Затем создаётся постоянный узел в каталоге finished/ с именем этого хоста. Данные узла – структура ExecutionStatus в текстовом виде: обычно это код 0 при успешном выполнении или код ошибки и сообщение, если что-то пошло не так github.com. В коде формируется операция makeCreateRequest(finished_node_path, status, Persistent) и добавляется в транзакцию ZooKeeper github.com. Таким образом, каждое включённое в hosts реплику либо пометит себя “выполнил успешно”, либо зафиксирует ошибку. После создания узла в finished/ эта реплика считает задачу завершённой для себя.

Повторяя процесс, все указанные в задаче узлы либо выполняют запрос, либо в случае недоступности не помечаются вовсе. Задача считается выполненной на всем кластере, когда каждый указанный хост либо создал свою отметку в finished/, либо явно отпал (например, узел был недоступен долгое время). Инициатор запроса может собрать эти результаты – например, команда ON CLUSTER может ждать выполнения (в зависимости от настроек distributed_ddl_output_mode) или их можно посмотреть через системную таблицу system.distributed_ddl_queue. В любом случае, наличие записей под finished/ для всех реплик означает, что запрос разошёлся по всем узлам. Если какие-то узлы отсутствовали, запись будет висеть без их отметки до очистки (см. ниже) – это сигнал о проблеме на тех узлах.

После исполнения наступает вопрос очистки очереди. Если ничего не делать, узлы и данные об выполнении накапливались бы в ZooKeeper. ClickHouse реализует автоматическое удаление старых записей. За это отвечает отдельный поток в DDLWorker – cleanup. Он периодически (раз в минуту по умолчанию github.com) просыпается и проверяет очередь:

  • Поток собирает список всех задач (узлов query-...) в queue_dir github.com и сортирует их по возрастанию номера (старейшие первыми) github.com. Затем для каждой задачи решает, удалять ее или нет.

  • Условие готовности к удалению. В коде нет жёсткой проверки “все хосты отметились в finished”, вместо этого используется время жизни и размер очереди. Задача считается устаревшей, если с момента её создания прошёл заданный период (по умолчанию task_max_lifetime = 7 дней) либо если длина очереди превысила лимит и данная задача находится за пределами окна из последних N записей (по умолчанию max_tasks_in_queue = 1000) github.com. Эти параметры настраиваются через <distributed_ddl> в конфиге. Таким образом, система гарантирует, что недавно добавленные задания остаются в ZK хотя бы некоторое время, даже если все реплики их выполнили, – чтобы, например, отстающие узлы догнали или администратор мог посмотреть статус. Но очень старые или очень ранние в списке задачи будут считаться завершёнными по истечении времени даже если какая-то реплика так и не отметилась.

  • Процесс удаления. Если условие выполнения/устаревания выполняется (canRemoveQueueEntry вернул true) github.com github.com, поток очистки пытается удалить узел. Удаление делается осторожно: сперва удаляется дочерний узел active целиком (чтобы никто больше не начал выполнение) github.com. Если ZK вернул, что active не пуст (ошибка ZNOTEMPTY), значит какая-то реплика ещё выполняет задачу в данный момент – тогда удаление пропускается (отложится до следующего цикла) github.com. Если active успешно удалён или отсутствует, тогда можно удалять основную запись.

  • Удаляются все дочерние узлы, кроме finished, рекурсивно github.com – то есть чистятся любые возможные active/ (уже удалён), shards/, synced/ и другие служебные, оставляя только finished (поскольку в нём ценные данные о статусах). Затем в одной транзакции Multi удаляется сам узел задачи и папка finished github.com. Такая двухфазная схема нужна, чтобы между удалением active и удалением finished какая-нибудь реплика не успела дописать новый статус – поэтому finished удаляется вместе с узлом, проверяя версию узла (через makeCheckRequest перед удалением github.com). В итоге запись полностью исчезает из ZooKeeper.

  • Поток логирует факт удаления: “Task X is outdated, deleting it” github.com. Если по какой-то причине при удалении возникла ошибка (например, временная потеря связи с Keeper), это логируется, но поток продолжит попытки на следующих итерациях, пока условие удаления сохраняется.

Таким образом, очередь distributed_ddl_task очищается автоматически компонентом DDLWorker (cleanup-поток). Задача считается полностью завершённой для удаления, когда прошло достаточно времени после её выполнения (по умолчанию дни) или когда она сильно “устарела” относительно новых заданий. К этому моменту практически всегда все живые узлы либо выполнили запрос, либо уже давно вышли из строя. Очисткой занимается каждый сервер, имеющий доступ к ZooKeeper – любые узлы кластера могут удалить запись, увидев что она удовлетворяет условиям (это не обязательно делает именно инициатор). Логика устроена так, что даже если несколько узлов параллельно попытаются удалить, за счёт транзакций и эпhemerals в ZK это безопасно (лишние попытки выдадут ZNONODE и проигнорируются). В результате, папка distributed_ddl_task в Keeper не разрастается бесконечно: старые выполненные задания удаляются, остаются только актуальные или недавно выполненные (для контроля и отставших реплик).

Примечание: если какой-то сервер был недоступен долго и пропустил удаление задачи, при возврате он обнаружит в ZooKeeper увеличенный указатель максимального выполненного задания (в replicas/<host>/log_ptr или аналог, если используется механизм реплицированных баз) и поймёт, что старые задания уже не существуют. В таких случаях администратору может потребоваться вручную выровнять состояние (например, создать недостающие таблицы на отставшем узле) – механизм DDL Queue старается доставить запрос всем, но не ждёт бесконечно. Однако для штатных ситуаций (когда узлы кластера доступны) описанная схема гарантирует доставку и выполнение DDL на каждом, с последующей уборкой следов из Keeper.

Источники кода: создание узлов очереди github.com, формат данных задачи github.com, список хостов и инициатор github.com github.com, пометки выполнения active/ и finished/ github.com github.com, условия очистки и удаление задач github.com github.com.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment