Skip to content

Instantly share code, notes, and snippets.

@CuriousVini
Created April 14, 2016 04:55
Show Gist options
  • Save CuriousVini/e0bb81b3b9773fb5ea94a8a28733f5cc to your computer and use it in GitHub Desktop.
Save CuriousVini/e0bb81b3b9773fb5ea94a8a28733f5cc to your computer and use it in GitHub Desktop.
public class IncrementTest extends LongRunningTestBase<IncrementTestState> {
private static final int BATCH_SIZE = 100;
public static final int SUM_BATCH = (BATCH_SIZE * (BATCH_SIZE - 1)) / 2;
@Override
public void deploy() throws Exception {
deployApplication(getLongRunningNamespace(), IncrementApp.class);
}
@Override
public void start() throws Exception {
getApplicationManager().getFlowManager(IncrementApp.IncrementFlow.NAME).start();
}
@Override
public void stop() throws Exception {
FlowManager flowManager = getApplicationManager().getFlowManager(IncrementApp.IncrementFlow.NAME);
flowManager.stop();
flowManager.waitForStatus(false);
}
private ApplicationManager getApplicationManager() throws Exception {
return getApplicationManager(Id.Application.from(Id.Namespace.DEFAULT, IncrementApp.NAME));
}
@Override
public IncrementTestState getInitialState() {
return new IncrementTestState(0, 0);
}
@Override
public void awaitOperations(IncrementTestState state) throws Exception {
// just wait until a particular number of events are processed
Tasks.waitFor(state.getNumEvents(), new Callable<Long>() {
@Override
public Long call() throws Exception {
DatasetId regularTableId = new DatasetId(getLongRunningNamespace().getId(), IncrementApp.REGULAR_TABLE);
KeyValueTable regularTable = getKVTableDataset(regularTableId).get();
return readLong(regularTable.read(IncrementApp.NUM_KEY));
}
}, 5, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
}
@Override
public void verifyRuns(IncrementTestState state) throws Exception {
DatasetId readlessTableId = new DatasetId(getLongRunningNamespace().getId(), IncrementApp.READLESS_TABLE);
KeyValueTable readlessTable = getKVTableDataset(readlessTableId).get();
long readlessSum = readLong(readlessTable.read(IncrementApp.SUM_KEY));
long readlessNum = readLong(readlessTable.read(IncrementApp.NUM_KEY));
Assert.assertEquals(state.getSumEvents(), readlessSum);
Assert.assertEquals(state.getNumEvents(), readlessNum);
DatasetId regularTableId = new DatasetId(getLongRunningNamespace().getId(), IncrementApp.REGULAR_TABLE);
KeyValueTable regularTable = getKVTableDataset(regularTableId).get();
long regularSum = readLong(regularTable.read(IncrementApp.SUM_KEY));
long regularNum = readLong(regularTable.read(IncrementApp.NUM_KEY));
Assert.assertEquals(state.getSumEvents(), regularSum);
Assert.assertEquals(state.getNumEvents(), regularNum);
}
@Override
public IncrementTestState runOperations(IncrementTestState state) throws Exception {
StreamClient streamClient = getStreamClient();
LOG.info("Writing {} events in one batch", BATCH_SIZE);
StringWriter writer = new StringWriter();
for (int i = 0; i < BATCH_SIZE; i++) {
writer.write(String.format("%010d", i));
writer.write("\n");
}
streamClient.sendBatch(Id.Stream.from(getLongRunningNamespace(), IncrementApp.INT_STREAM), "text/plain",
ByteStreams.newInputStreamSupplier(writer.toString().getBytes(Charsets.UTF_8)));
long newSum = state.getSumEvents() + SUM_BATCH;
return new IncrementTestState(newSum, state.getNumEvents() + BATCH_SIZE);
}
private long readLong(byte[] bytes) {
return bytes == null ? 0 : Bytes.toLong(bytes);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment