Last active
November 1, 2016 13:24
-
-
Save efenderbosch/3eef9a434f666b62412a6b0e714b0d2e to your computer and use it in GitHub Desktop.
Scheduled Akka Actor to pipeline events to Redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import akka.actor.ActorRef; | |
import akka.actor.Props; | |
import akka.actor.UntypedActor; | |
import play.Configuration; | |
import play.libs.Json; | |
import redis.clients.jedis.Jedis; | |
import redis.clients.jedis.JedisPool; | |
import redis.clients.jedis.Pipeline; | |
import scala.concurrent.duration.FiniteDuration; | |
import javax.inject.Inject; | |
import java.util.concurrent.ConcurrentLinkedDeque; | |
import java.util.concurrent.TimeUnit; | |
public class ScheduledActor extends UntypedActor { | |
public static Props props(JedisPool jedisPool) { | |
return Props.create(ScheduledActor.class, jedisPool); | |
} | |
private static final ConcurrentLinkedDeque<Object> EVENTS = new ConcurrentLinkedDeque<>(); | |
private final JedisPool jedisPool; | |
private final int batchSize; | |
private final String key; | |
@Inject | |
public ScheduledActor(JedisPool jedisPool, Configuration root) { | |
this.jedisPool = jedisPool; | |
Configuration config = root.getConfig("com.example.redis"); | |
if (config == null) { | |
config = Configuration.empty(); | |
} | |
this.batchSize = config.getInt("batchSize", 1000); | |
FiniteDuration initialDelay = FiniteDuration.apply(config.getLong("initialDelay", 1000L), TimeUnit.MILLISECONDS); | |
FiniteDuration interval = FiniteDuration.apply(config.getLong("interval", 1000L), TimeUnit.MILLISECONDS); | |
key = config.getString("key"); | |
// schedule myself | |
context().system().scheduler().schedule(initialDelay, interval, self(), new Tick(), context().dispatcher(), ActorRef.noSender()); | |
} | |
public static void enqueue(Object object) { | |
EVENTS.push(object); | |
} | |
@Override | |
public void onReceive(Object message) throws Throwable { | |
if (!(message instanceof Tick)) { | |
unhandled(message); | |
return; | |
} | |
Jedis jedis = jedisPool.getResource(); | |
try { | |
Pipeline pipeline = jedis.pipelined(); | |
int processed = 0; | |
Object event = EVENTS.pollFirst(); | |
while (processed < batchSize && event != null) { | |
String json = Json.mapper().writeValueAsString(event); | |
pipeline.lpush(key, json); | |
processed++; | |
event = EVENTS.pollFirst(); | |
} | |
if (processed != 0) { | |
pipeline.sync(); | |
} | |
} finally { | |
jedisPool.returnResource(jedis); | |
} | |
} | |
private class Tick { | |
// | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment