Created
May 16, 2013 22:03
-
-
Save tsuna/5595476 to your computer and use it in GitHub Desktop.
Complex async scanning and processing with asynchbase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This code is untested, may not even compile, but illustrates the idea.
class ComplexAsyncScanLoopDemo { | |
public Deferred<Object> scanAndProcess(final String start, final String stop) { | |
// This is the Deferred that the caller will wait on until everything | |
// we're doing has completed. If there was an object we wanted to return | |
// asynchronously to them as a result of whatever we're doing, we'd hand | |
// it to this Deferred once we're done. | |
final Deferred<Object> result = new Deferred<Object>(); | |
final Scanner scanner = client.newScanner(); | |
// Setup scanner... | |
scanner.setStartKey(start); | |
scanner.setStopKey(stop); | |
final int max_in_flight = 8; | |
// We use this counter to keep track of how many more batches we can | |
// process concurrently. When this counter drops down to 0, then we | |
// have max_in_flight batches running, so we'll have to pause scanning | |
// until this counter returns to 1. | |
final AtomicInteger inflight_slots = new AtomicInteger(max_in_flight); | |
// Callback that iterates asynchronously over the rows returned by the | |
// scanner. | |
class ScanLoop implements Callback<Object, ArrayList<ArrayList<KeyValue>>> { | |
boolean done = false; // Has our scanner reached the end already? | |
// What's the callback that should get triggered upon completion of a | |
// batch of work. | |
private final Callback<Object, Object> processingcb; | |
ScanLoop(Callback<Object, Object> processingcb) { | |
this.processingcb = processingcb; | |
} | |
void scan() { | |
scanner.nextRows().addCallback(ScanLoop.this); | |
} | |
public Object call(final ArrayList<ArrayList<KeyValue>> rows) { | |
if (rows == null) { // We've reached the end of the scanner. | |
done = true; // So remember that here. We're not 100% done yet, | |
return null; // some batches may still be in flight. | |
} | |
// Kick off your deletes or whatever you want. | |
process(rows).addCallback(processingcb); | |
int slots_left = inflight_slots.get(); | |
assert slots_left >= 0; // This cannot possibly become negative. | |
if (slots_left != 0) { // Can we go fetch more data? | |
inflight_slots.decrementAndGet(); // Yes, so decrement and... | |
scan(); // ... kick off another batch. | |
} // else: already as many batches in flight as we allow, | |
// so "pause" scanning by not kicking off another scan(). | |
return null; | |
} | |
} | |
// Whenever a batch of work completes, this callback gets executed. | |
class BatchCompletedCB implements Callback<Object, Object> { | |
public Object call(final Object unused) { | |
// Start by signaling that a batch completed by incrementing our | |
// counter (remember the counter indicates how many more batches | |
// can still be scheduled according to our limit). | |
int slots_left = inflight_slots.incrementAndGet(); | |
if (slots_left == 1 && !loop.done) { | |
// if we get here it's because slots_left was zero, and we're the | |
// first ones to increment it so we got one. If slots_left was | |
// zero, it means that scanning had ceased as we had too many | |
// batches in flight at the same time. So we should resume scanning | |
// at this point: | |
loop.scan(); | |
} else if (slots_left == max_in_flight && loop.done) { | |
// if we get here it's because we're the last batch to complete: the | |
// scanning loop indicated it was done scanning everything, and | |
// we're the last one to increment the counter, thereby returning | |
// its value to where it started from, at `max_in_flight'. | |
result.callback(null); // So indicate our caller we're all done. | |
} | |
return null; | |
} | |
} | |
final ScanLoop loop = new ScanLoop(new BatchCompletedCB()); | |
loop.scan(); // Kick off the whole dance. | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I think there is a subtle race in that code. The `boolean done` should be made `volatile`, and upon setting `done = true` in `ScanLoop` we need to check whether `slots_left == max_in_flight`, in which case we need to call `result.callback(null);`.