Skip to content

Instantly share code, notes, and snippets.

@mananai
Last active March 30, 2023 10:48
Show Gist options
  • Select an option

  • Save mananai/cc73b09d98de46ae1b3e6fd6306e01ab to your computer and use it in GitHub Desktop.

Select an option

Save mananai/cc73b09d98de46ae1b3e6fd6306e01ab to your computer and use it in GitHub Desktop.
Main class that saves the Twitter sample stream into an SQL database.
/**
 * Command-line entry point that saves the Twitter sample stream into an SQL database.
 *
 * <p>Two tasks run on a fixed thread pool and communicate through a shared queue of
 * {@code Tweet[]} batches: a {@code SampleStreamCollector} that is given the language
 * predicate, the bearer tokens, the queue and a termination flag, and a
 * {@code TwitterDBWriter} that is given the JDBC URL and the same queue.
 *
 * <p>Usage: {@code SampleStream2DB <jdbc-url> <lang> <bearer-token> [<bearer-token> ...]}
 *
 * <p>The program keeps running until the JVM is asked to terminate (for example with
 * {@code kill <pid>}); a shutdown hook then raises the termination flag and stops the pool.
 */
public class SampleStream2DB {

    /** One worker thread for the stream collector and one for the database writer. */
    private static final int THREAD_POOL_SIZE = 2;

    /** How long the shutdown hook waits for the tasks to finish before interrupting them. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    /** Message attached to the exception thrown when too few arguments are supplied. */
    private static final String USAGE =
            "Usage: SampleStream2DB <jdbc-url> <lang> <bearer-token> [<bearer-token> ...]";

    static final Logger logger = Logger.getLogger(SampleStream2DB.class.getName());

    /**
     * Starts the collector and writer tasks and registers the shutdown hook.
     *
     * @param args {@code args[0]} is the JDBC URL, {@code args[1]} the language a tweet's
     *             {@code getLang()} must equal to be kept, and {@code args[2..]} one or more
     *             bearer tokens
     * @throws IllegalArgumentException if fewer than three arguments are given
     * @throws SQLException declared by the original signature; presumably raised while the
     *                      database writer is being set up - TODO confirm
     */
    public static void main(String[] args) throws SQLException {
        if (args.length < 3) {
            throw new IllegalArgumentException(USAGE);
        }

        final String jdbcURL = args[0];
        final String lang = args[1];
        final String[] bearerTokenArray = Arrays.copyOfRange(args, 2, args.length);

        // NOTE(review): a JDBC URL can embed credentials; confirm it is safe to log verbatim.
        logger.info(format("Database url: %s", jdbcURL));
        logger.info(format("Lang: %s", lang));
        logger.info(format("PID: %d", ProcessHandle.current().pid()));
        logger.info("Use \"kill\" command to terminate the program");

        final BlockingQueue<Tweet[]> tweetsQueue = new LinkedBlockingQueue<>();
        final AtomicBoolean isTerminated = new AtomicBoolean(false);
        final Predicate<Tweet> predicate = tweet -> lang.equals(tweet.getLang());

        final TwitterDBWriter dbWriterTask = new TwitterDBWriter(jdbcURL, tweetsQueue);
        final SampleStreamCollector streamCollector =
                new SampleStreamCollector(predicate, bearerTokenArray, tweetsQueue, isTerminated);

        final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        final Future<Integer> streamFuture = executorService.submit(streamCollector);
        final Future<Integer> dbFuture = executorService.submit(dbWriterTask);

        // main() returns right after this; the pool's worker threads keep the JVM alive
        // until a termination request triggers the hook below.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.warning("!!! Got a request to terminate the program !!!");
            isTerminated.set(true);
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    // Tasks did not stop on their own in time: interrupt them.
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                // Restore the flag so the interruption is not lost to the rest of the hook.
                Thread.currentThread().interrupt();
            }
            // Surface task failures that would otherwise stay hidden inside the futures.
            reportOutcome("Stream collector", streamFuture);
            reportOutcome("Database writer", dbFuture);
            // NOTE(review): log records written during JVM shutdown may be dropped if the
            // logging framework is shutting down concurrently, so the final confirmation
            // goes to stdout.
            System.out.println("Shutdown process is done");
        }, "shutdown-hook"));
    }

    /**
     * Logs how a submitted task ended, without blocking: its result, its failure cause,
     * or the fact that it had not finished yet.
     *
     * @param taskName human-readable task name used in the log message
     * @param future   the future returned when the task was submitted
     */
    private static void reportOutcome(String taskName, Future<Integer> future) {
        if (!future.isDone()) {
            logger.warning(format("%s did not finish before shutdown", taskName));
            return;
        }
        try {
            // The future is already done, so get() returns immediately.
            logger.info(format("%s finished with result %s", taskName, future.get()));
        } catch (java.util.concurrent.ExecutionException e) {
            logger.log(java.util.logging.Level.SEVERE, format("%s failed", taskName), e.getCause());
        } catch (java.util.concurrent.CancellationException e) {
            logger.warning(format("%s was cancelled", taskName));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment