Created
August 12, 2015 10:24
-
-
Save jpallari/de3fb1aa16cb461f5f7a to your computer and use it in GitHub Desktop.
Shoddy Akka Kafka example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package striimaus | |
import java.util.Properties | |
import akka.actor._ | |
import akka.persistence.{Persistence, PersistentActor, PersistentView} | |
import com.typesafe.config.{Config, ConfigFactory} | |
import kafka.consumer._ | |
import kafka.producer.{KeyedMessage, Producer, ProducerConfig} | |
import kafka.serializer.StringDecoder | |
import scala.concurrent.{ExecutionContext, Future} | |
sealed trait Rule { | |
def apply(source: String): Boolean | |
} | |
object Rule { | |
case class StartsWith(prefix: String) extends Rule { | |
override def apply(source: String): Boolean = source.startsWith(prefix) | |
} | |
case class HasLength(i: Int) extends Rule { | |
override def apply(source: String): Boolean = source.length == i | |
} | |
case class Contains(substring: String) extends Rule { | |
override def apply(source: String): Boolean = source.contains(substring) | |
} | |
} | |
object Striimaus { | |
val eventFilterPersistenceId = "event-filter-view" | |
val eventFilterViewId = "event-filter-view" | |
case class Event(data: String) | |
case class FilterRules(rules: Map[String, Rule]) | |
case class TriggeredEvent(event: Event, rules: Set[String]) | |
case class AddRule(id: String, rule: Rule) | |
case class RemoveRule(id: String) | |
} | |
object StriimManager { | |
def props(topics: Seq[String], consumerConf: Map[String, String], producerConf: Map[String, String]) | |
(implicit execctx: ExecutionContext): Props = { | |
Props(new StriimManager(topics, consumerConf, producerConf)(execctx)) | |
} | |
} | |
class StriimManager(topics: Seq[String], consumerConf: Map[String, String], producerConf: Map[String, String]) | |
(implicit execctx: ExecutionContext) | |
extends Actor with ActorLogging { | |
val consumer = kafkaConsumer(consumerConf) | |
val producer = kafkaProducer[String, String](producerConf) | |
val eventFilterManager = context.system.actorOf(Props[EventFilterManager]) | |
val eventFilters = { | |
val topicMap = topics.map(_ -> 1).toMap | |
val streams = consumer.createMessageStreams(topicMap, new StringDecoder, new StringDecoder).mapValues(_.head) | |
for ((topic, stream) <- streams) yield { | |
context.system.actorOf(EventFilter.props(topic, stream, producer)) | |
} | |
} | |
override def receive: Receive = { | |
case m => eventFilterManager forward m | |
} | |
override def postStop(): Unit = { | |
consumer.shutdown() | |
producer.close() | |
} | |
private def toProperties(m: Map[String, String]) = { | |
val p = new Properties() | |
m foreach { case (k, v) => p.put(k, v) } | |
p | |
} | |
private def kafkaProducer[Key, Value](config: Map[String, String]) = { | |
new Producer[Key, Value](new ProducerConfig(toProperties(config))) | |
} | |
private def kafkaConsumer(config: Map[String, String]) = { | |
Consumer.create(new ConsumerConfig(toProperties(config))) | |
} | |
} | |
class EventFilterManager extends PersistentActor with ActorLogging { | |
import Striimaus._ | |
private var rules = Map.empty[String, Rule] | |
override def persistenceId: String = eventFilterPersistenceId | |
override def receiveRecover: Receive = { | |
case fr: FilterRules => rules = fr.rules | |
} | |
override def receiveCommand: Receive = { | |
case fr: FilterRules => | |
persist(fr) { filterRules => | |
rules = filterRules.rules | |
} | |
case m: AddRule => | |
val newRules = rules + (m.id -> m.rule) | |
persist(FilterRules(newRules)) { fr => | |
rules = fr.rules | |
sender() ! 'ruleAdded | |
log.info("Added rule: {} -> {}", m.id, m.rule) | |
} | |
case m: RemoveRule => | |
val newRules = rules - m.id | |
persist(FilterRules(newRules)) { nr => | |
rules = nr.rules | |
sender() ! 'ruleRemoved | |
log.info("Removed rule: {}", m.id) | |
} | |
} | |
} | |
object EventFilter { | |
def props(topic: String, consumerStream: KafkaStream[String, String], producer: Producer[String, String]) | |
(implicit execctx: ExecutionContext): Props = { | |
Props(new EventFilter(topic, consumerStream, producer)(execctx)) | |
} | |
} | |
class EventFilter(topic: String, consumerStream: KafkaStream[String, String], producer: Producer[String, String]) | |
(implicit execctx: ExecutionContext) | |
extends PersistentView with ActorLogging { | |
import Striimaus._ | |
override def persistenceId: String = eventFilterPersistenceId | |
override def viewId: String = eventFilterViewId | |
override def receive: Receive = ready(FilterRules(Map.empty)) | |
def ready(filterRules: FilterRules): Receive = { | |
case r: FilterRules => | |
log.info("New rules for topic {}: {}", topic, r) | |
context.become(ready(r)) | |
case e: Event => | |
log.info("New event for topic {}: {}", topic, e) | |
val ruleIds = processEvent(filterRules.rules, e) | |
if (ruleIds.nonEmpty) | |
triggerRules(e, ruleIds) | |
} | |
private def processEvent(rules: Map[String, Rule], event: Event): Set[String] = { | |
rules.filter({ case (_, rule) => rule(event.data) }).keySet | |
} | |
private def triggerRules(event: Event, ruleIds: Set[String]): Unit = { | |
log.info("Triggered on event {}: {}", event, ruleIds) | |
val data = TriggeredEvent(event, ruleIds) | |
producer.send(new KeyedMessage[String, String]("trigger", data.toString)) | |
} | |
def consumerProcess[Key, Value](stream: KafkaStream[Key, Value]) | |
(callback: (Key, Value) => Unit) | |
(implicit execctx: ExecutionContext): Future[Unit] = Future { | |
for (m <- stream) { | |
callback(m.key(), m.message()) | |
} | |
} recoverWith { | |
case _: InterruptedException => | |
consumerProcess(stream)(callback) | |
} | |
consumerProcess(consumerStream) { | |
(_, value) => self ! Event(value) | |
} | |
} | |
object StriimSystem { | |
private val defaultConfig = ConfigFactory.parseString( | |
"""akka.persistence { | |
| journal.plugin = "akka.persistence.journal.leveldb" | |
| journal.leveldb.store { | |
| # DO NOT USE 'native = off' IN PRODUCTION !!! | |
| native = off | |
| dir = "target/journal" | |
| } | |
| snapshot-store.plugin = "akka.persistence.snapshot-store.local" | |
| snapshot-store.local.dir = "target/snapshots" | |
|}""".stripMargin | |
) | |
private val sampleRules = Map( | |
"foo" -> Rule.Contains("foo"), | |
"bar" -> Rule.StartsWith("bar") | |
) | |
def boot(topics: Seq[String], | |
consumerConf: Map[String, String], | |
producerConf: Map[String, String], | |
actorSystemConfig: Config) | |
(implicit execctx: ExecutionContext) = { | |
val actorSystem = ActorSystem("StriimSystem", actorSystemConfig) | |
Persistence(actorSystem) | |
val actorRef = actorSystem.actorOf(StriimManager.props(topics, consumerConf, producerConf)) | |
StriimSystem(actorSystem, actorRef) | |
} | |
def main(args: Array[String]) = { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
val system = boot( | |
topics = Seq("topic1", "topic2", "topic3"), | |
consumerConf = Map( | |
"zookeeper.connect" -> "192.168.59.103:2181", | |
"group.id" -> "mygroup", | |
"zookeeper.session.timeout.ms" -> "400", | |
"zookeeper.sync.time.ms" -> "200", | |
"auto.commit.interval.ms" -> "1000" | |
), | |
producerConf = Map( | |
"metadata.broker.list" -> "192.168.59.103:9092", | |
"serializer.class" -> "kafka.serializer.StringEncoder", | |
"request.required.acks" -> "1" | |
), | |
actorSystemConfig = defaultConfig | |
) | |
sampleRules foreach { | |
case (id, rule) => system.striimManager ! Striimaus.AddRule(id, rule) | |
} | |
} | |
} | |
case class StriimSystem(actorSystem: ActorSystem, striimManager: ActorRef) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment