Last active
September 29, 2021 14:50
-
-
Save stdatalabs/62661f1a0040a34bbc7a0d195f1ced5f to your computer and use it in GitHub Desktop.
A Storm spout that produces a stream of tuples from the Twitter Streaming API. More at stdatalabs.blogspot.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;
/** | |
* Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <topic-name> <keyword_1> ... <keyword_n> | |
* <comsumerKey> - Twitter consumer key | |
* <consumerSecret> - Twitter consumer secret | |
* <accessToken> - Twitter access token | |
* <accessTokenSecret> - Twitter access token secret | |
* <keyword_1> - The keyword to filter tweets | |
* <keyword_n> - Any number of keywords to filter tweets | |
* | |
* More discussion at stdatalabs.blogspot.com | |
* | |
* @author Sachin Thirumala | |
*/ | |
@SuppressWarnings("serial") | |
public class TwitterSampleSpout extends BaseRichSpout { | |
SpoutOutputCollector _collector; | |
LinkedBlockingQueue<Status> queue = null; | |
TwitterStream _twitterStream; | |
String consumerKey; | |
String consumerSecret; | |
String accessToken; | |
String accessTokenSecret; | |
String[] keyWords; | |
public TwitterSampleSpout(String consumerKey, String consumerSecret, | |
String accessToken, String accessTokenSecret, String[] keyWords) { | |
this.consumerKey = consumerKey; | |
this.consumerSecret = consumerSecret; | |
this.accessToken = accessToken; | |
this.accessTokenSecret = accessTokenSecret; | |
this.keyWords = keyWords; | |
} | |
public TwitterSampleSpout() { | |
// TODO Auto-generated constructor stub | |
} | |
@Override | |
public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) { | |
queue = new LinkedBlockingQueue<Status>(1000); | |
_collector = collector; | |
StatusListener listener = new StatusListener() { | |
@Override | |
public void onStatus(Status status) { | |
queue.offer(status); | |
} | |
@Override | |
public void onDeletionNotice(StatusDeletionNotice sdn) {} | |
@Override | |
public void onTrackLimitationNotice(int i) {} | |
@Override | |
public void onScrubGeo(long l, long l1) {} | |
@Override | |
public void onException(Exception ex) {} | |
@Override | |
public void onStallWarning(StallWarning arg0) { | |
// TODO Auto-generated method stub | |
} | |
}; | |
ConfigurationBuilder cb = new ConfigurationBuilder(); | |
cb.setDebugEnabled(true) | |
.setOAuthConsumerKey(consumerKey) | |
.setOAuthConsumerSecret(consumerSecret) | |
.setOAuthAccessToken(accessToken) | |
.setOAuthAccessTokenSecret(accessTokenSecret); | |
_twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); | |
_twitterStream.addListener(listener); | |
if (keyWords.length == 0) { | |
_twitterStream.sample(); | |
}else { | |
FilterQuery query = new FilterQuery().track(keyWords); | |
_twitterStream.filter(query); | |
} | |
} | |
@Override | |
public void nextTuple() { | |
Status ret = queue.poll(); | |
if (ret == null) { | |
Utils.sleep(50); | |
} else { | |
_collector.emit(new Values(ret)); | |
} | |
} | |
@Override | |
public void close() { | |
_twitterStream.shutdown(); | |
} | |
@Override | |
public Map<String, Object> getComponentConfiguration() { | |
Config ret = new Config(); | |
ret.setMaxTaskParallelism(1); | |
return ret; | |
} | |
@Override | |
public void ack(Object id) {} | |
@Override | |
public void fail(Object id) {} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("tweet")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment