-
-
Save ppurang/3832643 to your computer and use it in GitHub Desktop.
Akka 2.0 actors with Kafka backed durable mailboxes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.Actor | |
import akka.actor.ActorSystem | |
import akka.agent.Agent | |
import com.typesafe.config.ConfigFactory | |
import akka.event.Logging | |
import akka.actor.Props | |
import kafka.utils.Utils | |
import java.nio.ByteBuffer | |
object AkkaKafkaMailboxTest extends App { | |
implicit val system = ActorSystem("KafkaAkkaSystem") | |
val kafka = system.actorOf(Props[KafkaActor].withDispatcher("kafka-dispatcher"), name = "TestActor") | |
var loop = Agent(true); | |
sys.addShutdownHook({ | |
loop send false | |
system.stop(kafka) | |
println("Shutting down...") | |
}) | |
while(loop()) { | |
kafka ! ("Testing 1 2 3 @ " + System.currentTimeMillis() / 1000) | |
Thread.sleep(1000) | |
} | |
} | |
class KafkaActor extends Actor { | |
val log = Logging(context.system, this) | |
def receive = { | |
case msg:Array[Byte] => { | |
println("Got msg: " + new String(msg, "UTF-8")) | |
} | |
case msg => { | |
println("Got unknown type msg: " + msg) | |
} | |
} | |
override def postStop() = { | |
log.info("Shutting down Kafka actor...") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kafka-dispatcher { | |
mailbox-type = "akka.actor.mailbox.KafkaBasedMailboxType" | |
throughput = 1 | |
} | |
akka { | |
actor { | |
mailbox { | |
kafka { | |
zkCluster = "localhost:2181" | |
} | |
} | |
} | |
loglevel = INFO | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor.mailbox | |
import akka.AkkaException | |
import akka.actor.ActorContext | |
import akka.dispatch.Envelope | |
import akka.event.Logging | |
import akka.actor.ActorRef | |
import akka.dispatch.MailboxType | |
import com.typesafe.config.Config | |
import akka.util.NonFatal | |
import akka.config.ConfigurationException | |
import akka.dispatch.MessageQueue | |
import akka.actor.ActorSystem | |
import akka.actor.mailbox.DurableMessageSerialization | |
import akka.actor.mailbox.DurableMessageQueue | |
import kafka.consumer.ConsumerConnector | |
import java.util.Properties | |
import kafka.message.ByteBufferMessageSet | |
import kafka.message.Message | |
import kafka.message.NoCompressionCodec | |
import kafka.consumer.ConsumerConfig | |
import kafka.consumer.Consumer._ | |
import kafka.producer.ProducerConfig | |
import kafka.producer.Producer | |
import kafka.producer.ProducerData | |
import kafka.utils.Utils | |
import kafka.consumer.ConsumerConnector | |
import akka.remote.MessageSerializer | |
import akka.serialization.SerializationExtension | |
class KafkaBasedMailboxException(message: String) extends AkkaException(message) | |
class KafkaBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { | |
private val settings = new KafkaBasedMailboxSettings(systemSettings, config) | |
override def create(owner: Option[ActorContext]): MessageQueue = owner match { | |
case Some(o) ⇒ new KafkaBasedMessageQueue(o, settings) | |
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") | |
} | |
} | |
class KafkaBasedMessageQueue(_owner: ActorContext, val settings: KafkaBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { | |
private var consumer = createConsumer() | |
private var producer = createProducer() | |
private var consumerIter = consumer.get.iterator | |
private var consumerConnector:ConsumerConnector = _ | |
val log = Logging(system, "KafkaBasedMessageQueue") | |
def enqueue(receiver: ActorRef, envelope: Envelope) { | |
withErrorHandling { | |
if (producer != None) { | |
try{ | |
val msg = serialize(envelope) | |
producer.get.send(new ProducerData(name,new Message(msg))) | |
}catch{ | |
case e: Exception => { log.warning("Unable to publish from producer.")} | |
} | |
} | |
} | |
} | |
def dequeue(): Envelope = withErrorHandling { | |
try { | |
try { | |
val item = consumerIter.next() | |
val bytes = new Array[Byte](item.payload.capacity()) | |
item.payload.get(bytes) | |
consumerConnector.commitOffsets | |
deserialize(bytes) | |
} catch { | |
case _ => throw new NoSuchElementException("Could not get element from Kafka.") | |
} | |
} catch { | |
case e: java.util.NoSuchElementException ⇒ null | |
case NonFatal(e) ⇒ | |
log.error(e, "Couldn't dequeue from Kafka-based mailbox") | |
throw e | |
} | |
} | |
def numberOfMessages: Int = withErrorHandling { | |
try { | |
consumerIter.hasNext() match { | |
case true => 1 | |
case _ => 0 | |
} | |
} catch { | |
case _ => 0 | |
} | |
} | |
def hasMessages: Boolean = numberOfMessages > 0 | |
def getConsumerConnector(groupId:String) : ConsumerConnector = { | |
val consumerProps = new Properties | |
consumerProps.put("zk.connect", settings.zkCluster) | |
consumerProps.put("zk.connectiontimeout.ms", "1000000") | |
consumerProps.put("groupid", groupId) | |
val consumerConfig = new ConsumerConfig(consumerProps) | |
create(consumerConfig) | |
} | |
def createProducer() = { | |
val producerProps = new Properties() | |
producerProps.put("zk.connect", settings.zkCluster) | |
val producerConfig = new ProducerConfig(producerProps) | |
val producer = try { | |
Some(new Producer[String, Message](producerConfig)) | |
} catch { | |
case e:Exception => None | |
} | |
producer | |
} | |
def createConsumer() = { | |
val topicThreadCount = Map((name, 1)) | |
consumerConnector = getConsumerConnector(name) | |
val topicMessageStreams = consumerConnector.createMessageStreams(topicThreadCount) | |
val streams = topicMessageStreams.get(name) | |
streams match { | |
case Some(List(stream)) => Some(stream) | |
case _ => { | |
log.error("Did not get a valid stream from topic " + name) | |
None | |
} | |
} | |
} | |
private def withErrorHandling[T](body: ⇒ T): T = { | |
try { | |
body | |
} catch { | |
case e: Exception ⇒ { | |
consumer = createConsumer() | |
producer = createProducer() | |
body | |
} | |
case NonFatal(e) ⇒ | |
val error = new KafkaBasedMailboxException("Could not connect to Kafka, due to: " + e.getMessage) | |
log.error(error, error.getMessage) | |
throw error | |
} | |
} | |
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = { | |
consumerConnector.commitOffsets | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor.mailbox | |
import com.typesafe.config.Config | |
import akka.actor.ActorSystem | |
import akka.actor.mailbox.DurableMailboxSettings | |
class KafkaBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) | |
extends DurableMailboxSettings { | |
def name = "kafka" | |
val config = initialize | |
import config._ | |
val zkCluster = getString("zkCluster") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment