Last active
March 3, 2021 03:40
-
-
Save yangl/ef891a48cc3b30b5080401f163688875 to your computer and use it in GitHub Desktop.
Apache BookKeeper quorum ack LAC 关键代码
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
interface QuorumCoverageSet { | |
/** | |
* Add a bookie to the result set. | |
* | |
* @param bookieIndexHeardFrom Bookie we've just heard from | |
*/ | |
void addBookie(int bookieIndexHeardFrom, int rc); | |
/** | |
* check if all quorum in the set have had the action performed for it. | |
* | |
* @return whether all quorums have been covered | |
*/ | |
boolean checkCovered(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void readLacComplete(int rc, long ledgerId, final ByteBuf lacBuffer, final ByteBuf lastEntryBuffer, | |
Object ctx) { | |
int bookieIndex = (Integer) ctx; | |
// add the response to coverage set | |
coverageSet.addBookie(bookieIndex, rc); | |
numResponsesPending--; | |
boolean heardValidResponse = false; | |
if (completed) { | |
return; | |
} | |
if (rc == BKException.Code.OK) { | |
try { | |
// Each bookie may have two store LAC in two places. | |
// One is in-memory copy in FileInfo and other is | |
// piggy-backed LAC on the last entry. | |
// This routine picks both of them and compares to return | |
// the latest Lac. | |
// lacBuffer and lastEntryBuffer are optional in the protocol. | |
// So check if they exist before processing them. | |
// Extract lac from FileInfo on the ledger. | |
if (lacBuffer != null && lacBuffer.readableBytes() > 0) { | |
long lac = lh.macManager.verifyDigestAndReturnLac(lacBuffer); | |
if (lac > maxLac) { | |
maxLac = lac; | |
} | |
} | |
// Extract lac from last entry on the disk | |
if (lastEntryBuffer != null && lastEntryBuffer.readableBytes() > 0) { | |
RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer); | |
long recoveredLac = recoveryData.getLastAddConfirmed(); | |
if (recoveredLac > maxLac) { | |
maxLac = recoveredLac; | |
} | |
} | |
heardValidResponse = true; | |
} catch (BKDigestMatchException e) { | |
// Too bad, this bookie did not give us a valid answer, we | |
// still might be able to recover. So, continue | |
LOG.error("Mac mismatch while reading ledger: " + ledgerId + " LAC from bookie: " | |
+ currentEnsemble.get(bookieIndex)); | |
rc = BKException.Code.DigestMatchException; | |
} | |
} | |
if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) { | |
heardValidResponse = true; | |
} | |
if (rc == BKException.Code.UnauthorizedAccessException && !completed) { | |
cb.getLacComplete(rc, maxLac); | |
completed = true; | |
return; | |
} | |
if (!heardValidResponse && BKException.Code.OK != rc) { | |
lastSeenError = rc; | |
} | |
// We don't consider a success until we have coverage set responses. | |
if (heardValidResponse | |
&& coverageSet.checkCovered() // 检测是否达到指定ack | |
&& !completed) { | |
completed = true; | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Read LAC complete with enough validResponse for ledger: {} LAC: {}", ledgerId, maxLac); | |
} | |
cb.getLacComplete(BKException.Code.OK, maxLac); | |
return; | |
} | |
if (numResponsesPending == 0 && !completed) { | |
LOG.info("While readLac ledger: " + ledgerId + " did not hear success responses from all of ensemble"); | |
cb.getLacComplete(lastSeenError, maxLac); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public synchronized void readEntryComplete(final int rc, final long ledgerId, final long entryId, | |
final ByteBuf buffer, final Object ctx) { | |
int bookieIndex = (Integer) ctx; | |
// add the response to coverage set | |
coverageSet.addBookie(bookieIndex, rc); | |
numResponsesPending--; | |
boolean heardValidResponse = false; | |
if (rc == BKException.Code.OK) { | |
try { | |
RecoveryData recoveryData = digestManager.verifyDigestAndReturnLastConfirmed(buffer); | |
if (recoveryData.getLastAddConfirmed() > maxRecoveredData.getLastAddConfirmed()) { | |
maxRecoveredData = recoveryData; | |
} | |
heardValidResponse = true; | |
} catch (BKDigestMatchException e) { | |
// Too bad, this bookie didn't give us a valid answer, we | |
// still might be able to recover though so continue | |
LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId | |
+ " while reading last entry from bookie: " | |
+ currentEnsemble.get(bookieIndex)); | |
} | |
} | |
if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) { | |
// this still counts as a valid response, e.g., if the client crashed without writing any entry | |
heardValidResponse = true; | |
} | |
if (rc == BKException.Code.UnauthorizedAccessException && !completed) { | |
cb.readLastConfirmedDataComplete(rc, maxRecoveredData); | |
completed = true; | |
} | |
if (!heardValidResponse && BKException.Code.OK != rc) { | |
lastSeenError = rc; | |
} | |
// other return codes dont count as valid responses | |
if (heardValidResponse | |
&& coverageSet.checkCovered() // 检测是否达到指定ack | |
&& !completed) { | |
completed = true; | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Read Complete with enough validResponses for ledger: {}, entry: {}", | |
ledgerId, entryId); | |
} | |
cb.readLastConfirmedDataComplete(BKException.Code.OK, maxRecoveredData); | |
return; | |
} | |
if (numResponsesPending == 0 && !completed) { | |
LOG.error("While readLastConfirmed ledger: {} did not hear success responses from all quorums, {}", | |
ledgerId, coverageSet); | |
cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment