Skip to content

Instantly share code, notes, and snippets.

@moznion
Last active August 29, 2015 14:27
Show Gist options
  • Save moznion/42df6e4505e3b4a2890c to your computer and use it in GitHub Desktop.
Save moznion/42df6e4505e3b4a2890c to your computer and use it in GitHub Desktop.
package net.moznion.jesque.worker;
import lombok.extern.slf4j.Slf4j;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientImpl;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.WorkerImpl;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Manual verification harness for worker-pool behaviour when every worker fails.
 *
 * <p>{@link #main(String...)} enqueues a single {@code TestAction} job on the {@code foo}
 * queue and then runs a {@code RobustWorkerPool} built from {@link FailingWorker}
 * instances, each of which reports an exception instead of polling the queue.
 */
@Slf4j
public class WorkerPoolFunctionVerifier {
    /** Name of the queue the job is enqueued on and the workers are created for. */
    private static final String QUEUE_NAME = "foo";

    /** Job class name used both when enqueueing and when registering the action. */
    private static final String ACTION_NAME = "TestAction";

    /** Second argument handed to the pool; presumably the number of workers (TODO confirm). */
    private static final int POOL_SIZE = 10;

    /** Exclusive upper bound, in milliseconds, of the random delay before a worker fails. */
    private static final int MAX_FAIL_DELAY_MILLIS = 3000;

    /**
     * Enqueues one job and runs a pool of always-failing workers against its queue.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String... args) {
        final Config config = new ConfigBuilder().build();

        // Add a job to the queue
        final Job job = new Job(ACTION_NAME, Collections.emptyList());
        final Client client = new ClientImpl(config);
        client.enqueue(QUEUE_NAME, job);
        client.end();

        // Start a worker to run jobs from the queue
        final Map<String, Class<? extends Runnable>> actionMap = new HashMap<>();
        actionMap.put(ACTION_NAME, TestAction.class);

        // Alternative pool implementation, kept for side-by-side comparison:
        // final WorkerPool workerPool = new WorkerPool(() ->
        //     new FailingWorker(config, Collections.singletonList("foo"), new MapBasedJobFactory(actionMap)),
        //     10,
        //     Executors.defaultThreadFactory()
        // );
        final RobustWorkerPool workerPool = new RobustWorkerPool(
                () -> new FailingWorker(
                        config,
                        Collections.singletonList(QUEUE_NAME),
                        new MapBasedJobFactory(actionMap)),
                POOL_SIZE,
                (event, worker, queue, errorJob, runner, result, t) -> {
                    // Log which worker raised the event, then ask it to end.
                    log.debug("Worker raise error (Worker Name: {})", worker.getName());
                    worker.end(false);
                });
        workerPool.run();
    }

    /** Job implementation registered under {@code TestAction}; it only logs that it ran. */
    public static class TestAction implements Runnable {
        @Override
        public void run() {
            log.debug("Run");
        }
    }

    /**
     * Worker whose {@link #poll()} never processes a job: it waits a random amount of
     * time and then reports a {@link RuntimeException} through
     * {@code recoverFromException}.
     */
    public static class FailingWorker extends WorkerImpl {
        /**
         * Creates the worker by delegating all arguments to {@link WorkerImpl}.
         *
         * @param config     configuration passed to the superclass
         * @param queues     queue names passed to the superclass
         * @param jobFactory job factory passed to the superclass
         */
        public FailingWorker(Config config, Collection<String> queues, JobFactory jobFactory) {
            super(config, queues, jobFactory);
        }

        /**
         * Sleeps for a random delay below {@link #MAX_FAIL_DELAY_MILLIS} milliseconds,
         * logs {@code "Fail"} and reports a fresh {@link RuntimeException} with a
         * {@code null} queue to {@code recoverFromException}.
         */
        @Override
        public void poll() {
            try {
                // ThreadLocalRandom already yields independent sequences per thread,
                // so no hand-rolled time/thread-id seed is required.
                Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_FAIL_DELAY_MILLIS));
            } catch (InterruptedException e) {
                // Keep the interruption visible to the code driving this worker
                // instead of silently discarding it.
                Thread.currentThread().interrupt();
            }
            log.debug("Fail");
            // Original author's note:
            // RecoveryStrategy: TERMINATE
            // So it will fire `WORKER_STOP` event
            this.recoverFromException(null, new RuntimeException());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment