Skip to content

Instantly share code, notes, and snippets.

@emaxerrno
Created May 17, 2013 17:38
Show Gist options
  • Select an option

  • Save emaxerrno/5600663 to your computer and use it in GitHub Desktop.

Select an option

Save emaxerrno/5600663 to your computer and use it in GitHub Desktop.
package com.yieldmo.common.protobuf
import com.google.protobuf.Message
import java.lang.AutoCloseable
import java.util.Properties
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import kafka.producer.ProducerData
import kafka.serializer.Encoder
import scala.reflect.ClassDef
class ProtoEncoder extends Encoder[Message]{
def toMessage(proto: Message) = new kafka.message.Message(proto.toByteArray)
}
trait ProtoKafkaWriter extends AutoCloseable{
def producer: Producer[String, Message]
def write(topic:String, item: Message) = producer.send(new ProducerData[String,Message](topic,item))
def close = producer.close
}
private object ProtoWriter{
private val clzz = classOf[ProtoEncoder].getCanonicalName
def createConfZk(zk : String) : Properties = {
val props = new Properties()
props.setProperty("zk.connect", zk)
props.setProperty("serializer.class", clzz)
props
}
def createConfBrokerList(brokers : String) : Properties = {
val props = new Properties()
props.setProperty("broker.list", brokers)
props.setProperty("serializer.class", clzz)
props
}
}
class ProtoWriter(conf: Properties) extends ProtoKafkaWriter {
val producer = new Producer[String, Message](new ProducerConfig(conf))
}
class ZkProtoWriter(zk: String) extends ProtoWriter(ProtoWriter.createConfZk(zk));
class BrokerListProtoWriter(bkList: String) extends ProtoWriter(ProtoWriter.createConfBrokerList(bkList));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment