Skip to content

Instantly share code, notes, and snippets.

Created February 3, 2019 14:51
Show Gist options
  • Save adrianprecub/a048fe428de4676baa2f1fbad20fc97c to your computer and use it in GitHub Desktop.
Save adrianprecub/a048fe428de4676baa2f1fbad20fc97c to your computer and use it in GitHub Desktop.
package org.interview.functions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.test.Tweet;
import org.test.listeners.SimpleStreamListener;
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import java.util.Properties;
public class Twitter4JSourceFunction implements SourceFunction<Tweet> {
private boolean running = true;
private String[] filterTerms;
private Properties properties;
public TwitterStreamGeneratorFunction(Properties properties, String... filterTerms) { = properties;
this.filterTerms = filterTerms;
public void run(SourceContext<Tweet> ctx) throws Exception {
ConfigurationBuilder cb = new ConfigurationBuilder();
TwitterStream stream = new TwitterStreamFactory(;
stream.addListener(new SimpleStreamListener(ctx));
FilterQuery query = new FilterQuery();
public void cancel() {
running = false;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment