@philipz
Created July 15, 2025 10:07
Debezium WAL Capacity Issues: A Complete Solution Guide


1. Root-Cause Analysis of WAL File Growth

1.1 How WAL Files Accumulate During Debezium CDC Replication

Core mechanism: PostgreSQL's Write-Ahead Log (WAL) plays a central role in Debezium CDC scenarios. When data changes, the change is first written to the WAL and only then applied to the data files. Debezium uses PostgreSQL's logical replication facility, reading change events from the WAL through the pgoutput or wal2json plugin.

The WAL accumulation flow:

  1. Transaction start: data modifications are first written to the WAL
  2. WAL flush: WAL records are flushed to disk before the transaction commits
  3. Slot protection: the replication slot created by Debezium prevents the WAL from being recycled
  4. Consumption acknowledgment: WAL can be reclaimed only after Debezium confirms it has processed the changes

Impact of full-page writes:

  • The first modification of a page after a checkpoint writes the entire page (8KB) to the WAL
  • Frequent checkpoints therefore cause more full-page writes
  • Under heavy write load, WAL generation can reach several GB per minute
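The overhead from full-page writes can be sketched with a rough, back-of-envelope model. The function and the input numbers are illustrative assumptions, not measurements; the model ignores wal_compression and ordinary row-level WAL records, so it only bounds the extra volume caused by checkpoint cycles:

```python
PAGE_SIZE = 8192  # PostgreSQL default block size

def fpw_overhead_bytes(distinct_pages_touched: int, checkpoints_per_day: int) -> int:
    # Each distinct page modified after a checkpoint is logged in full once
    # per checkpoint cycle, so the extra WAL is roughly pages * 8KB * cycles.
    return distinct_pages_touched * PAGE_SIZE * checkpoints_per_day

# Assumed workload: 100,000 hot pages, 48 checkpoints/day (checkpoint_timeout = 30min)
extra = fpw_overhead_bytes(100_000, 48)
print(f"{extra / 2**30:.1f} GiB/day of full-page images")  # ~36.6 GiB/day
```

This is why lowering the checkpoint frequency (Section 3.1) directly reduces WAL volume.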

1.2 How PostgreSQL WAL Management Interacts with Debezium

Replication slot behavior:

-- Check replication slot status
SELECT slot_name, active, wal_status, safe_wal_size,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as retained_wal
FROM pg_replication_slots;

WAL retention logic:

  • restart_lsn: the earliest LSN the replication slot may still need
  • confirmed_flush_lsn: the most recent LSN confirmed as processed
  • Retention rule: PostgreSQL keeps all WAL from restart_lsn up to the current position
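The retained-WAL arithmetic behind pg_wal_lsn_diff can be illustrated with a short sketch (the helper names here are ours; PostgreSQL renders an LSN as two hexadecimal words joined by a slash, high word first):

```python
def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '1A/3F000028' to an absolute byte position.
    The part before the slash is the high 32 bits, the part after the low 32 bits."""
    high, low = lsn.split('/')
    return (int(high, 16) << 32) | int(low, 16)

def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    # Same result as pg_wal_lsn_diff(current_lsn, restart_lsn)
    return parse_lsn(current_lsn) - parse_lsn(restart_lsn)

print(retained_wal_bytes('0/3000000', '0/1000000'))  # 33554432 (32 MiB retained)
```

An inactive slot keeps restart_lsn frozen, so this difference (and the disk held by WAL) grows without bound until the slot is dropped or max_slot_wal_keep_size kicks in.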

1.3 WAL Generation Patterns Under Bulk Replication

High-load characteristics:

  • Normal operation: 16MB WAL segments are generated sequentially
  • Heavy write load: multiple GB of WAL can be generated per minute
  • Checkpoint frequency: frequent checkpoints inflate WAL volume

WAL generation estimates for bulk replication:

Millions of records per day:
- Average record size: 1KB
- Daily WAL generation: ~1GB
- Recommended max_wal_size: 8-16GB

Tens of millions of records per day:
- Average record size: 1KB
- Daily WAL generation: ~10GB
- Recommended max_wal_size: 32-64GB
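The figures above follow from simple arithmetic; here is a minimal sketch. The amplification factor is an assumption (covering record headers, index updates, and full-page writes) that you would calibrate against your own workload — 1.0 reproduces the raw numbers above:

```python
def estimate_daily_wal_gib(records_per_day: int, avg_record_kb: float,
                           amplification: float = 1.0) -> float:
    # records * KB-per-record, converted to GiB (1 GiB = 1024 * 1024 KiB)
    return records_per_day * avg_record_kb * amplification / (1024 * 1024)

print(round(estimate_daily_wal_gib(1_000_000, 1.0), 2))   # 0.95 -> the "~1GB" case
print(round(estimate_daily_wal_gib(10_000_000, 1.0), 2))  # 9.54 -> the "~10GB" case
```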

2. The Technical Chain from WAL Capacity Problems to Kafka Errors

2.1 Impact Chain of WAL Space Exhaustion

Error propagation path:

  1. WAL disk space exhausted → PostgreSQL suspends write operations
  2. Debezium connection drops → the replication slot becomes inactive
  3. The Kafka Connect task fails → the connector state becomes FAILED
  4. Replication stops → downstream systems stop receiving updates

Typical error messages:

PostgreSQL:
ERROR: could not write to file "pg_wal/00000001000000000000001A": No space left on device

Debezium:
[2025-01-15 10:30:15] ERROR Replication slot "debezium_slot" has been invalidated
[2025-01-15 10:30:15] WARN WAL file 'pg_wal/00000001000000000000001A' has already been removed

Kafka Connect:
ConnectException: The connector has encountered an error while fetching events from the database

2.2 Error Handling in the Kafka Connect Framework

Task failure handling:

# Error tolerance settings
errors.tolerance=all
errors.retry.timeout=300000
errors.retry.delay.max.ms=60000

# Dead letter queue settings
errors.deadletterqueue.topic.name=debezium-dlq
errors.deadletterqueue.context.headers.enable=true

Restart behavior:

  • A failed task is not restarted automatically
  • It must be restarted manually via the REST API: POST /connectors/{name}/restart
  • After a restart, processing resumes from the last recorded LSN

2.3 Specific Technical Causes of Replication Interruption

Main interruption scenarios:

  1. Slot invalidation: retained WAL exceeds the max_slot_wal_keep_size limit
  2. Network interruption: the database connection pool is exhausted
  3. Serialization errors: schema mismatches cause conversion failures
  4. Out-of-memory: a Kafka Connect worker runs out of memory
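As a rough triage aid, the scenarios above could be mapped to suggested actions. This sketch is hypothetical: it matches only the literal error strings shown earlier in this section, not Debezium's actual exception hierarchy:

```python
# Illustrative patterns only -- not an exhaustive list of Connect error strings.
RETRIABLE_PATTERNS = ('connection', 'timeout', 'OutOfMemory')
FATAL_PATTERNS = ('has been invalidated', 'already been removed',
                  'No space left on device')

def triage(error_message: str) -> str:
    # Fatal problems need the WAL/slot state fixed before any restart helps.
    if any(p in error_message for p in FATAL_PATTERNS):
        return 'fatal: fix WAL/slot state, then restart the connector'
    if any(p in error_message for p in RETRIABLE_PATTERNS):
        return 'retriable: restart the task via the Connect REST API'
    return 'unknown: inspect connector logs'

print(triage('Replication slot "debezium_slot" has been invalidated'))
```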

3. Production-Grade Solutions

3.1 Optimizing the PostgreSQL WAL Configuration

Core configuration parameters:

# Basic WAL settings
wal_level = logical
max_wal_size = 32GB              # for tens of millions of records
min_wal_size = 2GB
wal_keep_size = 8GB

# New parameter in PostgreSQL 13+
max_slot_wal_keep_size = 50GB    # prevents unbounded WAL retention

# Checkpoint settings
checkpoint_timeout = 30min       # reduce checkpoint frequency
checkpoint_completion_target = 0.9

# Replication settings
max_replication_slots = 20
max_wal_senders = 20

Recommended settings by scale:

# Millions of records
max_wal_size = 8GB
max_slot_wal_keep_size = 10GB
checkpoint_timeout = 20min

# Tens of millions of records
max_wal_size = 32GB
max_slot_wal_keep_size = 50GB
checkpoint_timeout = 30min
wal_buffers = 256MB

3.2 Tuning the Debezium Connector Configuration

Core performance settings:

# Connection settings
database.hostname=localhost
database.port=5432
slot.name=debezium_slot
publication.name=dbz_publication
plugin.name=pgoutput

# Snapshot settings
snapshot.mode=incremental
snapshot.fetch.size=20000
snapshot.max.threads=4
incremental.snapshot.chunk.size=4096

# Batch processing
max.batch.size=8192
max.queue.size=32768
max.queue.size.in.bytes=419430400
poll.interval.ms=100

# Heartbeat mechanism
heartbeat.interval.ms=30000
heartbeat.action.query=SELECT pg_logical_emit_message(false, 'debezium-heartbeat', now()::text)

LSN management settings:

# LSN acknowledgment control
flush.lsn.source=true
read.only=false
provide.transaction.metadata=true

# Error handling
retriable.restart.connector.wait.ms=30000
connection.timeout.ms=30000

3.3 Optimizing the Kafka Connect Configuration

Worker settings:

# Core settings
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

# Replication factors
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3

# Performance tuning
tasks.max=8
connector.client.config.override.policy=All
consumer.max.poll.records=500
producer.batch.size=32768
producer.linger.ms=100
producer.compression.type=lz4

JVM memory settings:

# Kafka Connect JVM settings
export KAFKA_HEAP_OPTS="-Xmx8g -Xms8g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"

3.4 Monitoring and Alerting

Key monitoring queries:

-- WAL usage
SELECT pg_size_pretty(sum(size)) as total_wal_size
FROM pg_ls_waldir();

-- Replication slot lag
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_bytes
FROM pg_replication_slots
WHERE active = true;

-- Checkpoint frequency
SELECT checkpoints_timed, checkpoints_req,
       round(checkpoints_req::numeric / (checkpoints_timed + checkpoints_req) * 100, 2) as pct_req_checkpoints
FROM pg_stat_bgwriter;
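A minimal sketch of evaluating these measurements against the 40GB directory-size and 1GB slot-lag thresholds used in this section's alerting rules (the function and message texts are ours, for illustration):

```python
GIB = 1024 ** 3
WAL_DIR_WARN = 40 * GIB    # mirrors the WalDirectoryTooLarge rule
SLOT_LAG_CRIT = 1 * GIB    # mirrors the ReplicationSlotLagTooHigh rule

def evaluate(wal_dir_bytes: int, slot_lag_bytes: int) -> list[str]:
    # Returns zero or more alert strings for the current sample.
    alerts = []
    if wal_dir_bytes > WAL_DIR_WARN:
        alerts.append('warning: WAL directory exceeds 40GiB')
    if slot_lag_bytes > SLOT_LAG_CRIT:
        alerts.append('critical: replication slot lag exceeds 1GiB')
    return alerts

print(evaluate(45 * GIB, 512 * 1024 ** 2))  # only the directory-size warning fires
```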

Alert thresholds:

# Prometheus alerting rules
groups:
  - name: debezium_wal_alerts
    rules:
      - alert: WalDirectoryTooLarge
        expr: pg_wal_directory_size_bytes > 40 * 1024 * 1024 * 1024
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "WAL directory size exceeds 40GB"

      - alert: ReplicationSlotLagTooHigh
        expr: pg_replication_slot_lag_bytes > 1 * 1024 * 1024 * 1024
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Replication slot lag exceeds 1GB"

4. Preventive Architecture Design

4.1 WAL Cleanup Strategies

Automated cleanup:

-- Create a cleanup function
CREATE OR REPLACE FUNCTION cleanup_old_wal()
RETURNS void AS $$
BEGIN
    -- Force a WAL segment switch so finished segments become recyclable
    PERFORM pg_switch_wal();

    -- Force a checkpoint (CHECKPOINT must be wrapped in EXECUTE inside plpgsql)
    EXECUTE 'CHECKPOINT';

    -- Record the cleanup action
    INSERT INTO wal_cleanup_log VALUES (now(), 'Cleanup completed');
END;
$$ LANGUAGE plpgsql;

-- Schedule the cleanup with pg_cron
SELECT cron.schedule('cleanup-wal', '0 */6 * * *', 'SELECT cleanup_old_wal();');

Replication slot management:

-- Monitor slot health
CREATE VIEW slot_health AS
SELECT slot_name,
       active,
       wal_status,
       pg_size_pretty(safe_wal_size) as safe_wal_size,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

-- Automatically drop invalidated slots
CREATE OR REPLACE FUNCTION cleanup_invalid_slots()
RETURNS void AS $$
DECLARE
    slot_record record;
BEGIN
    -- Drop slots marked as 'lost'
    FOR slot_record IN
        SELECT slot_name FROM pg_replication_slots
        WHERE wal_status = 'lost'
    LOOP
        PERFORM pg_drop_replication_slot(slot_record.slot_name);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

4.2 Implementing Batched Replication

Incremental snapshot configuration:

# Incremental snapshot settings
snapshot.mode=incremental
incremental.snapshot.chunk.size=2048
incremental.snapshot.watermarking.strategy=insert_insert

# Signal-triggered snapshots
# Trigger a snapshot of specific tables via the signal table
signal.data.collection=public.debezium_signal
signal.enabled.channels=source,jmx

Batch-processing example:

// Custom batch-processing logic
public class BatchSnapshotProcessor {
    private static final int BATCH_SIZE = 10000;

    public void processBatch(String tableName, long startOffset, long endOffset) {
        String query = String.format(
            "SELECT * FROM %s WHERE id BETWEEN %d AND %d ORDER BY id",
            tableName, startOffset, endOffset
        );

        // Run the batched query and process the results
        processResults(executeQuery(query));
    }

    public void triggerIncrementalSnapshot(String tableName) {
        // Insert a signal row asking Debezium to run an incremental snapshot
        String signalQuery = String.format(
            "INSERT INTO debezium_signal VALUES ('snap-%s', 'execute-snapshot', " +
            "'{\"data-collections\": [\"%s\"], \"type\": \"incremental\"}')",
            tableName, tableName
        );

        executeSignal(signalQuery);
    }
}

4.3 Backup and Recovery Strategy

WAL archiving configuration:

# PostgreSQL archiving settings
archive_mode = on
archive_command = 'cp %p /backup/wal_archive/%f'
archive_timeout = 300

# Archive retention settings
wal_keep_size = 4GB
max_slot_wal_keep_size = 20GB

Backup script example:

#!/bin/bash
# WAL backup script

BACKUP_DIR="/backup/wal_archive"
RETENTION_DAYS=7

# Create the backup directory
mkdir -p "$BACKUP_DIR"

# Take a base backup (tar format, compressed, with progress reporting)
pg_basebackup -D "$BACKUP_DIR/base_backup_$(date +%Y%m%d)" -Ft -z -P

# Remove expired backups
find "$BACKUP_DIR" -type f -mtime +"$RETENTION_DAYS" -delete

# Log the backup status
echo "$(date): WAL backup completed" >> /var/log/postgresql/backup.log

4.4 Capacity Planning

Storage capacity calculation:

WAL capacity planning formula:
Total WAL capacity = daily WAL generation × retention days × safety factor

Example calculation:
- Daily data changes: 10 million records
- Average record size: 1KB
- Daily WAL generation: ~10GB
- Retention period: 3 days
- Safety factor: 1.5
- Recommended capacity: 10GB × 3 × 1.5 = 45GB

Corresponding settings:
max_wal_size = 32GB
max_slot_wal_keep_size = 50GB
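The planning formula above can be captured as a one-line helper:

```python
def wal_capacity_gb(daily_wal_gb: float, retention_days: int,
                    safety_factor: float = 1.5) -> float:
    # Total WAL capacity = daily generation x retention days x safety factor
    return daily_wal_gb * retention_days * safety_factor

print(wal_capacity_gb(10, 3))  # 45.0, matching the worked example
```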

Sizing recommendations:

# Millions of records
hardware:
  cpu: 8-16 cores
  memory: 32GB
  storage: 500GB SSD
  network: 1Gbps

postgresql_config:
  max_wal_size: 8GB
  shared_buffers: 8GB
  effective_cache_size: 24GB

# Tens of millions of records
hardware:
  cpu: 16-32 cores
  memory: 64GB
  storage: 1TB NVMe
  network: 10Gbps

postgresql_config:
  max_wal_size: 32GB
  shared_buffers: 16GB
  effective_cache_size: 48GB

5. Failure Recovery and Data Consistency

5.1 Recovery Procedure After an Interruption

Automated recovery flow:

#!/bin/bash
# Debezium recovery script

CONNECTOR_NAME="postgres-connector"
CONNECT_URL="http://localhost:8083"

# Check the connector state
check_connector_status() {
    STATUS=$(curl -s "$CONNECT_URL/connectors/$CONNECTOR_NAME/status" | jq -r '.connector.state')
    echo "Current status: $STATUS"
    [ "$STATUS" = "RUNNING" ]
}

# Restart the connector
restart_connector() {
    echo "Restarting connector..."
    curl -X POST "$CONNECT_URL/connectors/$CONNECTOR_NAME/restart"

    # Wait for the connector to start
    sleep 10

    # Restart the task
    curl -X POST "$CONNECT_URL/connectors/$CONNECTOR_NAME/tasks/0/restart"
}

# Verify the replication slot
verify_replication_slot() {
    psql -c "SELECT slot_name, active, wal_status FROM pg_replication_slots WHERE slot_name = 'debezium_slot';"
}

# Main recovery logic
main() {
    if ! check_connector_status; then
        restart_connector
        verify_replication_slot

        # Wait for stabilization
        sleep 30

        if check_connector_status; then
            echo "Recovery successful"
        else
            echo "Recovery failed, manual intervention required"
            exit 1
        fi
    fi
}

main

5.2 Data Integrity Verification

Consistency check function:

-- Create a data consistency check function
-- (table and column names are parameters, so dynamic SQL is required)
CREATE OR REPLACE FUNCTION verify_data_consistency(
    source_table text,
    target_table text,
    pk_column text
) RETURNS TABLE(
    missing_in_target bigint,
    missing_in_source bigint,
    checksum_mismatch bigint
) AS $$
BEGIN
    -- Records missing in the target table
    EXECUTE format(
        'SELECT COUNT(*) FROM (SELECT %I FROM %I EXCEPT SELECT %I FROM %I) t',
        pk_column, source_table, pk_column, target_table
    ) INTO missing_in_target;

    -- Records missing in the source table
    EXECUTE format(
        'SELECT COUNT(*) FROM (SELECT %I FROM %I EXCEPT SELECT %I FROM %I) t',
        pk_column, target_table, pk_column, source_table
    ) INTO missing_in_source;

    -- Records whose contents differ
    EXECUTE format(
        'SELECT COUNT(*) FROM %I s JOIN %I t ON s.%I = t.%I WHERE md5(s::text) != md5(t::text)',
        source_table, target_table, pk_column, pk_column
    ) INTO checksum_mismatch;

    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;

Automated reconciliation:

import psycopg2
from datetime import datetime

class DataConsistencyChecker:
    def __init__(self, source_conn, target_conn):
        self.source_conn = source_conn
        self.target_conn = target_conn

    def check_table_consistency(self, table_name, pk_column):
        # Compute per-row checksums for the source table
        source_checksum = self.get_table_checksum(
            self.source_conn, table_name, pk_column
        )

        # Compute per-row checksums for the target table
        target_checksum = self.get_table_checksum(
            self.target_conn, table_name, pk_column
        )

        # Compare the checksums
        inconsistent_records = []
        for pk, source_hash in source_checksum.items():
            if pk not in target_checksum:
                inconsistent_records.append({
                    'pk': pk,
                    'issue': 'missing_in_target'
                })
            elif source_hash != target_checksum[pk]:
                inconsistent_records.append({
                    'pk': pk,
                    'issue': 'checksum_mismatch'
                })

        return inconsistent_records

    def get_table_checksum(self, conn, table_name, pk_column):
        cursor = conn.cursor()
        cursor.execute(f"""
            SELECT {pk_column}, md5(t::text) as checksum
            FROM {table_name} t
            ORDER BY {pk_column}
        """)

        return {row[0]: row[1] for row in cursor.fetchall()}

    def generate_consistency_report(self, table_name):
        inconsistencies = self.check_table_consistency(table_name, 'id')

        report = {
            'timestamp': datetime.now().isoformat(),
            'table': table_name,
            'total_inconsistencies': len(inconsistencies),
            'issues': inconsistencies
        }

        return report

5.3 LSN Management

LSN tracking and recovery:

-- Create an LSN tracking table
CREATE TABLE lsn_tracking (
    connector_name varchar(100),
    table_name varchar(100),
    last_lsn pg_lsn,
    last_updated timestamp DEFAULT now(),
    PRIMARY KEY (connector_name, table_name)
);

-- LSN update function
CREATE OR REPLACE FUNCTION update_lsn_tracking(
    p_connector_name varchar,
    p_table_name varchar,
    p_lsn pg_lsn
) RETURNS void AS $$
BEGIN
    INSERT INTO lsn_tracking (connector_name, table_name, last_lsn)
    VALUES (p_connector_name, p_table_name, p_lsn)
    ON CONFLICT (connector_name, table_name)
    DO UPDATE SET
        last_lsn = p_lsn,
        last_updated = now();
END;
$$ LANGUAGE plpgsql;

-- LSN lag view
CREATE VIEW lsn_lag AS
SELECT connector_name,
       table_name,
       last_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), last_lsn)) as lag_bytes,
       last_updated
FROM lsn_tracking;

6. Enterprise Best Practices

6.1 WAL Management in Multi-Tenant Environments

Tenant isolation strategy:

# Connector configuration for tenant A
name=tenant-a-connector
table.include.list=tenant_a.orders,tenant_a.customers
slot.name=debezium_slot_tenant_a
publication.name=tenant_a_publication
topic.prefix=tenant_a

# Connector configuration for tenant B
name=tenant-b-connector
table.include.list=tenant_b.orders,tenant_b.customers
slot.name=debezium_slot_tenant_b
publication.name=tenant_b_publication
topic.prefix=tenant_b

Schema-level isolation:

-- Create a separate publication per tenant
CREATE PUBLICATION tenant_a_publication FOR TABLES IN SCHEMA tenant_a;
CREATE PUBLICATION tenant_b_publication FOR TABLES IN SCHEMA tenant_b;

-- Create tenant-specific replication slots
SELECT pg_create_logical_replication_slot('tenant_a_slot', 'pgoutput');
SELECT pg_create_logical_replication_slot('tenant_b_slot', 'pgoutput');

6.2 High-Availability Configuration

Kafka Connect cluster deployment:

# Kubernetes Deployment manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
      - name: connect
        image: debezium/connect:2.4
        ports:
        - containerPort: 8083
        env:
        - name: GROUP_ID
          value: "connect-cluster"
        - name: CONFIG_STORAGE_TOPIC
          value: "connect-configs"
        - name: OFFSET_STORAGE_TOPIC
          value: "connect-offsets"
        - name: STATUS_STORAGE_TOPIC
          value: "connect-status"
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"

PostgreSQL high-availability settings:

# Primary server
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
hot_standby = on

# Standby server
hot_standby = on
hot_standby_feedback = on
max_standby_streaming_delay = 30s

6.3 Performance Tuning Metrics

Key performance indicators (KPIs):

performance_metrics:
  throughput:
    - events_per_second: "target: >10,000"
    - bytes_per_second: "target: >10MB"

  latency:
    - end_to_end_latency: "target: <100ms"
    - wal_flush_latency: "target: <10ms"

  reliability:
    - connector_uptime: "target: >99.9%"
    - data_loss_events: "target: 0"

  resource_utilization:
    - cpu_usage: "target: <70%"
    - memory_usage: "target: <80%"
    - disk_usage: "target: <85%"

Performance check script:

#!/bin/bash
# Performance tuning checks

# Check PostgreSQL performance
check_postgresql_performance() {
    echo "=== PostgreSQL performance check ==="

    # Total WAL generated (bytes written since LSN 0/0)
    psql -c "SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')) as wal_generated;"

    # Checkpoint statistics
    psql -c "SELECT * FROM pg_stat_bgwriter;"

    # Replication slot status
    psql -c "SELECT slot_name, active, wal_status, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag FROM pg_replication_slots;"
}

# Check Kafka Connect performance
check_kafka_connect_performance() {
    echo "=== Kafka Connect performance check ==="

    # Connector list
    curl -s http://localhost:8083/connectors | jq

    # Task status
    curl -s http://localhost:8083/connectors/postgres-connector/status | jq
}

# System resource checks
check_system_resources() {
    echo "=== System resource check ==="

    # CPU usage
    top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1

    # Memory usage
    free -h

    # Disk usage
    df -h
}

# Run all checks
main() {
    check_postgresql_performance
    check_kafka_connect_performance
    check_system_resources

    echo "Performance check complete"
}

main

Conclusion

Solving Debezium WAL capacity problems requires a systematic approach spanning database configuration, connector tuning, and architecture design. The key success factors are:

  1. Proper WAL capacity planning: set max_wal_size and max_slot_wal_keep_size appropriately for your data volume
  2. Effective monitoring: track WAL usage and replication slot status in real time
  3. Robust error handling: build automatic recovery and alerting mechanisms
  4. Preventive maintenance: regular cleanup, capacity planning, and performance tuning
  5. Enterprise-grade architecture: high availability, multi-tenancy support, and disaster recovery

By applying these solutions, a Debezium CDC system can run stably at large data scale, delivering efficient data synchronization with consistency guarantees.
