# Vector source: consume three topics from a SASL/TLS-secured Kafka broker.
# (Original file had all indentation stripped, which made it invalid YAML;
# nesting restored per the Vector kafka-source schema.)
sources:
  sourceKafka:
    type: kafka
    bootstrap_servers: "kraftbroker1.mykafka.localdomain:9092"
    group_id: "kafka_logs"
    topics: ["topic1", "topic2", "topic3"]
    sasl:
      enabled: true
      mechanism: "PLAIN"
      username: "admin"
      # NOTE(review): plain-text credential committed to config — prefer an
      # env-var reference or secret store.
      password: "xxxx"
    tls:
      enabled: true
      # NOTE(review): certificate and hostname verification are both disabled;
      # acceptable for a lab, not for production.
      verify_certificate: false
      verify_hostname: false
      # NOTE(review): the original had these two paths swapped — by the usual
      # Kafka quickstart naming, "ca-cert" is the CA bundle and "cert-signed"
      # is the client certificate. Confirm against the actual files.
      ca_file: "/etc/vector/kafka_tls/ca-cert_Kafka.pem"
      crt_file: "/etc/vector/kafka_tls/cert-signed_Kafka.pem"
      key_file: "/etc/vector/kafka_tls/key_Kafka.pem"
      key_pass: "yyyyyy"
    # Start from the earliest offset when the group has no committed offset.
    auto_offset_reset: "beginning"
# Pass-through remap between the Kafka source and the ClickHouse sink.
# (Nesting restored; the flattened original was invalid YAML.)
transforms:
  to_1:
    type: remap
    inputs:
      - "sourceKafka"
    # NOTE(review): every assignment below is a no-op (each field is assigned
    # to itself), so this transform currently only relabels the stream. If the
    # intent was to coerce fields — e.g. `.headers = encode_json(.headers)` so
    # the Kafka headers map fits the ClickHouse `headers String` column — that
    # still needs to be added; confirm with the pipeline owner.
    source: |
      .message = .message
      .topic = .topic
      .message_key = .message_key
      .offset = .offset
      .partition = .partition
      .headers = .headers
# ClickHouse sink: batched HTTP inserts into the logs.logs_null ingest table.
# (Nesting restored; the flattened original was invalid YAML.)
sinks:
  clickhouse:
    type: clickhouse
    inputs:
      - "to_1"
    # NOTE(review): recent Vector releases name this option `endpoint` instead
    # of `host` — confirm against the deployed Vector version.
    host: "http://localhost:8123"
    auth:
      strategy: "basic"
      user: "logs_user"
      # NOTE(review): plain-text credential committed to config — prefer an
      # env-var reference or secret store.
      password: "zzzz"
    database: "logs"
    table: "logs_null"
    encoding:
      timestamp_format: "unix"
    healthcheck: true
    buffer:
      type: "disk"
      # 500 MiB on-disk buffer; apply backpressure rather than drop events.
      max_size: 524288000
      when_full: "block"
    batch:
      # Flush at 100k events, 100 MiB, or 10 s — whichever comes first.
      max_events: 100000
      max_bytes: 104857600
      timeout_secs: 10
-- ClickHouse bootstrap DDL for the Vector log pipeline.
-- Statements reordered so every object exists before anything references it:
-- database -> tables -> materialized view -> distributed table -> user/grants.
-- (The original ran CREATE USER and the GRANTs before the database and tables
-- existed, which fails on a fresh cluster.)

CREATE DATABASE logs ON CLUSTER '{cluster}';

-- Ingest entry point: the Null engine stores nothing; the materialized view
-- below forwards every row inserted here into logs.logs_shard.
CREATE TABLE logs.logs_null ON CLUSTER '{cluster}'
(
    `message` String,
    `topic` String,
    `headers` String,
    `message_key` String,
    `partition` Int64,
    `offset` Int64,
    `timestamp` String
)
ENGINE = Null;

-- Per-node storage shard, partitioned by day, 6-month TTL.
-- Fixes vs. original: missing comma after `headers` String, and the missing
-- terminating semicolon that fused this statement with the next CREATE.
CREATE TABLE logs.logs_shard ON CLUSTER '{cluster}'
(
    `message` String,
    `topic` String,
    `headers` String,
    `message_key` String,
    `partition` Int64,
    `offset` Int64,
    `timestamp` DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (timestamp)
TTL toDate(timestamp) + toIntervalMonth(6)
SETTINGS index_granularity = 8192, max_bytes_to_merge_at_max_space_in_pool = '40G', ttl_only_drop_parts = 1;

-- Routes rows from the Null table into the shard, parsing the string
-- timestamp and falling back to now64(3) when it cannot be parsed.
CREATE MATERIALIZED VIEW logs.logs_shard_mv ON CLUSTER '{cluster}' TO logs.logs_shard
AS SELECT
    message,
    topic,
    headers,
    message_key,
    partition,
    offset,
    if((parseDateTime64BestEffortOrZero(timestamp) AS _ts) = 0, now64(3), _ts) AS timestamp
FROM logs.logs_null;

-- Cluster-wide read facade over the per-node shards.
CREATE TABLE logs.logs ON CLUSTER '{cluster}' AS logs.logs_shard ENGINE = Distributed('{cluster}', 'logs', 'logs_shard');

-- Application user for Vector. ON CLUSTER moved to its documented position
-- (after the user name for CREATE USER, immediately after GRANT for grants);
-- the original placed it at the end of each statement.
-- NOTE(review): plain-text password in DDL — prefer IDENTIFIED WITH
-- sha256_hash or external credential management.
CREATE USER logs_user ON CLUSTER '{cluster}' IDENTIFIED WITH sha256_password BY 'zzzz';
GRANT ON CLUSTER '{cluster}' SELECT, INSERT ON logs.logs_null TO logs_user;
GRANT ON CLUSTER '{cluster}' INSERT ON logs.logs_shard TO logs_user;