Last active
March 30, 2023 10:48
-
-
Save mananai/cc73b09d98de46ae1b3e6fd6306e01ab to your computer and use it in GitHub Desktop.
Main class to save Twitter sample stream into SQL database
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public class SampleStream2DB { | |
| private static final int THREAD_POOL_SIZE = 2; | |
| static Logger logger = Logger.getLogger(SampleStream2DB.class.getName()); | |
| public static void main(String[] args) throws SQLException { | |
| if (args.length < 3) | |
| throw new IllegalArgumentException(); | |
| final String jdbcURL = args[0]; | |
| final String lang = args[1]; | |
| final String[] bearerTokenArray = Arrays.copyOfRange(args, 2, args.length); | |
| logger.info(format("Database url: %s", jdbcURL)); | |
| logger.info(format("Lang: %s", lang)); | |
| logger.info(format("PID: %d", ProcessHandle.current().pid())); | |
| logger.info(format("Use \"kill\" command to terminate the program")); | |
| final BlockingQueue<Tweet[]> tweetsQueue = new LinkedBlockingQueue<>(); | |
| final AtomicBoolean isTerminated = new AtomicBoolean(false); | |
| final Predicate<Tweet> predicate = tweet->lang.equals(tweet.getLang()); | |
| TwitterDBWriter dbWriterTask = new TwitterDBWriter(jdbcURL, tweetsQueue); | |
| SampleStreamCollector streamCollector = new SampleStreamCollector(predicate, /*lang,*/ bearerTokenArray, tweetsQueue, isTerminated); | |
| ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE); | |
| Future<Integer> streamFuture = executorService.submit(streamCollector); | |
| Future<Integer> dbFuture = executorService.submit(dbWriterTask); | |
| Runtime.getRuntime().addShutdownHook(new Thread() { | |
| private Logger logger = Logger.getLogger(this.getClass().getName()); | |
| @Override | |
| public void run() { | |
| logger.warning("!!! Got a request to terminate the program !!!"); | |
| isTerminated.set(true); | |
| executorService.shutdown(); | |
| try { | |
| if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { | |
| executorService.shutdownNow(); | |
| } | |
| } catch (InterruptedException e) { | |
| executorService.shutdownNow(); | |
| } | |
| System.out.println("Shutdown process is done"); | |
| } | |
| }); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment