Skip to content

Instantly share code, notes, and snippets.

@emaxerrno
Created May 22, 2013 19:56
Show Gist options
  • Select an option

  • Save emaxerrno/5630413 to your computer and use it in GitHub Desktop.

Select an option

Save emaxerrno/5630413 to your computer and use it in GitHub Desktop.
package com.yieldmo.storm;
import backtype.storm.Config
import backtype.storm.LocalCluster
import backtype.storm.StormSubmitter
import backtype.storm.spout.Scheme
import backtype.storm.tuple.Fields
import backtype.storm.tuple.Values
import com.yieldmo.common.kafka.KafkaConfigYieldmo
import com.yieldmo.common.kafka.KafkaConfigYieldmo._
import org.slf4j.LoggerFactory
import scala.Option
import storm.kafka.KafkaConfig
import storm.kafka.trident.OpaqueTridentKafkaSpout
import storm.kafka.trident.TridentKafkaConfig
import storm.kafka.trident.TridentKafkaConfig._
import storm.trident._
import storm.trident.operation.TridentCollector
import storm.trident.state._
import backtype.storm.task._
import com.yieldmo.common.protobuf._
import com.yieldmo.common.protobuf.Events._
import com.yieldmo.storm.scheme._
import scala.collection.JavaConversions._
import storm.trident.tuple.TridentTuple
/**
 * Trident `State` used by this topology.
 *
 * `saveBulk` currently just prints every element it is given to standard
 * output; `beginCommit` and `commit` do nothing.
 */
class Saver extends State {
  // NOTE(review): this is an ordinary instance member; Scala's mechanism for a
  // Java serialization UID is the `@SerialVersionUID` class annotation.
  // Confirm what this val is meant to achieve.
  val serialVersionUID = 1L

  // Not referenced anywhere inside this class.
  private val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Prints each element of `toSave` on its own line, in list order.
   *
   * @param toSave the batch of action seeds to "save"
   */
  def saveBulk(toSave: List[ActionSeed]): Unit = {
    for (seed <- toSave) println(seed)
  }

  /** No-op: nothing is done when a transaction begins. */
  override def beginCommit(txId: java.lang.Long): Unit = ()

  /** No-op: nothing is done when a transaction is committed. */
  override def commit(txId: java.lang.Long): Unit = ()
}
/**
 * State updater that extracts the "proto" field of every tuple in a batch,
 * passes the resulting list to `Saver.saveBulk`, and then emits each element
 * on the collector.
 *
 * Any `Exception` thrown while doing so is reported on the collector, logged,
 * and rethrown wrapped in a `RuntimeException`.
 */
class Updater extends BaseStateUpdater[Saver] {
  // NOTE(review): this is an ordinary instance member; Scala's mechanism for a
  // Java serialization UID is the `@SerialVersionUID` class annotation.
  // Confirm what this val is meant to achieve.
  val serialVersionUID = 1L

  private val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Processes one batch of tuples.
   *
   * @param state  the `Saver` that receives the extracted batch
   * @param tuples the incoming batch; null entries are skipped
   * @param col    collector used to emit each extracted value and to report errors
   * @throws RuntimeException wrapping any `Exception` raised during processing
   */
  override def updateState(state: Saver, tuples: java.util.List[TridentTuple], col: TridentCollector): Unit = {
    log.info("Not there yet")
    try {
      // The whole batch is materialised first, so a bad tuple fails the batch
      // before anything is saved or emitted.
      val seeds = for {
        tuple <- tuples.toList
        if tuple != null
      } yield tuple.getValueByField("proto").asInstanceOf[ActionSeed]

      state.saveBulk(seeds)
      for (seed <- seeds) col.emit(new Values(seed))
    } catch {
      case failure: Exception =>
        col.reportError(failure)
        log.error("Unable to complete state update", failure)
        throw new RuntimeException(failure)
    }
  }
}
/**
 * Builds a Trident topology that reads `ActionSeed` messages from Kafka and
 * persists them through [[Saver]] / [[Updater]], then submits it.
 *
 * With no command-line arguments the topology is submitted to a fresh
 * `LocalCluster`; otherwise it is submitted through `StormSubmitter` using the
 * first argument as the topology name.
 *
 * NOTE(review): this is declared as a `class` (not an `object`) extending
 * `App`. Confirm how it is launched, since a JVM entry point needs a static
 * `main`.
 */
class ActionSeedTopology extends App {
  // NOTE(review): ordinary instance member; `@SerialVersionUID` is Scala's
  // mechanism for a Java serialization UID. Confirm the intent.
  val serialVersionUID = 1L

  /** Identifier passed to `TridentTopology.newStream` for the stream. */
  val ActionSeedStreamId = "v1_action_seed"

  private val log = LoggerFactory.getLogger(this.getClass)

  /** State factory that hands out a new `Saver` on every call, ignoring its arguments. */
  val factory = new StateFactory {
    override def makeState(conf: java.util.Map[_, _],
                           metrics: IMetricsContext,
                           pIndex: Int,
                           numPartitions: Int) = new Saver()
  }

  val stormconfig = new Config()
  val trident = new TridentTopology()

  /** Stream fed by the Kafka spout; each tuple carries a single "proto" field. */
  val protoStream = {
    val scheme = new ProtoScheme[ActionSeed]("proto")
    val spout = createSpout(scheme, KafkaConfigYieldmo.actionSeedTopic)
    trident.newStream(ActionSeedStreamId, spout)
  }

  protoStream.partitionPersist(factory, new Fields("proto"), new Updater)

  log.info("About to start the cluster")
  args.headOption match {
    case None =>
      log.info("Working in Local Cluster Mode")
      new LocalCluster().submitTopology("Local Topology Action Seed", stormconfig, trident.build)
    case Some(topologyName) =>
      log.info("Starting storm submitter topology: Production Mode")
      stormconfig.setNumWorkers(1)
      StormSubmitter.submitTopology(topologyName, stormconfig, trident.build)
  }
  log.info("Finished submitting the topology")

  /**
   * Creates the opaque Trident Kafka spout for `topic`.
   *
   * Broker hosts, partition count and fetch buffer size are all read from
   * `KafkaConfigYieldmo`.
   *
   * @param scheme deserialization scheme assigned to the Kafka configuration
   * @param topic  Kafka topic to consume
   * @return a new `OpaqueTridentKafkaSpout` built from that configuration
   */
  def createSpout(scheme: Scheme, topic: String): OpaqueTridentKafkaSpout = {
    val brokerHosts = asJavaList(KafkaConfigYieldmo.kafkaClickTrackerServerBrokerHosts())
    val partitionCount = KafkaConfigYieldmo.kafkaClickTrackerTopicPartitions()
    val staticHosts = KafkaConfig.StaticHosts.fromHostString(brokerHosts, partitionCount)

    val config = new TridentKafkaConfig(staticHosts, topic)
    config.scheme = scheme
    config.bufferSizeBytes = KafkaConfigYieldmo.kafkaConsumerBufferSize()

    log.info(
      "Set the size of the fetch to:" + config.bufferSizeBytes +
        " bytes and the topic of the kafka queues to:" + config.topic)

    new OpaqueTridentKafkaSpout(config)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment