Created
April 14, 2016 04:55
-
-
Save CuriousVini/e0bb81b3b9773fb5ea94a8a28733f5cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class IncrementTest extends LongRunningTestBase<IncrementTestState> { | |
private static final int BATCH_SIZE = 100; | |
public static final int SUM_BATCH = (BATCH_SIZE * (BATCH_SIZE - 1)) / 2; | |
@Override | |
public void deploy() throws Exception { | |
deployApplication(getLongRunningNamespace(), IncrementApp.class); | |
} | |
@Override | |
public void start() throws Exception { | |
getApplicationManager().getFlowManager(IncrementApp.IncrementFlow.NAME).start(); | |
} | |
@Override | |
public void stop() throws Exception { | |
FlowManager flowManager = getApplicationManager().getFlowManager(IncrementApp.IncrementFlow.NAME); | |
flowManager.stop(); | |
flowManager.waitForStatus(false); | |
} | |
private ApplicationManager getApplicationManager() throws Exception { | |
return getApplicationManager(Id.Application.from(Id.Namespace.DEFAULT, IncrementApp.NAME)); | |
} | |
@Override | |
public IncrementTestState getInitialState() { | |
return new IncrementTestState(0, 0); | |
} | |
@Override | |
public void awaitOperations(IncrementTestState state) throws Exception { | |
// just wait until a particular number of events are processed | |
Tasks.waitFor(state.getNumEvents(), new Callable<Long>() { | |
@Override | |
public Long call() throws Exception { | |
DatasetId regularTableId = new DatasetId(getLongRunningNamespace().getId(), IncrementApp.REGULAR_TABLE); | |
KeyValueTable regularTable = getKVTableDataset(regularTableId).get(); | |
return readLong(regularTable.read(IncrementApp.NUM_KEY)); | |
} | |
}, 5, TimeUnit.MINUTES, 10, TimeUnit.SECONDS); | |
} | |
@Override | |
public void verifyRuns(IncrementTestState state) throws Exception { | |
DatasetId readlessTableId = new DatasetId(getLongRunningNamespace().getId(), IncrementApp.READLESS_TABLE); | |
KeyValueTable readlessTable = getKVTableDataset(readlessTableId).get(); | |
long readlessSum = readLong(readlessTable.read(IncrementApp.SUM_KEY)); | |
long readlessNum = readLong(readlessTable.read(IncrementApp.NUM_KEY)); | |
Assert.assertEquals(state.getSumEvents(), readlessSum); | |
Assert.assertEquals(state.getNumEvents(), readlessNum); | |
DatasetId regularTableId = new DatasetId(getLongRunningNamespace().getId(), IncrementApp.REGULAR_TABLE); | |
KeyValueTable regularTable = getKVTableDataset(regularTableId).get(); | |
long regularSum = readLong(regularTable.read(IncrementApp.SUM_KEY)); | |
long regularNum = readLong(regularTable.read(IncrementApp.NUM_KEY)); | |
Assert.assertEquals(state.getSumEvents(), regularSum); | |
Assert.assertEquals(state.getNumEvents(), regularNum); | |
} | |
@Override | |
public IncrementTestState runOperations(IncrementTestState state) throws Exception { | |
StreamClient streamClient = getStreamClient(); | |
LOG.info("Writing {} events in one batch", BATCH_SIZE); | |
StringWriter writer = new StringWriter(); | |
for (int i = 0; i < BATCH_SIZE; i++) { | |
writer.write(String.format("%010d", i)); | |
writer.write("\n"); | |
} | |
streamClient.sendBatch(Id.Stream.from(getLongRunningNamespace(), IncrementApp.INT_STREAM), "text/plain", | |
ByteStreams.newInputStreamSupplier(writer.toString().getBytes(Charsets.UTF_8))); | |
long newSum = state.getSumEvents() + SUM_BATCH; | |
return new IncrementTestState(newSum, state.getNumEvents() + BATCH_SIZE); | |
} | |
private long readLong(byte[] bytes) { | |
return bytes == null ? 0 : Bytes.toLong(bytes); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment