Skip to content

Instantly share code, notes, and snippets.

@emaxerrno
Created May 28, 2013 14:46
Show Gist options
  • Select an option

  • Save emaxerrno/5663288 to your computer and use it in GitHub Desktop.

Select an option

Save emaxerrno/5663288 to your computer and use it in GitHub Desktop.
package com.yieldmo.storm.scheme
import backtype.storm.spout.Scheme
import backtype.storm.tuple.Fields
import com.google.protobuf._
import collection.JavaConversions._
import java.lang.reflect.Modifier
/**
 * Storm `Scheme` that decodes a raw byte payload into a protobuf message of type `T`
 * and emits it as a single-field tuple.
 *
 * The message builder is located reflectively through the static, zero-argument
 * `newBuilder()` method of `T`'s runtime class.
 *
 * @tparam T protobuf message class; expected to expose a static no-arg `newBuilder()`
 * @param emittedField name of the single output field declared by this scheme
 */
class ProtoScheme[T <: Message: ClassManifest](emittedField: String) extends Scheme {
  // NOTE(review): a plain `val` is an instance member, not the static field that Java
  // serialization looks up; the idiomatic form is `@SerialVersionUID(1L)` on the class.
  // Left in place so the existing public member is not removed.
  val serialVersionUID = 1L

  /**
   * Builder obtained from `T.newBuilder()`.
   *
   * Marked `@transient lazy` so it is not serialized with the scheme and is re-created
   * on first use after deserialization.
   *
   * Only the static overload that takes no parameters is accepted. Without the arity check
   * the lookup could pick `newBuilder(prototype)` (reflection does not guarantee method
   * order) and fail at runtime when invoked without arguments.
   *
   * @throws NoSuchElementException if `T` has no static zero-argument `newBuilder` method
   */
  @transient lazy val builder: Message.Builder =
    classManifest[T].erasure.getMethods
      .find { m =>
        m.getName == "newBuilder" &&
        Modifier.isStatic(m.getModifiers) &&
        m.getParameterTypes.isEmpty
      }
      // Static method: the receiver is null and there are no arguments.
      .map(_.invoke(null).asInstanceOf[Message.Builder])
      .getOrElse(
        throw new NoSuchElementException(
          "No static zero-argument newBuilder() method found on " + classManifest[T].erasure.getName))

  /**
   * Parses `bytes` as a message of type `T`.
   *
   * The cached builder is cleared before every merge: `mergeFrom` adds to whatever the
   * builder already holds, so without the reset, fields from previously decoded messages
   * (or from a parse that failed half-way) would leak into this one.
   *
   * NOTE(review): the builder is shared mutable state with no synchronization; this assumes
   * a given scheme instance is driven by one thread at a time -- confirm against the caller.
   *
   * @param bytes serialized protobuf payload
   * @return a one-element Java list containing the decoded message
   */
  override def deserialize(bytes: Array[Byte]): java.util.List[Object] =
    asJavaList(List[Object](builder.clear().mergeFrom(bytes).build()))

  /** Declares the single output field named by `emittedField`. */
  override def getOutputFields: Fields = new Fields(emittedField)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment