Skip to content

Instantly share code, notes, and snippets.

@ngerakines
Created July 20, 2012 06:11
Show Gist options
  • Select an option

  • Save ngerakines/3148990 to your computer and use it in GitHub Desktop.

Select an option

Save ngerakines/3148990 to your computer and use it in GitHub Desktop.
package com.socklabs;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreaded {
public static final int RING_SIZE = 1024 * 16;
public static final int COUNT = 5000000;
public static final int THREAD_COUNT = 4;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
new SingleThreadedClaimStrategy(RING_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(createEvents(THREAD_COUNT));
RingBuffer<ValueEvent> ringBuffer = disruptor.start();
System.out.println("start " + System.currentTimeMillis());
for (long i = 0; i < COUNT; i++) {
long sequence = ringBuffer.next();
ValueEvent event = ringBuffer.get(sequence);
event.setValue(i);
ringBuffer.publish(sequence);
}
}
public static final EventHandler<ValueEvent>[] createEvents(final int count) {
EventHandler<ValueEvent>[] events = new EventHandler[count];
for (int i = 0; i < count; i++) {
System.out.println("Setting " + i);
events[i] = new OrdHandler(i);
}
System.out.println("handlers: " + events.length);
return events;
}
public static class ValueEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(final long value) {
this.value = value;
}
@Override
public String toString() {
return "ValueEvent{" +
"value=" + value +
'}';
}
public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
public ValueEvent newInstance() {
return new ValueEvent();
}
};
}
}
package com.socklabs;
import com.lmax.disruptor.EventHandler;
public class OrdHandler implements EventHandler<MultiThreaded.ValueEvent> {
private final long ordinal;
private int count = 1;
public OrdHandler(long ordinal) {
System.out.println("Ord set as " + ordinal);
this.ordinal = ordinal;
}
@Override
public void onEvent(MultiThreaded.ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
if ((sequence % count) == ordinal) {
count++;
}
if (endOfBatch) {
System.out.println("Batch " + ordinal + " size " + count + " / " + event.toString() + " / " + System.currentTimeMillis());
// count = 1;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment