@yangl
Created January 13, 2022 06:37

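How MM2 updates the upstream/downstream offset difference (offset sync) for the downstream cluster: by default (offset.lag.max = 100), a sync is triggered once the lag reaches 100 records or more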

MirrorSourceTask.java

        // true if we should emit an offset sync
        boolean update(long upstreamOffset, long downstreamOffset) {
            boolean shouldSyncOffsets = false;
            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
            long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
            if (lastSyncDownstreamOffset == -1L
                    || downstreamOffset - downstreamTargetOffset >= maxOffsetLag
                    || upstreamOffset - previousUpstreamOffset != 1L
                    || downstreamOffset < previousDownstreamOffset) {
                lastSyncUpstreamOffset = upstreamOffset;
                lastSyncDownstreamOffset = downstreamOffset;
                shouldSyncOffsets = true;
            }
            previousUpstreamOffset = upstreamOffset;
            previousDownstreamOffset = downstreamOffset;
            return shouldSyncOffsets;
        }

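Because of retries, messages may be duplicated in the downstream cluster, so the gap between a message's offset in the downstream cluster and its offset in the upstream cluster keeps growing.

For example: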

upstream offset	1	2	3	4	5	6	7	8	9
downstream offset	11	12	13, 14 (duplicate)	15	16	17	18	19	20
difference	10	10	11	11	11	11	11	11	11



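Substituting downstreamTargetOffset = lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset), the second condition in update() is equivalent to:

(downstreamOffset - upstreamOffset) - (lastSyncDownstreamOffset - lastSyncUpstreamOffset) >= maxOffsetLag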
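To make the drift concrete, here is a minimal, self-contained sketch that replays a made-up stream through the same check. PartitionState below is a standalone copy of the update() logic quoted above, with the initial field values of -1 assumed. OffsetSyncDemo, syncPoints, and the duplicateEvery parameter are hypothetical names introduced only for this illustration, and the scenario (one retried duplicate for every third record) is invented, loosely following the example above.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetSyncDemo {

    // Standalone copy of the update() logic shown above.
    // Assumption: all four offsets start at -1.
    static class PartitionState {
        long previousUpstreamOffset = -1L;
        long previousDownstreamOffset = -1L;
        long lastSyncUpstreamOffset = -1L;
        long lastSyncDownstreamOffset = -1L;
        final long maxOffsetLag;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        // true if an offset sync should be emitted
        boolean update(long upstreamOffset, long downstreamOffset) {
            boolean shouldSyncOffsets = false;
            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
            long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
            if (lastSyncDownstreamOffset == -1L
                    || downstreamOffset - downstreamTargetOffset >= maxOffsetLag
                    || upstreamOffset - previousUpstreamOffset != 1L
                    || downstreamOffset < previousDownstreamOffset) {
                lastSyncUpstreamOffset = upstreamOffset;
                lastSyncDownstreamOffset = downstreamOffset;
                shouldSyncOffsets = true;
            }
            previousUpstreamOffset = upstreamOffset;
            previousDownstreamOffset = downstreamOffset;
            return shouldSyncOffsets;
        }
    }

    // Hypothetical scenario: upstream offsets 1..records are consecutive and the
    // first downstream offset is 11. Every duplicateEvery-th record is written
    // twice downstream, so its duplicate occupies one extra downstream offset.
    // Returns the upstream offsets at which update() asks for an offset sync.
    static List<Long> syncPoints(long maxOffsetLag, int records, int duplicateEvery) {
        PartitionState state = new PartitionState(maxOffsetLag);
        List<Long> synced = new ArrayList<>();
        long downstream = 11;
        for (long upstream = 1; upstream <= records; upstream++) {
            if (upstream % duplicateEvery == 0) {
                downstream++; // skip the offset taken by the retried duplicate
            }
            if (state.update(upstream, downstream)) {
                synced.add(upstream);
            }
            downstream++;
        }
        return synced;
    }

    public static void main(String[] args) {
        System.out.println(syncPoints(100, 9, 3));   // [1]
        System.out.println(syncPoints(3, 30, 3));    // [1, 9, 18, 27]
        System.out.println(syncPoints(100, 300, 3)); // [1, 300]
    }
}
```

In this sketch a single duplicate never triggers a sync on its own. The difference has to grow by maxOffsetLag relative to the last sync, so with a lag of 100 and one duplicate per three records the second sync only appears at upstream offset 300.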
    @Test
    public void testOffsetSync() {
        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);

        assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
        assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync");
        assertFalse(partitionState.update(3, 152), "no sync");
        assertFalse(partitionState.update(4, 153), "no sync");
        assertFalse(partitionState.update(5, 154), "no sync");
        assertTrue(partitionState.update(6, 205), "one past target offset");
        assertTrue(partitionState.update(2, 206), "upstream reset");
        assertFalse(partitionState.update(3, 207), "no sync");
        assertTrue(partitionState.update(4, 3), "downstream reset");
        assertFalse(partitionState.update(5, 4), "no sync");
    }

    @Test
    public void testZeroOffsetSync() {
        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0);

        // if max offset lag is zero, should always emit offset syncs
        assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
        assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
        assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
        assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
        assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
        assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
        assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
        assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
        assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
        assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
    }

yangl commented Sep 1, 2022

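Note that when MM2 runs as a dedicated cluster it lacks REST capability, so it cannot detect changes to topics or consumer groups, and the newly added ones are not synced!
The community is also aware of this problem and has proposed an improvement; see the status of KIP-710.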
