@ashrithr
Last active April 17, 2019 20:00
Kafka Twitter producer: dumps tweets from the Twitter firehose that match the given keywords into a Kafka topic.
begin
  require 'tweetstream'
  require 'poseidon'
rescue LoadError
  # rescue needs the exception class itself, not a string
  raise "Requires tweetstream and poseidon libraries (gem install tweetstream poseidon)"
end
raise "Requires Ruby 1.9 or later" if RUBY_VERSION < "1.9"
raise "Requires at least one keyword argument to match against the twitter firehose" if ARGV.empty?
# Name of the kafka topic to send data to
topic_name = "twitterstream"
# Number of messages to batch and flush
batch_size_max = 1000
# List of kafka brokers
kafka_broker_list = ["localhost:9092"]
TweetStream.configure do |config|
  # <<< Fill these from dev.twitter.com >>> #
  config.consumer_key = ''
  config.consumer_secret = ''
  config.oauth_token = ''
  config.oauth_token_secret = ''
  config.auth_method = :oauth
end
producer = Poseidon::Producer.new(kafka_broker_list, "ruby_producer")
messages = []
# Twitter firehose data that matches the keywords given in ARGV
begin
  loop do
    TweetStream::Client.new.track(ARGV.join(',')) do |tweet|
      puts "#{tweet.user['screen_name']} => #{tweet.text}"
      messages << Poseidon::MessageToSend.new(topic_name, tweet.text)
      # Flush once a full batch has accumulated; using the buffer size
      # keeps the count correct even if the stream reconnects
      if messages.size >= batch_size_max
        puts "Flushing #{messages.size} messages to kafka"
        producer.send_messages(messages)
        messages.clear
      end
    end
  end
rescue Interrupt
  puts "Caught Interrupt ... Exiting!"
ensure
  # Send any partial batch still buffered so it is not lost on exit
  producer.send_messages(messages) unless messages.empty?
end
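
The batch-and-flush logic in the loop above can be tried without Twitter or Kafka. The following is a minimal sketch, not part of the original script: the `Batcher` class and its `pending` reader are hypothetical names introduced here for illustration, and the block passed to `Batcher.new` stands in for `producer.send_messages`.

```ruby
# Hypothetical helper (not part of the gist): buffers items and hands
# them to a caller-supplied block once `max` items have accumulated.
class Batcher
  attr_reader :pending

  def initialize(max, &sink)
    raise ArgumentError, "max must be positive" unless max > 0
    raise ArgumentError, "a sink block is required" unless sink
    @max = max
    @sink = sink
    @pending = []
  end

  # Buffer one item; flush automatically when the batch is full.
  def <<(item)
    @pending << item
    flush if @pending.size >= @max
    self
  end

  # Deliver whatever is buffered, including a partial batch.
  def flush
    return if @pending.empty?
    @sink.call(@pending.dup)
    @pending.clear
  end
end

# Stand-in for producer.send_messages: records each batch it receives.
sent = []
batcher = Batcher.new(3) { |batch| sent << batch }
%w[a b c d].each { |text| batcher << text }
batcher.flush # deliver the partial batch, as you would on shutdown
```

In the real script the sink block would call `producer.send_messages(batch)`, and the final `flush` would sit in an `ensure` clause so a partial batch is sent when the process is interrupted.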