MirrorSourceTask.java
// Records the newest upstream/downstream offset pair and returns true when an offset sync should be emitted.
boolean update(long upstreamOffset, long downstreamOffset) {
    // Downstream offset we would see if downstream had advanced exactly as far as upstream since the last sync.
    final long expectedDownstreamOffset = lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset);

    // -1 presumably marks "no sync emitted yet" -- confirm against the field initializers.
    final boolean lastSyncIsSentinel = lastSyncDownstreamOffset == -1L;
    // Downstream has run ahead of the expected position by at least maxOffsetLag.
    final boolean lagLimitReached = downstreamOffset - expectedDownstreamOffset >= maxOffsetLag;
    // Upstream offset did not advance by exactly one relative to the previous call (gap or rewind).
    final boolean upstreamNotConsecutive = upstreamOffset - previousUpstreamOffset != 1L;
    // Downstream offset is lower than the one seen on the previous call.
    final boolean downstreamMovedBack = downstreamOffset < previousDownstreamOffset;

    final boolean emitSync = lastSyncIsSentinel
            || lagLimitReached
            || upstreamNotConsecutive
            || downstreamMovedBack;

    if (emitSync) {
        // Re-anchor the sync point at the current pair.
        lastSyncDownstreamOffset = downstreamOffset;
        lastSyncUpstreamOffset = upstreamOffset;
    }

    // Always remember the most recent pair, whether or not a sync was emitted.
    previousDownstreamOffset = downstreamOffset;
    previousUpstreamOffset = upstreamOffset;
    return emitSync;
}
由于重试,下游集群的消息有可能会重复,从而导致同一条消息在下游集群的位点与其在上游集群的位点之间的差值越来越大。
如下:
上游位点:  1  2  3  4  5  6  7  8  9
下游位点: 11 12 13 15 16 17 18 19 20
重试重复:       14
位点差值: 10 10 11 11 11 11 11 11 11
(downstreamOffset - upstreamOffset) - (lastSyncDownstreamOffset - lastSyncUpstreamOffset) >= maxOffsetLag
// Walks PartitionState.update through each sync trigger with a max offset lag of 50.
@Test
public void testOffsetSync() {
    MirrorSourceTask.PartitionState state = new MirrorSourceTask.PartitionState(50);

    // Very first update is expected to sync.
    assertTrue(state.update(0, 100), "always emit offset sync on first update");
    // Upstream jumps 0 -> 2, so it is not consecutive.
    assertTrue(state.update(2, 102), "upstream offset skipped -> resync");

    // Last sync is (2, 102): expected downstream is 103, 104, 105; actual is 49 ahead each time (< 50).
    assertFalse(state.update(3, 152), "no sync");
    assertFalse(state.update(4, 153), "no sync");
    assertFalse(state.update(5, 154), "no sync");

    // Expected downstream is 106; actual 205 is 99 ahead (>= 50).
    assertTrue(state.update(6, 205), "one past target offset");
    // Upstream goes back 6 -> 2.
    assertTrue(state.update(2, 206), "upstream reset");
    // Last sync is (2, 206): expected downstream 207 matches exactly.
    assertFalse(state.update(3, 207), "no sync");
    // Downstream drops 207 -> 3.
    assertTrue(state.update(4, 3), "downstream reset");
    // Last sync is (4, 3): expected downstream 4 matches exactly.
    assertFalse(state.update(5, 4), "no sync");
}
// With a max offset lag of zero, every update in this sequence must emit an offset sync.
@Test
public void testZeroOffsetSync() {
    MirrorSourceTask.PartitionState state = new MirrorSourceTask.PartitionState(0);

    // {upstreamOffset, downstreamOffset} pairs, fed to update() in this exact order.
    long[][] offsetPairs = {
        {0, 100},
        {2, 102},
        {3, 153},
        {4, 154},
        {5, 155},
        {6, 207},
        {2, 208},
        {3, 209},
        {4, 3},
        {5, 4},
    };

    // if max offset lag is zero, should always emit offset syncs
    for (long[] pair : offsetPairs) {
        assertTrue(state.update(pair[0], pair[1]),
                "zeroOffsetSync downStreamOffset " + pair[1] + " is incorrect");
    }
}
使用独立集群时需注意:其缺失 REST 能力,无法感知主题/消费组的变更,从而导致新增的主题/消费组无法同步!
社区也已意识到该问题并提出了改进,详见 KIP-710 的状态。