Last active
October 6, 2017 12:16
-
-
Save bonifaido/5117342 to your computer and use it in GitHub Desktop.
Playing with Java Exchangers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.nandork.exchangertest; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.Exchanger; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
/**
 * Micro-benchmark that moves batches of messages from one producer thread to
 * several consumer threads through {@link Exchanger}s.
 *
 * <p>The producer collects {@code BUFFER_SIZE} messages in a buffer and swaps it
 * for the (emptied) buffer of a consumer, cycling through the consumers in
 * round-robin order. Exchanging {@code null} tells a consumer to stop.
 *
 * @author Nandor Kracser
 */
class ExchangerTest {

    /** Number of messages the producer creates when no explicit count is given. */
    private static final long ITERATIONS = 1000L * 1000L * 100L;

    /** Number of messages collected before a buffer is handed to a consumer. */
    private static final int BUFFER_SIZE = 100;

    /** How long the producer waits on one consumer before trying the next one, in microseconds. */
    private static final long EXCHANGE_TIMEOUT_MICROS = 10;

    /**
     * Produces messages and hands them, one full buffer at a time, to the consumers.
     *
     * <p>The counters are plain fields without synchronization; read
     * {@link #getTimeouts()} only after the thread running this producer has finished.
     */
    static class ExchangeProducer implements Runnable {

        /** One exchanger per consumer, in the order the consumers were passed in. */
        private final List<Exchanger<List<String>>> exchangers;

        /** Total number of messages this producer creates. */
        private final long iterations;

        private long timeouts = 0;
        private int nextExchanger = 0;

        /**
         * Creates a producer that generates the default number of messages.
         *
         * @param exchangeConsumers the consumers to feed; at least one is required
         * @throws IllegalArgumentException if no consumer is given
         */
        public ExchangeProducer(ExchangeConsumer... exchangeConsumers) {
            this(ITERATIONS, exchangeConsumers);
        }

        /**
         * Creates a producer that generates exactly {@code iterations} messages.
         *
         * @param iterations        number of messages to produce; must not be negative
         * @param exchangeConsumers the consumers to feed; at least one is required
         * @throws IllegalArgumentException if {@code iterations} is negative or no consumer is given
         */
        public ExchangeProducer(long iterations, ExchangeConsumer... exchangeConsumers) {
            if (iterations < 0) {
                throw new IllegalArgumentException("iterations must not be negative: " + iterations);
            }
            if (exchangeConsumers == null || exchangeConsumers.length == 0) {
                // Without this check the first hand-off would fail with an index error.
                throw new IllegalArgumentException("at least one consumer is required");
            }
            this.iterations = iterations;
            this.exchangers = new ArrayList<Exchanger<List<String>>>(exchangeConsumers.length);
            for (ExchangeConsumer consumer : exchangeConsumers) {
                exchangers.add(consumer.getExchanger());
            }
        }

        /** Returns the next exchanger in round-robin order. */
        private Exchanger<List<String>> nextExchanger() {
            Exchanger<List<String>> next = exchangers.get(nextExchanger);
            nextExchanger = (nextExchanger + 1) % exchangers.size();
            return next;
        }

