Last active
February 25, 2024 09:03
-
-
Save sshark/86948a07830a2452f58544e98afc17d4 to your computer and use it in GitHub Desktop.
Uses flags or poison pill to synchronize producers and consumers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.teckhooi; | |
import java.time.Duration; | |
import java.util.LinkedList; | |
import java.util.Optional; | |
import java.util.Queue; | |
import java.util.UUID; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
public class ConsumerProducerQ { | |
final private int capacity; | |
final private Object lock = new Object(); | |
volatile private boolean alive = true; | |
volatile private boolean producerAlive = true; | |
final private Queue<String> bin = new LinkedList<>(); | |
final private Queue<String> queue = new LinkedList<>(); | |
public Queue<String> dump() { | |
return bin; | |
} | |
public boolean isEmpty() { | |
return queue.isEmpty(); | |
} | |
public boolean isAlive() { | |
return alive; | |
} | |
public void kill() { | |
this.alive = false; | |
} | |
public boolean isProducerAlive() { | |
return producerAlive; | |
} | |
public void killProducer() { | |
this.producerAlive = false; | |
} | |
public ConsumerProducerQ(int capacity) { | |
this.capacity = capacity; | |
} | |
public ConsumerProducerQ() { | |
this(Integer.MAX_VALUE); | |
} | |
public void consume() throws Exception { | |
synchronized (lock) { | |
if (queue.isEmpty()) { | |
lock.wait(200); | |
if (!isProducerAlive()) { | |
return; | |
} | |
} | |
String data = queue.remove(); | |
lock.notifyAll(); | |
bin.add(data); | |
// delay.ifPresent(d -> throwRuntimeExp(() -> Class.forName("abc"))); | |
} | |
} | |
private void throwRuntimeExp(CheckedFunction0 r) { | |
try { | |
r.apply(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void produce() throws Exception { | |
synchronized (lock) { | |
if (queue.size() >= capacity) { | |
lock.wait(); | |
} | |
String data = UUID.randomUUID().toString().substring(0, 5); | |
queue.offer(data); | |
lock.notifyAll(); | |
bin.add(data); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
final ConsumerProducerQ cpQ = new ConsumerProducerQ(5); | |
final CountDownLatch latch = new CountDownLatch(1); | |
final Runnable producer = () -> { | |
while (cpQ.isAlive()) { | |
try { | |
cpQ.produce(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
cpQ.killProducer(); | |
}; | |
final Runnable consumer = () -> { | |
while (cpQ.isProducerAlive() || !cpQ.isEmpty()) { | |
try { | |
cpQ.consume(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
latch.countDown(); | |
}; | |
try (ExecutorService es = Executors.newCachedThreadPool()) { | |
es.submit(consumer); | |
es.submit(producer); | |
Thread.sleep(1000); | |
cpQ.kill(); | |
latch.await(); | |
} | |
cpQ.dump().forEach(System.out::println); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.teckhooi; | |
import java.io.FileWriter; | |
import java.io.PrintWriter; | |
import java.util.LinkedList; | |
import java.util.Optional; | |
import java.util.Queue; | |
import java.util.UUID; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
public class PoisonPillQ { | |
final private int capacity; | |
final private Object lock = new Object(); | |
volatile private boolean alive = true; | |
final private Queue<String> bin = new LinkedList<>(); | |
final private Queue<Optional<String>> queue = new LinkedList<>(); | |
public Queue<String> dump() { | |
return bin; | |
} | |
public boolean isEmpty() { | |
return queue.isEmpty(); | |
} | |
public boolean isAlive() { | |
return alive; | |
} | |
public void kill() { | |
this.alive = false; | |
} | |
public void killProducer() { | |
try { | |
produce(Optional.empty()); // poison pill | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
public ConsumerProducerQ(int capacity) { | |
this.capacity = capacity; | |
} | |
public ConsumerProducerQ() { | |
this(Integer.MAX_VALUE); | |
} | |
public boolean consume() throws Exception { | |
synchronized (lock) { | |
while (queue.isEmpty()) { | |
lock.wait(); | |
} | |
Optional<String> data = queue.remove(); | |
data.ifPresent(d -> { | |
bin.add(d); | |
}); | |
lock.notifyAll(); | |
return data.isEmpty(); | |
// delay.ifPresent(d -> throwRuntimeExp(() -> Class.forName("abc"))); | |
} | |
} | |
private void throwRuntimeExp(CheckedFunction0 r) { | |
try { | |
r.apply(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void produce() throws Exception { | |
produce(Optional.of(UUID.randomUUID().toString().substring(0, 8))); | |
} | |
public void produce(Optional<String> data) throws Exception { | |
synchronized (lock) { | |
if (queue.size() >= capacity) { | |
lock.wait(); | |
} | |
queue.offer(data); | |
lock.notifyAll(); | |
data.ifPresent(bin::add); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
final ConsumerProducerQ cpQ = new ConsumerProducerQ(5); | |
final CountDownLatch latch = new CountDownLatch(1); | |
final Runnable producer = () -> { | |
while (cpQ.isAlive()) { | |
try { | |
cpQ.produce(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
cpQ.killProducer(); | |
}; | |
final Runnable consumer = () -> { | |
while (true) { | |
try { | |
if(cpQ.consume()) { | |
break; | |
}; | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
latch.countDown(); | |
}; | |
try (ExecutorService es = Executors.newCachedThreadPool()) { | |
es.submit(consumer); | |
es.submit(producer); | |
Thread.sleep(1000); | |
cpQ.kill(); | |
latch.await(); | |
} | |
var sortedResult = cpQ.dump().stream().sorted().toList().toArray(new String[0]); | |
if (sortedResult.length % 2 != 0) { | |
throw new RuntimeException("Result count should be even"); | |
} | |
/* | |
try (PrintWriter writer = new PrintWriter(new FileWriter("results.txt"))){ | |
// cpQ.dump().forEach(System.out::println); | |
cpQ.dump().forEach(writer::println); | |
} | |
*/ | |
for (int i = 0; i < sortedResult.length; i = i + 2) { | |
if (!sortedResult[i].equals(sortedResult[i + 1])) { | |
throw new RuntimeException(String.format("The results does not tally, %s, %s", sortedResult[i], sortedResult[i + 1])); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment