@mannharleen
Last active September 12, 2017 07:04
Spark-Flume integration based on the pull-based (aka polling) approach
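import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Run in spark-shell with the Flume connector on the classpath, e.g.
//   spark-shell --packages org.apache.spark:spark-streaming-flume_2.10:1.6.2
// spark-shell already provides a SparkContext (sc); building the StreamingContext from a new
// SparkConf would try to start a second SparkContext, which fails by default. (In a compiled
// app, use new StreamingContext(new SparkConf().setAppName("a"), Seconds(10)) instead.)
val ssc = new StreamingContext(sc, Seconds(10))
// Pull-based ("polling") stream: Spark polls the SparkSink that the Flume agent runs
// on quickstart.cloudera:19999 (sink1 in agent2.conf below).
val dstream = FlumeUtils.createPollingStream(ssc, "quickstart.cloudera", 19999)
// Each element is a SparkFlumeEvent; print() shows the first 10 elements of each batch.
dstream.print()
ssc.start()
// After sending some lines through nc, stop streaming; `true` also stops the SparkContext.
ssc.stop(true)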
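Printing the stream directly shows SparkFlumeEvent objects rather than the text typed into nc. A common shortcut is `new String(e.event.getBody.array())`, but that assumes a heap buffer whose backing array holds exactly the payload. A hedged alternative is sketched here: `FlumeBody.asString` is a hypothetical helper (not part of the Spark or Flume APIs) that copies only the readable bytes of the body and assumes the payload is UTF-8 text.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of spark-streaming-flume: decodes the readable bytes
// of a Flume event body (e.g. e.event.getBody on a SparkFlumeEvent) as UTF-8 text.
object FlumeBody {
  def asString(body: ByteBuffer): String = {
    // duplicate() shares the content but has its own position/limit,
    // so reading from it leaves the caller's buffer untouched.
    val view = body.duplicate()
    // Copy only [position, limit); this also works for direct buffers,
    // which have no accessible backing array.
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```

Under those assumptions it would slot into the polling code as `dstream.map(e => FlumeBody.asString(e.event.getBody)).print()`.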
/*
Prerequisites:
------------------------------------------
agent2.conf
------------------------------------------
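#netcat -> memory channel (channel1) -> Spark custom sink (sink1, polled by Spark)
#                                     -> logger sink (sink2)
# Both sinks drain channel1, so each event reaches only one of them, not both.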
agent2.sources = source1
agent2.channels = channel1
agent2.sinks = sink1 sink2
agent2.sources.source1.type = netcat
agent2.sources.source1.bind = quickstart.cloudera
agent2.sources.source1.port = 9998
agent2.sources.source1.channels = channel1
agent2.channels.channel1.type = memory
agent2.sinks.sink1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent2.sinks.sink1.hostname = quickstart.cloudera
agent2.sinks.sink1.port = 19999
agent2.sinks.sink1.channel = channel1
agent2.sinks.sink2.type = logger
agent2.sinks.sink2.channel = channel1
------------------------------------------
Commands (run each in its own terminal)
------------------------------------------
cmd1> flume-ng agent -c /home/cloudera/flume -f /home/cloudera/flume/agent2.conf -n agent2
cmd2> nc localhost 9998
------------------------------------------
Copy the sink dependencies to Flume's classpath (/usr/lib/flume-ng/lib)
------------------------------------------
wget "http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_2.10/1.6.2/spark-streaming-flume-sink_2.10-1.6.2.jar" -O spark-streaming-flume-sink_2.10-1.6.2.jar
wget "http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/2.10.5/scala-library-2.10.5.jar" -O scala-library-2.10.5.jar
wget "http://search.maven.org/remotecontent?filepath=org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar" -O commons-lang3-3.3.2.jar
cp *.jar /usr/lib/flume-ng/lib/
*/
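If the agent fails to load `org.apache.spark.streaming.flume.sink.SparkSink`, one likely cause is that one of the three jars did not land in Flume's lib directory. As a hedged sketch, the hypothetical `FlumeSinkJars.missing` helper lists which of those jars are absent from a given directory; the file names are the ones used in the wget commands, and `/usr/lib/flume-ng/lib` is the location assumed for the Cloudera quickstart VM.

```scala
import java.io.File

// Hypothetical helper: reports which of the sink-side jars are missing from a
// Flume lib directory (assumed to be /usr/lib/flume-ng/lib on the quickstart VM).
object FlumeSinkJars {
  val required: Seq[String] = Seq(
    "spark-streaming-flume-sink_2.10-1.6.2.jar",
    "scala-library-2.10.5.jar",
    "commons-lang3-3.3.2.jar")

  def missing(libDir: File): Seq[String] = {
    // listFiles() returns null when libDir is not a readable directory,
    // in which case every required jar is reported as missing.
    val present: Set[String] =
      Option(libDir.listFiles()).map(_.map(_.getName).toSet).getOrElse(Set.empty[String])
    required.filterNot(present.contains)
  }
}
```

For example, `FlumeSinkJars.missing(new java.io.File("/usr/lib/flume-ng/lib"))` should return an empty list once all three copies are in place.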