        /**
         * Swaps {@code buffer} with the first consumer that shows up in time.
         *
         * <p>Each attempt waits briefly on one consumer; on timeout the attempt is
         * counted and the next consumer is tried.
         *
         * @param buffer the buffer to give away
         * @return the buffer received from the consumer
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private List<String> handOff(List<String> buffer) throws InterruptedException {
            while (true) {
                try {
                    return nextExchanger().exchange(buffer, EXCHANGE_TIMEOUT_MICROS, TimeUnit.MICROSECONDS);
                } catch (TimeoutException ex) {
                    timeouts++;
                }
            }
        }

        /**
         * Produces all messages, hands every buffer to a consumer and finally
         * exchanges {@code null} with each consumer to shut it down.
         *
         * <p>If the thread is interrupted, production stops and the interrupt
         * status is restored; the consumers are not shut down in that case.
         */
        @Override
        public void run() {
            List<String> currentBuffer = new ArrayList<String>(BUFFER_SIZE);
            try {
                for (long i = 0; i < iterations; i++) {
                    currentBuffer.add(produce());
                    if (currentBuffer.size() == BUFFER_SIZE) {
                        currentBuffer = handOff(currentBuffer);
                    }
                }
                // Deliver a trailing partial buffer so no message is lost when the
                // message count is not a multiple of the buffer size.
                if (!currentBuffer.isEmpty()) {
                    handOff(currentBuffer);
                }
                // shutdown consumers
                for (Exchanger<List<String>> exchanger : exchangers) {
                    exchanger.exchange(null);
                }
            } catch (InterruptedException ex) {
                // Restore the flag so the owner of this thread can see the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /** Creates the next message. */
        String produce() {
            return "Message";
        }

        /** Returns how many exchange attempts timed out. */
        public long getTimeouts() {
            return timeouts;
        }
    }

    /**
     * Receives buffers from a producer through its own {@link Exchanger} and
     * counts the messages in them.
     *
     * <p>The counter is a plain field without synchronization; read
     * {@link #getMessageCount()} only after the thread running this consumer has finished.
     */
    static class ExchangeConsumer implements Runnable {

        private long messageCount = 0;
        private final Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        /**
         * Consumes buffers until a {@code null} buffer is received.
         *
         * <p>If the thread is interrupted while waiting, consumption stops and the
         * interrupt status is restored.
         */
        @Override
        public void run() {
            List<String> currentBuffer = new ArrayList<String>(BUFFER_SIZE);
            try {
                while (currentBuffer != null) {
                    for (int i = currentBuffer.size() - 1; i >= 0; i--) {
                        consume(currentBuffer.get(i));
                    }
                    // Hand an empty buffer back so the producer can refill it.
                    currentBuffer.clear();
                    currentBuffer = exchanger.exchange(currentBuffer);
                }
            } catch (InterruptedException ex) {
                // Restore the flag so the owner of this thread can see the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /** Handles a single message; this implementation only counts it. */
        void consume(String msg) {
            messageCount++;
        }

        /** Returns the exchanger a producer must use to reach this consumer. */
        public Exchanger<List<String>> getExchanger() {
            return exchanger;
        }

        /** Returns the number of messages consumed so far. */
        public long getMessageCount() {
            return messageCount;
        }
    }

    /**
     * Runs one producer against two consumers on a three-thread pool and prints
     * the timeout count, the per-consumer message counts and the overall throughput.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    static void exchangerTest() throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        ExchangeConsumer exchangeConsumer = new ExchangeConsumer();
        ExchangeConsumer exchangeConsumer1 = new ExchangeConsumer();
        ExchangeProducer exchangeProducer = new ExchangeProducer(exchangeConsumer, exchangeConsumer1);

        // nanoTime is monotonic, so the measurement is not affected by wall-clock adjustments.
        long start = System.nanoTime();
        threadPool.execute(exchangeProducer);
        threadPool.execute(exchangeConsumer);
        threadPool.execute(exchangeConsumer1);
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.DAYS);
        long time = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Throughput is based on everything that was delivered, not on a single consumer.
        long delivered = exchangeConsumer.getMessageCount() + exchangeConsumer1.getMessageCount();
        // Treat a sub-millisecond run as one millisecond to avoid dividing by zero.
        double seconds = Math.max(time, 1L) / 1000.0;

        System.out.println("Timeouts: " + exchangeProducer.getTimeouts());
        System.out.println("Exchanger Delivered messages: " + exchangeConsumer.getMessageCount() + " time: " + time + " ms");
        System.out.println("Exchanger Delivered messages: " + exchangeConsumer1.getMessageCount() + " time: " + time + " ms");
        System.out.println("This is ~" + (long) (delivered / seconds) + " messages/s");
    }

    /** Runs the benchmark. */
    public static void main(String[] args) throws InterruptedException {
        exchangerTest();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment