Skip to content

Instantly share code, notes, and snippets.

@jpallari
Created August 12, 2015 10:24
Show Gist options
  • Save jpallari/de3fb1aa16cb461f5f7a to your computer and use it in GitHub Desktop.
Save jpallari/de3fb1aa16cb461f5f7a to your computer and use it in GitHub Desktop.
Shoddy Akka Kafka example
package striimaus
import java.util.Properties
import akka.actor._
import akka.persistence.{Persistence, PersistentActor, PersistentView}
import com.typesafe.config.{Config, ConfigFactory}
import kafka.consumer._
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
import scala.concurrent.{ExecutionContext, Future}
/** A predicate that is evaluated against the raw string payload of an event. */
sealed trait Rule {

  /** Returns `true` when `source` satisfies this rule. */
  def apply(source: String): Boolean
}

/** The concrete [[Rule]] implementations. */
object Rule {

  /** Satisfied by strings that begin with `prefix`. */
  case class StartsWith(prefix: String) extends Rule {
    def apply(source: String): Boolean = source startsWith prefix
  }

  /** Satisfied by strings whose length is exactly `i`. */
  case class HasLength(i: Int) extends Rule {
    def apply(source: String): Boolean = i == source.length
  }

  /** Satisfied by strings that have `substring` somewhere inside them. */
  case class Contains(substring: String) extends Rule {
    def apply(source: String): Boolean = source.indexOf(substring) >= 0
  }
}
/** Shared identifiers and the message protocol exchanged by the actors in this file. */
object Striimaus {

  /** Value returned as `persistenceId` by both `EventFilterManager` and `EventFilter`. */
  val eventFilterPersistenceId: String = "event-filter-view"

  /** Value `EventFilter` uses for its `viewId`. */
  val eventFilterViewId: String = "event-filter-view"

  /** A single string payload to be checked against the rules. */
  case class Event(data: String)

  /** A complete rule set, keyed by rule id. */
  case class FilterRules(rules: Map[String, Rule])

  /** An event together with the ids of the rules it satisfied. */
  case class TriggeredEvent(event: Event, rules: Set[String])

  /** Command: store `rule` under `id`. */
  case class AddRule(id: String, rule: Rule)

  /** Command: drop the rule stored under `id`. */
  case class RemoveRule(id: String)
}
/** Factory for `StriimManager` actors. */
object StriimManager {

  /**
    * Builds the `Props` used to start a `StriimManager`.
    *
    * @param topics       topics passed to the manager's constructor
    * @param consumerConf Kafka consumer settings passed to the manager's constructor
    * @param producerConf Kafka producer settings passed to the manager's constructor
    * @param execctx      execution context handed on explicitly to the manager
    */
  def props(
      topics: Seq[String],
      consumerConf: Map[String, String],
      producerConf: Map[String, String]
  )(implicit execctx: ExecutionContext): Props =
    Props(new StriimManager(topics, consumerConf, producerConf)(execctx))
}
/**
  * Owns the Kafka consumer and producer, starts one `EventFilter` per topic plus a single
  * `EventFilterManager`, and forwards every message it receives to that manager.
  *
  * @param topics       Kafka topics to consume; one message stream is requested per topic
  * @param consumerConf settings copied into the Kafka `ConsumerConfig`
  * @param producerConf settings copied into the Kafka `ProducerConfig`
  * @param execctx      execution context passed on to the `EventFilter` actors
  */
class StriimManager(topics: Seq[String], consumerConf: Map[String, String], producerConf: Map[String, String])
                   (implicit execctx: ExecutionContext)
  extends Actor with ActorLogging {

  val consumer = kafkaConsumer(consumerConf)
  val producer = kafkaProducer[String, String](producerConf)

  // Created with context.actorOf (not context.system.actorOf) so that the actors are children
  // of this manager: they are stopped together with it, before postStop closes the Kafka
  // clients they depend on, instead of living on as orphaned top-level actors.
  val eventFilterManager = context.actorOf(Props[EventFilterManager])

  val eventFilters = {
    // Ask for exactly one stream per topic, hence `.head` on each returned list.
    val topicMap = topics.map(_ -> 1).toMap
    val streams = consumer.createMessageStreams(topicMap, new StringDecoder, new StringDecoder).mapValues(_.head)
    for ((topic, stream) <- streams) yield {
      context.actorOf(EventFilter.props(topic, stream, producer))
    }
  }

  /** Everything sent to the manager is handed to the `EventFilterManager`, keeping the original sender. */
  override def receive: Receive = {
    case m => eventFilterManager forward m
  }

  /** Releases both Kafka clients; the producer is closed even when the consumer shutdown throws. */
  override def postStop(): Unit = {
    try consumer.shutdown()
    finally producer.close()
  }

  /** Copies a Scala string map into a `java.util.Properties` instance. */
  private def toProperties(m: Map[String, String]): Properties = {
    val p = new Properties()
    m foreach { case (k, v) => p.setProperty(k, v) }
    p
  }

  /** Creates a Kafka producer from the given settings. */
  private def kafkaProducer[Key, Value](config: Map[String, String]) = {
    new Producer[Key, Value](new ProducerConfig(toProperties(config)))
  }

  /** Creates a Kafka consumer connector from the given settings. */
  private def kafkaConsumer(config: Map[String, String]) = {
    Consumer.create(new ConsumerConfig(toProperties(config)))
  }
}
/**
  * Persistent holder of the rule set.
  *
  * Every change is journaled as a complete `FilterRules` snapshot of the new rule set, and the
  * in-memory state is only replaced once that event has been persisted.
  */
class EventFilterManager extends PersistentActor with ActorLogging {
  import Striimaus._

  // Rule set currently in effect, keyed by rule id.
  private var rules = Map.empty[String, Rule]

  override def persistenceId: String = eventFilterPersistenceId

  /** Replayed `FilterRules` events simply overwrite the in-memory rule set. */
  override def receiveRecover: Receive = {
    case FilterRules(recovered) => rules = recovered
  }

  /**
    * Handles a full replacement (`FilterRules`), `AddRule` and `RemoveRule`.
    * Add and remove reply to the sender with `'ruleAdded` / `'ruleRemoved` after persisting.
    */
  override def receiveCommand: Receive = {
    case replacement: FilterRules =>
      persist(replacement) { stored =>
        rules = stored.rules
      }

    case AddRule(id, rule) =>
      persist(FilterRules(rules.updated(id, rule))) { stored =>
        rules = stored.rules
        sender() ! 'ruleAdded
        log.info("Added rule: {} -> {}", id, rule)
      }

    case RemoveRule(id) =>
      persist(FilterRules(rules - id)) { stored =>
        rules = stored.rules
        sender() ! 'ruleRemoved
        log.info("Removed rule: {}", id)
      }
  }
}
/** Factory for `EventFilter` actors. */
object EventFilter {

  /**
    * Builds the `Props` used to start an `EventFilter`.
    *
    * @param topic          topic name passed to the filter's constructor
    * @param consumerStream Kafka stream passed to the filter's constructor
    * @param producer       Kafka producer passed to the filter's constructor
    * @param execctx        execution context handed on explicitly to the filter
    */
  def props(
      topic: String,
      consumerStream: KafkaStream[String, String],
      producer: Producer[String, String]
  )(implicit execctx: ExecutionContext): Props =
    Props(new EventFilter(topic, consumerStream, producer)(execctx))
}
/**
  * Per-topic view over the persisted rule set.
  *
  * Receives `FilterRules` updates and `Event`s read from `consumerStream`, checks each event
  * against the current rules and, when at least one rule matches, sends a `TriggeredEvent`
  * (as its `toString`) to the "trigger" topic through `producer`.
  *
  * NOTE(review): the consumer loop is started from the constructor body, so a restart of this
  * actor would start another loop over the same stream — confirm this is acceptable.
  *
  * @param topic          name of the topic this filter handles (used for logging and the view id)
  * @param consumerStream Kafka stream whose messages are turned into `Event`s
  * @param producer       Kafka producer used to publish triggered events
  * @param execctx        execution context on which the consumer loop runs
  */
class EventFilter(topic: String, consumerStream: KafkaStream[String, String], producer: Producer[String, String])
                 (implicit execctx: ExecutionContext)
  extends PersistentView with ActorLogging {
  import Striimaus._

  override def persistenceId: String = eventFilterPersistenceId

  // The view id has to differ from the persistence id (otherwise the view and the persistent
  // actor would share snapshots), and one EventFilter is started per topic, so the topic name
  // is appended to keep every view instance distinct.
  override def viewId: String = s"$eventFilterViewId-$topic"

  /** Starts out with an empty rule set. */
  override def receive: Receive = ready(FilterRules(Map.empty))

  /**
    * Behaviour while `filterRules` is the rule set in effect.
    * A new `FilterRules` message swaps the behaviour; an `Event` is evaluated against the rules.
    */
  def ready(filterRules: FilterRules): Receive = {
    case r: FilterRules =>
      log.info("New rules for topic {}: {}", topic, r)
      context.become(ready(r))
    case e: Event =>
      log.info("New event for topic {}: {}", topic, e)
      val ruleIds = processEvent(filterRules.rules, e)
      if (ruleIds.nonEmpty)
        triggerRules(e, ruleIds)
  }

  /** Returns the ids of all rules that accept the event's data. */
  private def processEvent(rules: Map[String, Rule], event: Event): Set[String] = {
    rules.filter({ case (_, rule) => rule(event.data) }).keySet
  }

  /** Logs the match and publishes the `TriggeredEvent` to the "trigger" topic. */
  private def triggerRules(event: Event, ruleIds: Set[String]): Unit = {
    log.info("Triggered on event {}: {}", event, ruleIds)
    val data = TriggeredEvent(event, ruleIds)
    producer.send(new KeyedMessage[String, String]("trigger", data.toString))
  }

  /**
    * Iterates over `stream` inside a `Future`, invoking `callback` with the key and message of
    * every record. If the future fails with an `InterruptedException`, consumption is started again.
    *
    * @param stream   Kafka stream to drain
    * @param callback invoked for each record's key and message
    * @param execctx  execution context that runs the loop
    * @return a future that completes when the stream iteration ends
    */
  def consumerProcess[Key, Value](stream: KafkaStream[Key, Value])
                                 (callback: (Key, Value) => Unit)
                                 (implicit execctx: ExecutionContext): Future[Unit] = Future {
    // Iterating a Kafka stream blocks while waiting for messages; marking the section as
    // blocking lets pools that support it (e.g. the global one) compensate with extra threads
    // instead of being starved by one long-running loop per topic.
    scala.concurrent.blocking {
      for (m <- stream) {
        callback(m.key(), m.message())
      }
    }
  } recoverWith {
    case _: InterruptedException =>
      consumerProcess(stream)(callback)
  }

  // Feed every consumed message back to this actor as an Event.
  consumerProcess(consumerStream) {
    (_, value) => self ! Event(value)
  }
}
/** Bootstrap helper and sample entry point. */
object StriimSystem {

  // Akka Persistence settings: LevelDB journal and local snapshot store, both under target/.
  private val defaultConfig = ConfigFactory.parseString(
    """akka.persistence {
      | journal.plugin = "akka.persistence.journal.leveldb"
      | journal.leveldb.store {
      | # DO NOT USE 'native = off' IN PRODUCTION !!!
      | native = off
      | dir = "target/journal"
      | }
      | snapshot-store.plugin = "akka.persistence.snapshot-store.local"
      | snapshot-store.local.dir = "target/snapshots"
      |}""".stripMargin
  )

  // Rules registered by `main` right after start-up.
  private val sampleRules = Map(
    "foo" -> Rule.Contains("foo"),
    "bar" -> Rule.StartsWith("bar")
  )

  /**
    * Starts the actor system, initialises the persistence extension and creates the manager actor.
    *
    * @param topics            topics passed to the manager
    * @param consumerConf      Kafka consumer settings passed to the manager
    * @param producerConf      Kafka producer settings passed to the manager
    * @param actorSystemConfig configuration for the new `ActorSystem`
    * @param execctx           execution context passed to the manager's props
    * @return the started actor system together with the manager's `ActorRef`
    */
  def boot(topics: Seq[String],
           consumerConf: Map[String, String],
           producerConf: Map[String, String],
           actorSystemConfig: Config)
          (implicit execctx: ExecutionContext): StriimSystem = {
    val system = ActorSystem("StriimSystem", actorSystemConfig)
    Persistence(system)
    val manager = system.actorOf(StriimManager.props(topics, consumerConf, producerConf))
    StriimSystem(system, manager)
  }

  /** Boots the system with hard-coded sample settings and registers the sample rules. */
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val kafkaConsumerSettings = Map(
      "zookeeper.connect" -> "192.168.59.103:2181",
      "group.id" -> "mygroup",
      "zookeeper.session.timeout.ms" -> "400",
      "zookeeper.sync.time.ms" -> "200",
      "auto.commit.interval.ms" -> "1000"
    )
    val kafkaProducerSettings = Map(
      "metadata.broker.list" -> "192.168.59.103:9092",
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "request.required.acks" -> "1"
    )

    val striim = boot(
      topics = Seq("topic1", "topic2", "topic3"),
      consumerConf = kafkaConsumerSettings,
      producerConf = kafkaProducerSettings,
      actorSystemConfig = defaultConfig
    )

    for ((id, rule) <- sampleRules) {
      striim.striimManager ! Striimaus.AddRule(id, rule)
    }
  }
}
/**
  * Handle for a started system.
  *
  * @param actorSystem   the running actor system
  * @param striimManager reference to the manager actor
  */
case class StriimSystem(
    actorSystem: ActorSystem,
    striimManager: ActorRef
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment