Created
January 25, 2018 03:25
-
-
Save PrabhatKJena/cf5985a475488a95a76987da769e598b to your computer and use it in GitHub Desktop.
Publisher-Subscriber Thread Implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package edu.pk.java.test; | |
import java.util.Random; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import static edu.pk.java.test.PublisherSubscriberTest.*; | |
public class PublisherSubscriberTest { | |
private static final int QUEUE_MAX_SIZE = 100; | |
public static final long THREAD_WAIT_TIME_IN_MS = 20; | |
public static void main(String[] args) { | |
BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(QUEUE_MAX_SIZE); | |
Writer writer = new Writer(buffer); | |
Reader reader1 = new Reader(buffer); | |
Reader reader2 = new Reader(buffer); | |
Reader reader3 = new Reader(buffer); | |
new Thread(writer).start(); | |
new Thread(reader1).start(); | |
new Thread(reader2).start(); | |
new Thread(reader3).start(); | |
try { | |
Thread.sleep(200L); // This is for testing purpose. Wait and Send shutdown signal after this. | |
} catch (InterruptedException e) { | |
} | |
Writer.setShutDown(true); | |
Reader.setShutDown(true, false); | |
} | |
} | |
class Writer implements Runnable { | |
private BlockingQueue<Integer> queue; | |
private static volatile boolean isShutDown; | |
public Writer(BlockingQueue<Integer> queue) { | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
Random random = new Random(); | |
while (!isShutDown) { | |
int nextInt = random.nextInt(Integer.MAX_VALUE); | |
try { | |
// wait for 200ms, then retry | |
while (!queue.offer(nextInt, THREAD_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS)) { | |
System.out.println("Retrying........"); | |
} | |
System.out.printf("\n%s:%s -> %d", "Writing by", Thread.currentThread().getName(), nextInt); | |
} catch (InterruptedException e) { | |
} | |
} | |
} | |
public static void setShutDown(boolean shutDown) { | |
isShutDown = shutDown; | |
} | |
} | |
class Reader implements Runnable { | |
private BlockingQueue<Integer> queue; | |
private static volatile boolean isShutDown; // this will trigger shutdown signal but it will read the remaining items from the buffer if isForced = FALSE | |
private static volatile boolean isForced; // this is to stop immediately | |
public Reader(BlockingQueue<Integer> queue) { | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
while (true) { | |
// check for shutdown signal | |
boolean shutdown = false; | |
if (isShutDown && isForced) | |
shutdown = true; | |
else if (isShutDown && !isForced && queue.size() < 1) // if isForced = FALSE, read remaining items from buffer | |
shutdown = true; | |
if (shutdown) { | |
break; // Stop Reading | |
} | |
try { | |
// Wait for 200ms if no item to retrieve from buffer | |
Integer poll = queue.poll(THREAD_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS); | |
if (poll != null) | |
System.out.printf("\n%30s:%s -> %d", "Reading by", Thread.currentThread().getName(), poll); | |
} catch (InterruptedException e) { | |
} | |
} | |
System.out.println("Remaining items:" + queue); | |
} | |
public static void setShutDown(boolean shutDown, boolean forced) { | |
isShutDown = shutDown; | |
isForced = forced; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment