Created
May 22, 2013 19:56
-
-
Save emaxerrno/5630413 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.yieldmo.storm; | |
| import backtype.storm.Config | |
| import backtype.storm.LocalCluster | |
| import backtype.storm.StormSubmitter | |
| import backtype.storm.spout.Scheme | |
| import backtype.storm.tuple.Fields | |
| import backtype.storm.tuple.Values | |
| import com.yieldmo.common.kafka.KafkaConfigYieldmo | |
| import com.yieldmo.common.kafka.KafkaConfigYieldmo._ | |
| import org.slf4j.LoggerFactory | |
| import scala.Option | |
| import storm.kafka.KafkaConfig | |
| import storm.kafka.trident.OpaqueTridentKafkaSpout | |
| import storm.kafka.trident.TridentKafkaConfig | |
| import storm.kafka.trident.TridentKafkaConfig._ | |
| import storm.trident._ | |
| import storm.trident.operation.TridentCollector | |
| import storm.trident.state._ | |
| import backtype.storm.task._ | |
| import com.yieldmo.common.protobuf._ | |
| import com.yieldmo.common.protobuf.Events._ | |
| import com.yieldmo.storm.scheme._ | |
| import scala.collection.JavaConversions._ | |
| import storm.trident.tuple.TridentTuple | |
/**
 * Trident state that, for now, only prints the records it is asked to save.
 *
 * Both transaction hooks (`beginCommit` / `commit`) are no-ops.
 */
class Saver extends State {
  // NOTE(review): an instance `val` named serialVersionUID is not the static field that Java
  // serialization reads; Scala's mechanism for that is the @SerialVersionUID annotation.
  // Left unchanged here so the public member stays available.
  val serialVersionUID = 1L
  private val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Prints every element of `toSave` to standard output, one per line, in list order.
   *
   * @param toSave the batch of records to "persist"
   */
  def saveBulk(toSave: List[ActionSeed]) = {
    for (seed <- toSave) println(seed)
  }

  /** Transaction start hook; intentionally does nothing. */
  override def beginCommit(txId: java.lang.Long): Unit = ()

  /** Transaction end hook; intentionally does nothing. */
  override def commit(txId: java.lang.Long): Unit = ()
}
/**
 * State updater that reads the "proto" field of every incoming tuple, passes the resulting
 * batch to `Saver.saveBulk`, and then re-emits each record on the collector.
 */
class Updater extends BaseStateUpdater[Saver] {
  // NOTE(review): a plain `val` is not picked up by Java serialization as the class version id;
  // @SerialVersionUID is the Scala mechanism. Kept as-is to preserve the existing member.
  val serialVersionUID = 1L
  private val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Processes one batch of tuples.
   *
   * Any `Exception` raised while extracting, saving or emitting is reported to the collector,
   * logged, and rethrown wrapped in a `RuntimeException`.
   *
   * @param state  the state instance that receives the batch via `saveBulk`
   * @param tuples the batch of tuples; null entries are skipped
   * @param col    collector used both for emitting records and for reporting errors
   */
  override def updateState(state: Saver, tuples: java.util.List[TridentTuple], col: TridentCollector): Unit = {
    log.info("Not there yet")
    try {
      // The implicit Java-to-Scala collection conversion in scope makes `toList` available here.
      val seeds = tuples.toList.filter(_ != null).map(readSeed)
      state.saveBulk(seeds)
      for (seed <- seeds) col.emit(new Values(seed))
    } catch {
      case failure: Exception =>
        col.reportError(failure)
        log.error("Unable to complete state update", failure)
        throw new RuntimeException(failure)
    }
  }

  /** Reads the "proto" field of `tuple` and casts it to `ActionSeed`. */
  private def readSeed(tuple: TridentTuple): ActionSeed =
    tuple.getValueByField("proto").asInstanceOf[ActionSeed]
}
/**
 * Entry point that builds a Trident topology reading `ActionSeed` records from Kafka and
 * partition-persisting them through [[Updater]] into a [[Saver]] state.
 *
 * With no command-line arguments the topology is submitted to an in-process `LocalCluster`;
 * otherwise the first argument is passed to `StormSubmitter` as the topology name.
 *
 * This is declared as an `object` rather than a `class`: the JVM launcher requires a static
 * `main` method, and a `class` extending `App` only provides an instance-level `main`, so it
 * cannot be started directly from the command line.
 */
object ActionSeedTopology extends App {
  // Kept for source compatibility only; a plain val has no effect on Java serialization.
  val serialVersionUID = 1L
  val ActionSeedStreamId = "v1_action_seed"
  private val log = LoggerFactory.getLogger(this.getClass)

  // Every partition gets a fresh Saver; the configuration, metrics context and partition
  // coordinates passed in are not used.
  val factory = new StateFactory {
    override def makeState(conf: java.util.Map[_, _],
                           metrics: IMetricsContext,
                           pIndex: Int,
                           numPartitions: Int): State = new Saver()
  }

  val stormconfig = new Config()
  val trident = new TridentTopology()

  val protoStream = trident.newStream(
    ActionSeedStreamId,
    createSpout(new ProtoScheme[ActionSeed]("proto"), KafkaConfigYieldmo.actionSeedTopic))
  protoStream.partitionPersist(factory, new Fields("proto"), new Updater)

  log.info("About to start the cluster")
  if (args.length == 0) {
    log.info("Working in Local Cluster Mode")
    new LocalCluster().submitTopology("Local Topology Action Seed", stormconfig, trident.build())
  } else {
    log.info("Starting storm submitter topology: Production Mode")
    stormconfig.setNumWorkers(1)
    StormSubmitter.submitTopology(args(0), stormconfig, trident.build())
  }
  log.info("Finished submitting the topology")

  /**
   * Creates an opaque Trident Kafka spout for `topic` whose messages are decoded with `scheme`.
   *
   * NOTE(review): broker hosts and partition count are read from the click-tracker getters of
   * `KafkaConfigYieldmo` regardless of `topic` — confirm that this is intended.
   *
   * @param scheme deserialization scheme assigned to the Kafka configuration
   * @param topic  Kafka topic to consume
   * @return a spout configured with the fetch buffer size from `KafkaConfigYieldmo`
   */
  def createSpout(scheme: Scheme, topic: String): OpaqueTridentKafkaSpout = {
    val hosts = asJavaList(KafkaConfigYieldmo.kafkaClickTrackerServerBrokerHosts())
    val partitions = KafkaConfigYieldmo.kafkaClickTrackerTopicPartitions()
    val kafkaConf = new TridentKafkaConfig(KafkaConfig.StaticHosts.fromHostString(hosts, partitions), topic)
    kafkaConf.scheme = scheme
    kafkaConf.bufferSizeBytes = KafkaConfigYieldmo.kafkaConsumerBufferSize()
    log.info("Set the size of the fetch to:" + kafkaConf.bufferSizeBytes +
      " bytes and the topic of the kafka queues to:" + kafkaConf.topic)
    new OpaqueTridentKafkaSpout(kafkaConf)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment