Created
February 3, 2019 14:51
-
-
Save adrianprecub/a048fe428de4676baa2f1fbad20fc97c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.interview.functions; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.test.Tweet; | |
import org.test.listeners.SimpleStreamListener; | |
import twitter4j.FilterQuery; | |
import twitter4j.TwitterStream; | |
import twitter4j.TwitterStreamFactory; | |
import twitter4j.conf.ConfigurationBuilder; | |
import java.util.Properties; | |
public class Twitter4JSourceFunction implements SourceFunction<Tweet> { | |
private boolean running = true; | |
private String[] filterTerms; | |
private Properties properties; | |
public TwitterStreamGeneratorFunction(Properties properties, String... filterTerms) { | |
this.properties = properties; | |
this.filterTerms = filterTerms; | |
} | |
@Override | |
public void run(SourceContext<Tweet> ctx) throws Exception { | |
ConfigurationBuilder cb = new ConfigurationBuilder(); | |
cb.setOAuthConsumerKey(properties.getProperty("oauth.consumerKey")) | |
.setOAuthConsumerSecret(properties.getProperty("oauth.consumerSecret")) | |
.setOAuthAccessToken(properties.getProperty("oauth.accessToken")) | |
.setOAuthAccessTokenSecret(properties.getProperty("oauth.accessTokenSecret")); | |
TwitterStream stream = new TwitterStreamFactory(cb.build()).getInstance(); | |
stream.addListener(new SimpleStreamListener(ctx)); | |
FilterQuery query = new FilterQuery(); | |
query.track(filterTerms); | |
stream.filter(query); | |
while(running){} | |
} | |
@Override | |
public void cancel() { | |
running = false; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment