-
-
Save darrenfu/3867fbc16d8dbf069518654c8d12bbf5 to your computer and use it in GitHub Desktop.
Pipe a Kafka consumer to a WebSocket on Play! Framework.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Settings for the Kafka high-level consumer. The KafkaFeed controller loads the
# whole `kafka.consumer` section and copies every entry into the Properties
# object handed to kafka.consumer.ConsumerConfig, so any consumer property
# supported by the Kafka client can be added here.
kafka {
  consumer {
    # Consumer group the WebSocket feed consumers join.
    group.id = "default_consumer_group"
    # ZooKeeper connection string (host:port) used by the high-level consumer.
    zookeeper.connect = "localhost:2181"
    # Where to start when the group has no committed offset; "smallest" means
    # the oldest available message (per the Kafka 0.8 consumer config docs).
    auto.offset.reset = "smallest"
    # -1 disables the consumer timeout: the stream iterator blocks until a
    # message is available instead of throwing a timeout exception.
    consumer.timeout.ms = "-1"
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import java.util.Properties | |
import com.typesafe.config.ConfigFactory | |
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, Whitelist} | |
import kafka.serializer.StringDecoder | |
import play.api.libs.iteratee.{Enumerator, Iteratee} | |
import play.api.mvc.{Controller, WebSocket} | |
import scala.collection.JavaConversions._ | |
/**
 * Play controller that pipes the messages of a Kafka topic to a WebSocket client.
 *
 * Each WebSocket connection opens its own Kafka consumer connection, configured
 * from the `kafka.consumer` section of the application configuration, and shuts
 * that connection down once the inbound side of the socket completes.
 */
object KafkaFeed extends Controller {

  // Local imports keep this object self-contained: explicit `.asScala`
  // conversions replace the deprecated implicit JavaConversions, and the
  // AtomicBoolean replaces a plain `var` shared between threads.
  import java.util.concurrent.atomic.AtomicBoolean
  import scala.collection.JavaConverters._

  /** Consumer settings: every entry under `kafka.consumer`, copied into a `Properties`. */
  private val kafkaConsumerConfig: ConsumerConfig = {
    val props = new Properties
    ConfigFactory.load().getConfig("kafka.consumer").entrySet().asScala.foreach { entry =>
      props.setProperty(entry.getKey, entry.getValue.unwrapped().toString)
    }
    new ConsumerConfig(props)
  }

  /** Opens a new consumer connection for the given configuration. */
  private def connect(config: ConsumerConfig): ConsumerConnector = Consumer.create(config)

  /**
   * Subscribes `connection` to the topics matching `topic` (used as a whitelist
   * filter) with a single stream and String key/value decoders.
   *
   * @return the iterator of the first stream, or `None` if no stream was created.
   *         The iterator is returned as-is rather than converted to a `Stream`:
   *         a `Stream` evaluates its head eagerly (which would block here until
   *         the first message arrives) and memoizes every element it has produced.
   */
  private def consume(topic: String, connection: ConsumerConnector) =
    connection
      .createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder, new StringDecoder)
      .headOption
      .map(_.iterator())

  /**
   * WebSocket endpoint that streams the message payloads of `topic` to the client.
   *
   * Messages sent by the client are printed to standard output. When the inbound
   * side completes, the consumer connection is shut down and the outbound
   * enumerator stops.
   *
   * @param topic topic name (whitelist filter) to consume from
   */
  def listenTo(topic: String) = WebSocket.using[String] { _ =>
    val connection = connect(kafkaConsumerConfig)
    // Written by the inbound iteratee and read by the outbound enumerator, which
    // may run on different threads; an atomic flag guarantees visibility.
    val connected = new AtomicBoolean(true)

    val endOnDisconnection = Iteratee.foreach[String](println).map { _ =>
      // Flip the flag first so the enumerator stops even if a message is pending.
      connected.set(false)
      connection.shutdown()
    }

    val pipeFromKafka = consume(topic, connection)
      .map { messages =>
        Enumerator.unfold(messages) { it =>
          // NOTE(review): with consumer.timeout.ms = -1 `hasNext` presumably blocks
          // the calling thread until a message arrives or the connection is shut
          // down - consider running this on a dedicated execution context.
          // Guarding with `hasNext` ends the enumerator cleanly once the stream is
          // exhausted instead of failing on an empty stream.
          if (connected.get && it.hasNext) Some((it, it.next().message())) else None
        }
      }
      .getOrElse(Enumerator.eof[String])

    (endOnDisconnection, pipeFromKafka)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment