Created
October 31, 2011 04:42
-
-
Save mardambey/1326933 to your computer and use it in GitHub Desktop.
Sample Kafka stdin producer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.edate.data.test | |
import java.util.Properties | |
import kafka.producer.ProducerConfig | |
import kafka.producer.Producer | |
import kafka.message.Message | |
import kafka.producer.ProducerData | |
import kafka.producer.ProducerData | |
import kafka.producer.Partitioner | |
/**
 * Command-line tool that publishes each line read from stdin to Kafka.
 *
 * Usage:
 * {{{
 *   stream | ./KafkaProducerTest zk-info topic   // every line is sent as a message to `topic`
 *   stream | ./KafkaProducerTest zk-info         // every line has the form "<topic> <message>"
 * }}}
 *
 * `zk-info` is passed to the producer as its `zk.connect` property. Message
 * payloads are encoded as UTF-8 so the bytes sent do not depend on the
 * platform default charset.
 */
object KafkaProducerTest extends App {
  // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException on args(0).
  if (args.isEmpty) {
    System.err.println("usage: KafkaProducerTest <zk-connect> [topic]")
    sys.exit(1)
  }

  val producerProps = new Properties()
  producerProps.put("zk.connect", args(0))
  //producerProps.put("partitioner.class", "com.edate.data.test.CustomPartitioner")
  //producerProps.put("host", "192.168.100.10")
  //producerProps.put("port", "9091")
  // we can use the async producer for buffered sends
  val producerConfig = new ProducerConfig(producerProps)

  // Kept as a nullable String so the member's type is unchanged; null means
  // "no fixed topic, read the topic from each input line".
  val topic = if (args.size > 1) args(1) else null
  private val fixedTopic: Option[String] = Option(topic)

  val producer = new Producer[String, Message](producerConfig)

  try {
    for (ln <- io.Source.stdin.getLines) {
      fixedTopic match {
        case Some(t) =>
          producer.send(new ProducerData(t, new Message(ln.getBytes("UTF-8"))))
          println(t + " -> " + ln)

        case None =>
          // The first space separates the topic from the message body.
          val sep = ln.indexOf(" ")
          if (sep <= 0) {
            // No separator (or an empty topic): report and skip the line
            // rather than relying on substring throwing.
            println("skipping line, expected '<topic> <message>': " + ln)
          } else {
            val t = ln.substring(0, sep)
            val msg = ln.substring(sep + 1)
            println(t + " -> " + msg)
            try {
              producer.send(new ProducerData(t, new Message(msg.getBytes("UTF-8"))))
            } catch {
              // Best effort in this mode: report the failure and keep reading.
              case e: Exception => println(e.getMessage)
            }
          }
      }
    }
  } finally {
    // Release the producer's connections (and flush anything buffered) on
    // end of input or on failure.
    producer.close()
  }
}
/**
 * Partitioner that ignores the message content and picks a partition
 * uniformly at random.
 */
class CustomPartitioner extends Partitioner[Message] {
  val r = new scala.util.Random

  /**
   * Chooses a random partition index for a message.
   *
   * @param data          the message being partitioned (not inspected)
   * @param numPartitions number of partitions available; must be positive
   * @return an index in the range 0 until numPartitions
   */
  def partition(data: Message, numPartitions: Int): Int = {
    // Bound the result by numPartitions: a fixed `nextInt(3) + 1` would never
    // pick partition 0 and could return an index past the last partition.
    r.nextInt(numPartitions)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment