Skip to content

Instantly share code, notes, and snippets.

@efenderbosch
Last active November 1, 2016 13:24
Show Gist options
  • Save efenderbosch/3eef9a434f666b62412a6b0e714b0d2e to your computer and use it in GitHub Desktop.
Save efenderbosch/3eef9a434f666b62412a6b0e714b0d2e to your computer and use it in GitHub Desktop.
Scheduled Akka Actor to pipeline events to Redis
package com.example;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import play.Configuration;
import play.libs.Json;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import scala.concurrent.duration.FiniteDuration;
import javax.inject.Inject;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
public class ScheduledActor extends UntypedActor {
public static Props props(JedisPool jedisPool) {
return Props.create(ScheduledActor.class, jedisPool);
}
private static final ConcurrentLinkedDeque<Object> EVENTS = new ConcurrentLinkedDeque<>();
private final JedisPool jedisPool;
private final int batchSize;
private final String key;
@Inject
public ScheduledActor(JedisPool jedisPool, Configuration root) {
this.jedisPool = jedisPool;
Configuration config = root.getConfig("com.example.redis");
if (config == null) {
config = Configuration.empty();
}
this.batchSize = config.getInt("batchSize", 1000);
FiniteDuration initialDelay = FiniteDuration.apply(config.getLong("initialDelay", 1000L), TimeUnit.MILLISECONDS);
FiniteDuration interval = FiniteDuration.apply(config.getLong("interval", 1000L), TimeUnit.MILLISECONDS);
key = config.getString("key");
// schedule myself
context().system().scheduler().schedule(initialDelay, interval, self(), new Tick(), context().dispatcher(), ActorRef.noSender());
}
public static void enqueue(Object object) {
EVENTS.push(object);
}
@Override
public void onReceive(Object message) throws Throwable {
if (!(message instanceof Tick)) {
unhandled(message);
return;
}
Jedis jedis = jedisPool.getResource();
try {
Pipeline pipeline = jedis.pipelined();
int processed = 0;
Object event = EVENTS.pollFirst();
while (processed < batchSize && event != null) {
String json = Json.mapper().writeValueAsString(event);
pipeline.lpush(key, json);
processed++;
event = EVENTS.pollFirst();
}
if (processed != 0) {
pipeline.sync();
}
} finally {
jedisPool.returnResource(jedis);
}
}
private class Tick {
//
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment