Skip to content

Instantly share code, notes, and snippets.

@stonegao
Forked from mardambey/KafkaEmbedded.scala
Created May 11, 2012 03:11
Show Gist options
  • Save stonegao/2657294 to your computer and use it in GitHub Desktop.
Save stonegao/2657294 to your computer and use it in GitHub Desktop.
Embedded Kafka broker / producer / simple consumer in a single process, useful for testing or for persistent queues.
import java.util.Properties
import kafka.server.KafkaServer
import kafka.server.KafkaConfig
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.message.Message
import kafka.producer.ProducerData
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.utils.Utils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
/**
 * Runs a Kafka broker, an async producer and a simple consumer in one JVM.
 *
 * The program starts a broker on localhost:9090 (ZooKeeper disabled), sends
 * 200 copies of a fixed message to the "TEST" topic, and then polls partition
 * 0 of that topic forever, printing every message it fetches.
 *
 * The polling loop never terminates on its own; stop the process with a
 * signal (e.g. Ctrl-C). The shutdown hook registered below then closes the
 * producer, the consumer and the broker.
 */
object KafkaEmbedded extends App {
  // How long to wait before polling again when a fetch returned no messages.
  private val IdlePollBackoffMs = 100L

  // --- Embedded broker -----------------------------------------------------
  val props = new Properties()
  props.setProperty("hostname", "localhost")
  props.setProperty("port", "9090")
  props.setProperty("brokerid", "1")
  props.setProperty("log.dir", "/tmp/embeddedkafka/")
  props.setProperty("enable.zookeeper", "false")
  val server = new KafkaServer(new KafkaConfig(props))
  server.startup()

  // --- Producer ------------------------------------------------------------
  // With ZooKeeper disabled the producer is pointed at the broker directly via
  // "broker.list"; the entry matches the brokerid/hostname/port set above.
  val prodProps = new Properties()
  prodProps.setProperty("producer.type", "async")
  prodProps.setProperty("queue.time", "2000")
  prodProps.setProperty("queue.size", "100")
  prodProps.setProperty("batch.size", "10")
  prodProps.setProperty("broker.list", "1:localhost:9090")
  val prodConfig = new ProducerConfig(prodProps)
  val prod = new Producer[String, Message](prodConfig)

  // --- Consumer ------------------------------------------------------------
  val cons = new SimpleConsumer("localhost", 9090, 100, 1024)

  // Register cleanup BEFORE entering the endless polling loop. Anything placed
  // after `while (true)` is unreachable, so a hook registered there would
  // never be installed and no resource would be released on exit.
  sys.addShutdownHook({
    prod.close()
    cons.close()
    server.shutdown()
    server.awaitShutdown()
  })

  for (_ <- 1 to 200) {
    prod.send(new ProducerData("TEST", new Message("testing 1 2 3".getBytes)))
  }

  var offset = 0L
  var i = 0
  while (true) {
    val consumedBefore = i
    val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    for (msg <- cons.fetch(fetchRequest)) {
      i = i + 1
      println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
      // NOTE(review): assumes msg.offset is the position to resume fetching
      // from (not the offset of the message just read) — confirm against the
      // Kafka version in use.
      offset = msg.offset
    }
    // Nothing new arrived: back off briefly instead of re-issuing fetch
    // requests in a tight loop.
    if (i == consumedBefore) {
      Thread.sleep(IdlePollBackoffMs)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment