Skip to content

Instantly share code, notes, and snippets.

@adrianprecub
Created February 3, 2019 14:51
Show Gist options
  • Save adrianprecub/a048fe428de4676baa2f1fbad20fc97c to your computer and use it in GitHub Desktop.
Save adrianprecub/a048fe428de4676baa2f1fbad20fc97c to your computer and use it in GitHub Desktop.
package org.interview.functions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.test.Tweet;
import org.test.listeners.SimpleStreamListener;
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import java.util.Properties;
public class Twitter4JSourceFunction implements SourceFunction<Tweet> {
private boolean running = true;
private String[] filterTerms;
private Properties properties;
public TwitterStreamGeneratorFunction(Properties properties, String... filterTerms) {
this.properties = properties;
this.filterTerms = filterTerms;
}
@Override
public void run(SourceContext<Tweet> ctx) throws Exception {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setOAuthConsumerKey(properties.getProperty("oauth.consumerKey"))
.setOAuthConsumerSecret(properties.getProperty("oauth.consumerSecret"))
.setOAuthAccessToken(properties.getProperty("oauth.accessToken"))
.setOAuthAccessTokenSecret(properties.getProperty("oauth.accessTokenSecret"));
TwitterStream stream = new TwitterStreamFactory(cb.build()).getInstance();
stream.addListener(new SimpleStreamListener(ctx));
FilterQuery query = new FilterQuery();
query.track(filterTerms);
stream.filter(query);
while(running){}
}
@Override
public void cancel() {
running = false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment