Skip to content

Instantly share code, notes, and snippets.

@RuedigerMoeller
Last active January 2, 2016 07:59
Show Gist options
  • Save RuedigerMoeller/8273494 to your computer and use it in GitHub Desktop.
Save RuedigerMoeller/8273494 to your computer and use it in GitHub Desktop.
Abstraktor version of Akka Pi Sample
public class ActorPiSample {
public static class PiActor extends Actor {
public void calculatePiFor(int start, int nrOfElements, ChannelActor result ) {
double acc = 0.0;
for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
}
result.receiveResult(acc);
}
}
public static class PiStriper extends Actor {
PiActor actors[];
public void run(int numActors, int iterationSize, int numJobs, final ChannelActor resultListener ) {
final long tim = System.currentTimeMillis();
actors = new PiActor[numActors];
for (int i = 0; i < actors.length; i++) {
actors[i] = SpawnActor(PiActor.class);
}
final int iterPerAct = numJobs / numActors;
final int iterSum = iterPerAct * actors.length;
final ChannelActor endResult = Actors.QueuedChannel(new ChannelReceiver<Double>() {
double sum = 0;
int count = 0;
@Override
public void receiveResult(Double result) {
count++;
sum += result;
if (count == iterSum) {
resultListener.receiveResult(sum);
done();
PiStriper.this.getDispatcher().shutDown();
for (int i = 0; i < actors.length; i++) {
PiActor actor = actors[i];
actor.getDispatcher().shutDown();
}
}
}
});
int iteri = 0;
for (int i = 0; i < actors.length; i++) {
for ( int ii = 0; ii < iterPerAct; ii++ ) {
actors[iteri%actors.length].calculatePiFor(iteri, iterationSize, endResult /*subRes*/);
iteri++;
}
}
System.out.println("POK iteri " + iteri);
}
}
static long calcPi(final int numMessages, int step, final int numActors) throws InterruptedException {
final long tim = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1); // to be able to wait for finish
final AtomicLong time = new AtomicLong(0);
ChannelActor resultReceiver = Actors.Channel(
new ChannelReceiver<Double>() {
public void receiveResult(Double pi) {
long l = System.currentTimeMillis() - tim;
System.out.println("T = " + numActors + " pi: " + pi + " " + l + " disp:" + de.ruedigermoeller.abstraktor.impl.Dispatcher.instanceCount.get());
time.set(l);
done();
latch.countDown();
}
});
PiStriper piStriper = Actors.AsActor(PiStriper.class);
piStriper.run(numActors,step, numMessages, resultReceiver );
// wait until done
latch.await();
return time.get();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment