ClickHouse vector.dev Kafka source example

sources:
  sourceKafka:
    type: kafka
    bootstrap_servers: "kraftbroker1.mykafka.localdomain:9092"
    group_id: "kafka_logs"
    topics: ["topic1", "topic2", "topic3"]
    sasl:
      enabled: true
      mechanism: "PLAIN"
      username: "admin"
      password: "xxxx"
    tls:
      enabled: true
      verify_certificate: false
      verify_hostname: false
      ca_file: "/etc/vector/kafka_tls/cert-signed_Kafka.pem"
      crt_file: "/etc/vector/kafka_tls/ca-cert_Kafka.pem"
      key_file: "/etc/vector/kafka_tls/key_Kafka.pem"
      key_pass: "yyyyyy"
    auto_offset_reset: "beginning"

transforms:
  to_1:
    type: remap
    inputs:
      - "sourceKafka"
    source: |
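      # No-op pass-through: these assignments keep the Kafka message and its metadata
      # (topic, key, offset, partition, headers) on the event so the ClickHouse sink
      # can send them as the matching columns of logs.logs_null.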
      .message = .message
      .topic = .topic
      .message_key = .message_key
      .offset = .offset
      .partition = .partition
      .headers = .headers

sinks:
  clickhouse:
    inputs:
      - "to_1"
    type: clickhouse
    host: "http://localhost:8123"
    auth:
      strategy: "basic"
      user: "logs_user"
      password: "zzzz"
    database: "logs"
    table: "logs_null"
    encoding:
      timestamp_format: "unix"
    healthcheck: true
    buffer:
      type: "disk"
      max_size: 524288000
      when_full: "block"
    batch:
      max_events: 100000
      max_bytes:  104857600
      timeout_secs: 10

create user logs_user on cluster '{cluster}' identified with sha256_password by 'zzzz';
GRANT on cluster '{cluster}' SELECT, INSERT ON logs.logs_null TO logs_user;
GRANT on cluster '{cluster}' INSERT ON logs.logs_shard TO logs_user;
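
-- Sanity check (not part of the original gist): confirm the grants on one of the replicas.
SHOW GRANTS FOR logs_user;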


create database logs on cluster '{cluster}';

CREATE TABLE logs.logs_null on cluster '{cluster}'
(
  `message` String,
  `topic` String,
  `headers` String,
  `message_key` String,
  `partition` Int64,
  `offset` Int64,
  `timestamp` String
)
ENGINE = Null;


CREATE TABLE logs.logs_shard on cluster '{cluster}'
(
  `message` String,
  `topic` String,
  `headers` String,
  `message_key` String,
  `partition` Int64,
  `offset` Int64,
  `timestamp` DateTime64(3) Default now64(3)
)
ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (timestamp)
TTL toDate(timestamp) + toIntervalMonth(6)
SETTINGS index_granularity = 8192, max_bytes_to_merge_at_max_space_in_pool = '40G', ttl_only_drop_parts = 1;


CREATE MATERIALIZED VIEW logs.logs_shard_mv on cluster '{cluster}' TO logs.logs_shard
AS SELECT
      message,
      topic,
      headers,
      message_key,
      partition,
      offset,
      if( (parseDateTime64BestEffortOrZero(timestamp) AS _ts) = 0, now64(3), _ts) AS timestamp
FROM logs.logs_null;
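
-- Quick smoke test (illustrative, not from the original gist): insert a sample row into the
-- Null table and verify the materialized view wrote it into logs_shard on the same replica.
INSERT INTO logs.logs_null (message, topic, headers, message_key, partition, offset, timestamp)
VALUES ('hello', 'topic1', '{}', 'key1', 0, 0, '2024-01-01 00:00:00.000');

SELECT message, topic, timestamp FROM logs.logs_shard ORDER BY timestamp DESC LIMIT 1;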


CREATE TABLE logs.logs on cluster '{cluster}' as logs.logs_shard ENGINE = Distributed('{cluster}', 'logs', 'logs_shard');
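
-- Illustrative check (not from the original gist): query the Distributed table to confirm
-- rows are arriving from Vector across all shards.
SELECT topic, count() AS cnt, max(timestamp) AS latest
FROM logs.logs
GROUP BY topic
ORDER BY cnt DESC;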