Last active
January 13, 2021 18:15
-
-
Save gresrun/c5a2479c3f6287ccb799 to your computer and use it in GitHub Desktop.
Simple Micro-benchmark to Test Jesque Job Throughput
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.greghaines.jesque.perftest; | |
import static net.greghaines.jesque.utils.JesqueUtils.entry; | |
import static net.greghaines.jesque.utils.JesqueUtils.map; | |
import static net.greghaines.jesque.worker.WorkerEvent.JOB_SUCCESS; | |
import static net.greghaines.jesque.worker.WorkerEvent.WORKER_START; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import net.greghaines.jesque.Config; | |
import net.greghaines.jesque.ConfigBuilder; | |
import net.greghaines.jesque.Job; | |
import net.greghaines.jesque.client.Client; | |
import net.greghaines.jesque.client.ClientImpl; | |
import net.greghaines.jesque.worker.MapBasedJobFactory; | |
import net.greghaines.jesque.worker.Worker; | |
import net.greghaines.jesque.worker.WorkerEvent; | |
import net.greghaines.jesque.worker.WorkerImpl; | |
import net.greghaines.jesque.worker.WorkerListener; | |
import net.greghaines.jesque.worker.WorkerPool; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class MultiPerfTest { | |
private static final Logger LOG = LoggerFactory.getLogger(MultiPerfTest.class); | |
private static final Config CONFIG = new ConfigBuilder().build(); | |
private static final String TEST_QUEUE = "foo"; | |
private static final int JOB_COUNT = 100000; | |
private static final int NUM_ENQUEUERS = 1; | |
private static final int NUM_WORKERS = 8; | |
public static void main(final String... args) { | |
LOG.info("Starting test..."); | |
final WorkerPool workerPool = new WorkerPool(new Callable<Worker>() { | |
@Override | |
public Worker call() { | |
return new WorkerImpl(CONFIG, Arrays.asList(TEST_QUEUE), | |
new MapBasedJobFactory(map(entry("TestAction", TestAction.class)))); | |
} | |
}, NUM_WORKERS); | |
workerPool.getWorkerEventEmitter().addListener(new TimerWorkerListener(workerPool), | |
WORKER_START, JOB_SUCCESS); | |
new Enqueuer().run(); | |
LOG.info("Enqueue complete!"); | |
workerPool.run(); | |
try { | |
workerPool.join(JOB_COUNT * 100); | |
} catch (Exception e) { | |
LOG.warn("Exception while waiting for workerPool to join", e); | |
} | |
LOG.info("Test complete!"); | |
} | |
private static class Enqueuer implements Runnable { | |
@Override | |
public void run() { | |
final Client client = new ClientImpl(CONFIG); | |
try { | |
final Job job = new Job("TestAction", | |
new Object[] { 1, 2.3, true, "test", Arrays.asList("inner", 4.5) }); | |
for (int i = 0; i < JOB_COUNT; i++) { | |
client.enqueue(TEST_QUEUE, job); | |
} | |
} finally { | |
client.end(); | |
} | |
} | |
} | |
private static class TimerWorkerListener implements WorkerListener { | |
private final AtomicLong startTime = new AtomicLong(0L); | |
private final AtomicLong stopTime = new AtomicLong(0L); | |
private final AtomicInteger countdown = new AtomicInteger(JOB_COUNT * NUM_ENQUEUERS); | |
private final WorkerPool workerPool; | |
public TimerWorkerListener(final WorkerPool workerPool) { | |
this.workerPool = workerPool; | |
} | |
@Override | |
public void onEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job, | |
final Object runner, final Object result, final Throwable ex) { | |
if (event == WORKER_START && this.startTime.compareAndSet(0L, System.currentTimeMillis())) { | |
LOG.info("Started the clock..."); | |
} | |
if (event == JOB_SUCCESS && this.countdown.decrementAndGet() <= 0 | |
&& this.stopTime.compareAndSet(0L, System.currentTimeMillis())) { | |
final long timeDiff = this.stopTime.get() - this.startTime.get(); | |
LOG.info("Completed {} jobs in {}ms - Avg. {} jobs/sec", | |
new Object[] { JOB_COUNT, timeDiff, (JOB_COUNT * 1000.0 / timeDiff) }); | |
this.workerPool.end(false); | |
} | |
} | |
} | |
public static class TestAction implements Runnable { | |
private final Integer i; | |
private final Double d; | |
private final Boolean b; | |
private final String s; | |
private final List<Object> l; | |
public TestAction(final Integer i, final Double d, final Boolean b, final String s, final List<Object> l) { | |
this.i = i; | |
this.d = d; | |
this.b = b; | |
this.s = s; | |
this.l = l; | |
} | |
@Override | |
public void run() { | |
LOG.debug("TestAction.run() {} {} {} {} {}", new Object[] { this.i, this.d, this.b, this.s, this.l }); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.greghaines.jesque.perftest; | |
import static net.greghaines.jesque.utils.JesqueUtils.entry; | |
import static net.greghaines.jesque.utils.JesqueUtils.map; | |
import static net.greghaines.jesque.worker.WorkerEvent.JOB_SUCCESS; | |
import static net.greghaines.jesque.worker.WorkerEvent.WORKER_START; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import net.greghaines.jesque.Config; | |
import net.greghaines.jesque.ConfigBuilder; | |
import net.greghaines.jesque.Job; | |
import net.greghaines.jesque.client.Client; | |
import net.greghaines.jesque.client.ClientImpl; | |
import net.greghaines.jesque.worker.MapBasedJobFactory; | |
import net.greghaines.jesque.worker.Worker; | |
import net.greghaines.jesque.worker.WorkerEvent; | |
import net.greghaines.jesque.worker.WorkerImpl; | |
import net.greghaines.jesque.worker.WorkerListener; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class PerfTest { | |
private static final Logger LOG = LoggerFactory.getLogger(PerfTest.class); | |
private static final Config CONFIG = new ConfigBuilder().build(); | |
private static final String TEST_QUEUE = "foo"; | |
private static final int JOB_COUNT = 100000; | |
private static final int NUM_ENQUEUERS = 1; | |
public static void main(final String... args) { | |
LOG.info("Starting test..."); | |
final Worker worker = new WorkerImpl(CONFIG, Arrays.asList(TEST_QUEUE), | |
new MapBasedJobFactory(map(entry("TestAction", TestAction.class)))); | |
final TimerWorkerListener timer = new TimerWorkerListener(); | |
worker.getWorkerEventEmitter().addListener(timer, WORKER_START, JOB_SUCCESS); | |
final Thread workerThread = new Thread(worker); | |
new Enqueuer().run(); | |
LOG.info("Enqueue complete!"); | |
workerThread.start(); | |
try { | |
workerThread.join(); | |
} catch (Exception e) { | |
LOG.warn("Exception while waiting for workerThread to join", e); | |
} | |
LOG.info("Test complete!"); | |
} | |
private static class Enqueuer implements Runnable { | |
@Override | |
public void run() { | |
final Client client = new ClientImpl(CONFIG); | |
try { | |
final Job job = new Job("TestAction", | |
new Object[] { 1, 2.3, true, "test", Arrays.asList("inner", 4.5) }); | |
for (int i = 0; i < JOB_COUNT; i++) { | |
client.enqueue(TEST_QUEUE, job); | |
} | |
} finally { | |
client.end(); | |
} | |
} | |
} | |
private static class TimerWorkerListener implements WorkerListener { | |
private final AtomicLong startTime = new AtomicLong(0L); | |
private final AtomicLong stopTime = new AtomicLong(0L); | |
private final AtomicInteger countdown = new AtomicInteger(JOB_COUNT * NUM_ENQUEUERS); | |
@Override | |
public void onEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job, | |
final Object runner, final Object result, final Throwable ex) { | |
if (event == WORKER_START && this.startTime.compareAndSet(0L, System.currentTimeMillis())) { | |
LOG.info("Started the clock..."); | |
} | |
if (event == JOB_SUCCESS && this.countdown.decrementAndGet() <= 0 | |
&& this.stopTime.compareAndSet(0L, System.currentTimeMillis())) { | |
final long timeDiff = this.stopTime.get() - this.startTime.get(); | |
LOG.info("Completed {} jobs in {}ms - Avg. {} jobs/sec", | |
new Object[] { JOB_COUNT, timeDiff, (JOB_COUNT * 1000.0 / timeDiff) }); | |
worker.end(false); | |
} | |
} | |
} | |
public static class TestAction implements Runnable { | |
private final Integer i; | |
private final Double d; | |
private final Boolean b; | |
private final String s; | |
private final List<Object> l; | |
public TestAction(final Integer i, final Double d, final Boolean b, final String s, final List<Object> l) { | |
this.i = i; | |
this.d = d; | |
this.b = b; | |
this.s = s; | |
this.l = l; | |
} | |
@Override | |
public void run() { | |
LOG.debug("TestAction.run() {} {} {} {} {}", new Object[] { this.i, this.d, this.b, this.s, this.l }); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment