-
-
Save jibs/22d213c037f899c29446 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DisruptorTest2 { | |
public static class PiJob { | |
public double result; | |
public int sliceNr; | |
public int numIter; | |
public int partionId; | |
public void calculatePi() { | |
double acc = 0.0; | |
for (int i = sliceNr * numIter; i <= ((sliceNr + 1) * numIter - 1); i++) { | |
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); | |
} | |
result = acc; | |
} | |
} | |
public static class PiEventFac implements EventFactory<PiJob> { | |
@Override | |
public PiJob newInstance() { | |
return new PiJob(); | |
} | |
} | |
public static class PiEventProcessor implements EventHandler<PiJob> { | |
private int partionId; | |
public PiEventProcessor(int partionId) { | |
this.partionId = partionId; | |
} | |
@Override | |
public void onEvent(PiJob event, long sequence, boolean isEndOfBatch) throws Exception { | |
if (partionId == event.partionId) { | |
event.calculatePi(); | |
} | |
} | |
} | |
public static class PiResultReclaimer implements EventHandler<PiJob> { | |
double result; | |
public long seq = 0; | |
private final int numSlice; | |
final CountDownLatch latch; | |
public PiResultReclaimer(int numSlice) | |
{ | |
this.numSlice = numSlice; | |
latch = new CountDownLatch(1); | |
} | |
@Override | |
public void onEvent(PiJob event, long sequence, boolean isEndOfBatch) throws Exception { | |
result += event.result; | |
++seq; | |
if (seq >= numSlice) { | |
latch.countDown(); | |
} | |
} | |
} | |
public long run(int numTH, int numSlice, int numIter) throws InterruptedException { | |
PiEventFac fac = new PiEventFac(); | |
ExecutorService executor = Executors.newCachedThreadPool(); | |
Disruptor<PiJob> disruptor = new Disruptor<>(fac,16384, executor, ProducerType.SINGLE, new SleepingWaitStrategy()); | |
PiEventProcessor procs[] = new PiEventProcessor[numTH]; | |
PiResultReclaimer res = new PiResultReclaimer(numSlice); | |
for (int i = 0; i < procs.length; i++) { | |
procs[i] = new PiEventProcessor(i); | |
} | |
disruptor.handleEventsWith(procs).then(res); | |
disruptor.start(); | |
final RingBuffer<PiJob> ringBuffer = disruptor.getRingBuffer(); | |
long tim = System.currentTimeMillis(); | |
int partionId = 0; | |
for (int i= 0; i < numSlice; i++ ) { | |
final long seq = ringBuffer.next(); | |
final PiJob piJob = ringBuffer.get(seq); | |
piJob.numIter = numIter; | |
piJob.sliceNr = i; | |
piJob.result = 0; | |
piJob.partionId = partionId; | |
ringBuffer.publish(seq); | |
partionId = (partionId == (numTH - 1)) ? 0 : partionId + 1; | |
} | |
res.latch.await(); | |
long timTest = System.currentTimeMillis() - tim; | |
System.out.println(numTH+": tim: "+ timTest +" Pi: "+res.result); | |
disruptor.shutdown(); | |
executor.shutdownNow(); | |
return timTest; | |
} | |
public static void main(String arg[] ) throws InterruptedException { | |
final DisruptorTest2 disruptorTest = new DisruptorTest2(); | |
int numSlice = 1000000; | |
int numIter = 100; | |
int NUM_CORE = 16; | |
String res[] = new String[NUM_CORE]; | |
for ( int i = 1; i <= NUM_CORE; i++ ) { | |
long sum = 0; | |
System.out.println("--------------------------"); | |
for ( int ii = 0; ii < 20; ii++ ) { | |
long t = disruptorTest.run(i, numSlice, numIter); | |
if ( ii >= 10 ) | |
sum += t; | |
} | |
res[i-1] = i+": "+(sum/10); | |
} | |
for (int i = 0; i < res.length; i++) { | |
String re = res[i]; | |
System.out.println(re); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment