@vutkin
Created January 4, 2023 07:23
MM2 internal topics
MM2 uses the following internal topics for replication purposes:
Heartbeat topic: Emitted from the source cluster and replicated to demonstrate connectivity through connectors. This can be used by downstream consumers to verify that the connector is running and the corresponding source cluster is available. Messages in this topic contain information on the source cluster, target cluster, and timestamp when the heartbeat was created.
Checkpoint topic: Emitted in the target cluster by the connector and contains consumer offsets for each consumer group in the source cluster. The connector periodically queries the source cluster for the committed offsets of all consumer groups (except for replicated topics) and emits a message to this topic. Each message includes the consumer group ID, topic, partition, upstream offset, downstream offset, metadata, and timestamp. Consumers read the checkpoint topic through the MirrorClient or RemoteClusterUtils class to get the replicated offsets in the target Apache Kafka cluster.
Offset sync topic: Encodes the cluster-to-cluster offset mapping for each replicated topic-partition. Messages in this topic contain the topic, partition, upstream offset, and downstream offset.
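To make the three record shapes concrete, here is a minimal Python sketch. The class and field names are illustrative assumptions derived from the descriptions above, not MM2's actual Java classes or wire format. `heartbeat_is_fresh` and its `max_age_ms` threshold are hypothetical, showing how a downstream consumer might treat the heartbeat topic as a liveness signal.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Heartbeat:
    source_cluster: str  # alias of the cluster the heartbeat came from
    target_cluster: str  # alias of the cluster it was replicated to
    timestamp_ms: int    # creation time, epoch milliseconds


@dataclass(frozen=True)
class Checkpoint:
    consumer_group_id: str
    topic: str
    partition: int
    upstream_offset: int    # committed offset in the source cluster
    downstream_offset: int  # equivalent offset in the target cluster
    metadata: str
    timestamp_ms: int


@dataclass(frozen=True)
class OffsetSync:
    topic: str
    partition: int
    upstream_offset: int
    downstream_offset: int


def heartbeat_is_fresh(hb: Heartbeat, now_ms: int, max_age_ms: int = 30_000) -> bool:
    """Hypothetical liveness check: True if the heartbeat is recent enough."""
    return now_ms - hb.timestamp_ms <= max_age_ms
```

A consumer in the target cluster could call `heartbeat_is_fresh` on the latest heartbeat it has read and raise an alert once it returns False.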
The connectors involved in the setup are summarized as follows:
MirrorSourceConnector:
Replicates remote topics, topic ACLs & configs of a single source cluster.
Emits offset-syncs to an internal topic.
MirrorSinkConnector:
Consumes from the primary cluster and replicates topics to a single target cluster.
MirrorCheckpointConnector:
Consumes offset-syncs.
Emits checkpoints to enable failover points.
MirrorHeartbeatConnector:
Emits heartbeats to remote clusters, enabling monitoring of the replication process.
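The hand-off from offset-syncs to checkpoints can be sketched roughly as below. This is a simplified illustration, not the connector's code: it assumes offsets advance in lockstep after the most recent sync point, whereas the real translation logic is more involved and differs between Kafka versions. The function names and the dictionary layout are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# Each offset sync is modeled as (upstream_offset, downstream_offset) for a
# single topic-partition; real records also carry the topic and partition.


def translate_offset(syncs: List[Tuple[int, int]], upstream_committed: int) -> Optional[int]:
    """Map a committed source-cluster offset to a target-cluster offset.

    Picks the latest sync at or before the committed offset and assumes a
    one-to-one advance from there. Returns None if no usable sync exists.
    """
    usable = [s for s in syncs if s[0] <= upstream_committed]
    if not usable:
        return None
    sync_up, sync_down = max(usable)  # sync with the largest upstream offset
    return sync_down + (upstream_committed - sync_up)


def make_checkpoint(group: str, topic: str, partition: int,
                    syncs: List[Tuple[int, int]],
                    upstream_committed: int) -> Optional[Dict[str, object]]:
    """Build a checkpoint-like record, or None if translation is not possible."""
    downstream = translate_offset(syncs, upstream_committed)
    if downstream is None:
        return None
    return {
        "consumer_group_id": group,
        "topic": topic,
        "partition": partition,
        "upstream_offset": upstream_committed,
        "downstream_offset": downstream,
    }
```

For example, with syncs `[(100, 40), (200, 140)]` and a group committed at upstream offset 150, the sketch yields downstream offset 90, which is where that group would resume after failing over to the target cluster.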
With the DefaultReplicationPolicy, the internal topic names are:
heartbeatsTopic() → heartbeats
checkpointsTopic(clusterAlias) → clusterAlias.checkpoints.internal
offsetSyncsTopic(clusterAlias) → mm2-offset-syncs.clusterAlias.internal
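As a quick illustration of the naming scheme, the following Python stand-ins mirror the three methods. The real methods are Java and live in Kafka's replication policy classes. These functions are hypothetical helpers, `primary` is just an example alias, and the checkpoints name uses the `.checkpoints.internal` suffix.

```python
def heartbeats_topic() -> str:
    # Same name in every cluster; it does not depend on a cluster alias.
    return "heartbeats"


def checkpoints_topic(cluster_alias: str) -> str:
    # Prefixed with the alias of the cluster the checkpoints refer to.
    return f"{cluster_alias}.checkpoints.internal"


def offset_syncs_topic(cluster_alias: str) -> str:
    # The alias sits between a fixed prefix and the ".internal" suffix.
    return f"mm2-offset-syncs.{cluster_alias}.internal"


print(heartbeats_topic())             # heartbeats
print(checkpoints_topic("primary"))   # primary.checkpoints.internal
print(offset_syncs_topic("primary"))  # mm2-offset-syncs.primary.internal
```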