Skip to content

Instantly share code, notes, and snippets.

@wy8162
Last active October 6, 2017 12:47
Show Gist options
  • Save wy8162/e0419aa3bb9838546286 to your computer and use it in GitHub Desktop.
Save wy8162/e0419aa3bb9838546286 to your computer and use it in GitHub Desktop.
Java Concurrency
// Producer / Consumer based on Blocking Queue
//
import java.util.concurrent.*;
import java.io.FileFilter;
public class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue, final FileFilter fileFilter, root)
{
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) { return false; }
public void run() {
try { crawl(root); }
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
entries.each { entry ->
if (entry.isDirectory()) crawl(entry);
else if (!alreadyIndexed(entry)) {
println "(${Thread.currentThread().getName()})queue...file ${entry}";
fileQueue.put(entry);
}
}
}
}
class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) { this.queue = queue; }
public void run() {
try {
while(true) {
indexFile( queue.take() );
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void indexFile(File file) {
println "(${Thread.currentThread().getName()})Index...file ${file}, size=${file.size()}";
}
}
final int BOUND = 10;
final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
public boolean accept(File file) { return true; }
};
new File("/Users/yangwang/Prog/Groovy/e2esuite").eachFile { root ->
new Thread(new FileCrawler(queue, filter, root)).start();
}
(1..N_CONSUMERS).each {
new Thread(new Indexer(queue)).start();
}
// NOTE: need to figure out how to inform indexers to end
import java.util.concurrent.*;
class FileInfo {
String name;
int lines;
}
class LineCounter implements Callable<FileInfo> {
private final File file;
LineCounter(File file) { this.file = file; }
public FileInfo call() {
//Thread.sleep(200L);
int count = 0;
file.eachLine { count++; }
return new FileInfo(name: Thread.currentThread().getName() + ' ' + file.getName(),
lines: count);
}
}
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<FileInfo> completionService = new ExecutorCompletionService<FileInfo>(executor);
def files = []
new File("/Users/yangwang/Downloads/net/jcip/examples").eachFile {
if (!it.isDirectory()) files << it;
}
// Sub the tasks to ExecutorCompletionService for execution
files.each { file ->
completionService.submit( new LineCounter( file ) );
}
/*
while (true) {
Future<FileInfo> f = completionService.poll();
if (f) {
FileInfo fileInfo = f.get();
println "${fileInfo.name} lines=${fileInfo.lines}"
} else break;
}
*/
try {
for (int i = 0; i < files.size(); i++) {
Future<FileInfo> f = completionService.take();
FileInfo fileInfo = f.get();
println "${fileInfo.name} lines=${fileInfo.lines}"
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
println e;
}
/**
Using CyclicBarrier to calculate a matrix: total value of all the cells.
*/
import java.util.*;
import java.util.concurrent.*;
class CalculateMatrix {
private final List<List<Integer>> matrix;
private final CyclicBarrier barrier;
private final Worker[] workers;
private volatile List<Integer> values;
CalculateMatrix(theMatrix) {
this.matrix = theMatrix;
int count = this.matrix.size(); // number of rows
this.values = new Integer[ count ] as List<Integer>;
this.barrier = new CyclicBarrier( count,
new Runnable() {
public void run() {
def total = 0;
values.each { total += it; }
println "Total value: $total"
}});
this.workers = new Worker[count];
(0..count-1).each { workers[it] = new Worker(it, values, matrix[it]); }
}
private class Worker implements Runnable {
private final List<Integer> list;
private final int idx;
private final List<Integer> values;
Worker(int idx, List<Integer> values, List<Integer> list) {
this.list = list;
this.idx = idx;
this.values = values;
}
public void run() {
def total = 0;
list.each { total += it; }
values[idx] = total;
println "Done with row $idx, total=$total";
try { barrier.await(); // Wait untill all parties called this barrier.
println "Thread ${Thread.currentThread().getName()} finished."; }
catch (InterruptedException ex) { return; }
catch (BrokenBarrierException ex) { return; }
}
}
public void start() {
workers.each {
new Thread( it ).start();
}
}
}
List<List<Integer>> matrix = [
[1,2,3,4,5,6,7,8,9,10],
[11,12,13,14,15,16,17],
[18,19,20,21,22,23,24],
[25,26,27,28,29,30,31]
];
def calc = new CalculateMatrix(matrix);
calc.start();
import groovyx.gpars.actor.DynamicDispatchActor
final class MyActor extends DynamicDispatchActor {
def MyActor() {}
def counter = 0
def processString(String str) {
counter ++
def sb = new StringBuilder(8*1024)
def head= ~/^\d{4}-\d{2}-/
def pat = ~/Executed in (\d+) msec/
def callId= ~/(MS|WS)-\d+-\d+/
if (head.matcher(str)) {
sb.append(str[0..23] + " ")
if (str.contains("jdbc.sqltiming")) sb.append(" SQL ")
def m = callId.matcher(str)
if (m) sb.append(m[0][1])
return sb.toString()
}
return null
}
void onMessage(String message) {
processString(message)
}
void onMessage(boolean message) {
println 'Done. Bye.'
stop()
}
}
def startTime = new Date()
final def actor1 = new MyActor().start()
final def actor2 = new MyActor().start()
println "Total Heap size: " + Runtime.getRuntime().totalMemory()
println "Total Free size: " + Runtime.getRuntime().freeMemory()
def f = new File("/Users/yangwang/Downloads/temp/core-dsuat_v1_nysddss15-2011_10_31_15_43_02.log.1")
f.each { line ->
actor1 line
actor2 line
}
actor1 false
actor2 false
[actor1, actor2]*.join()
println "Total Heap size: " + Runtime.getRuntime().totalMemory()
println "Total Free size: " + Runtime.getRuntime().freeMemory()
def endTime = new Date()
println "Elapsed time: " + (endTime.time - startTime.time)
println "Total lines (actor 1): " + actor1.counter
println "Total lines (actor 2): " + actor1.counter
import java.util.concurrent.*;
public interface E2eTask {
public void run();
}
public class LogSyncJob implements Runnable {
private static int runCount = 0;
private CountDownLatch finishCountDown;
private CountDownLatch startCountDown;
private BlockingQueue<String> fileQueue = null;
public LogSyncJob(BlockingQueue<String> queue) {
this.fileQueue = queue;
this.startCountDown = startCountDown;
this.finishCountDown = finishCountDown;
}
public void run() {
runCount++;
println "\t----->LogSyncJob started: " + new Date();
fileQueue.put("File_" + runCount);
}
}
public class LogDispatcher implements Runnable {
private static int runCount = 0;
private CountDownLatch finishCountDown;
private CountDownLatch startCountDown;
private final BlockingQueue<String> fileQueue;
private final BlockingQueue<String>[] processorQueues;
public LogDispatcher(BlockingQueue<String> queue, List<BlockingQueue<String>> processorQueues,
CountDownLatch startCountDown) {
this.fileQueue = queue;
this.processorQueues = processorQueues;
this.startCountDown = startCountDown;
}
public void run() {
startCountDown.await();
try {
while (true) {
println ">>>>LogDispatcher is waiting for file...";
String file = fileQueue.take();
println ">>>>LogDispatcher got file: " + file;
runCount++;
processorQueues[runCount % 2].put(file);
}
} catch (InterruptedException ignored) {}
}
}
class Worker implements Runnable {
private CountDownLatch startCountDown;
private final BlockingQueue<String> fileQueue;
private E2eTask task;
public Worker(E2eTask task, BlockingQueue<String> queue, CountDownLatch startCountDown) {
this.task = task;
this.fileQueue = queue;
this.startCountDown = startCountDown;
}
public void run() {
this.startCountDown.await();
try {
while (true) {
//task.run();
println Thread.currentThread().getName() + "Worker is waiting for file...";
String file = fileQueue.take();
println Thread.currentThread().getName() + "Worker got file: " + file + " at " + new Date();
}
} catch (InterruptedException ignored) {}
}
}
public class E2eSuiteExecutor {
private final CountDownLatch startCountDown;
private ExecutorService executor, dispatcher;
private ScheduledExecutorService scheduler;
private BlockingQueue<String> fileQueue;
private List<BlockingQueue<String>> dispatchQueues = [];
public E2eSuiteExecutor() {
startCountDown = new CountDownLatch(1);
}
public void serve() {
println "\n\nDemonstrating usage of Executor and ScheduledThreadPool...";
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
println "Application is being shutdown...";
E2eSuiteExecutor.this.stop();
} catch (InterruptedException ignored) {}
}
});
fileQueue = new ArrayBlockingQueue<String>(10);
dispatchQueues << new ArrayBlockingQueue<String>(10);
dispatchQueues << new ArrayBlockingQueue<String>(10);
LogSyncJob logSyncJob = new LogSyncJob(fileQueue);
LogDispatcher logDispatcher = new LogDispatcher(fileQueue, dispatchQueues,
startCountDown);
Worker ratings = new Worker( null, dispatchQueues[0], startCountDown);
Worker cmp = new Worker( null, dispatchQueues[1], startCountDown);
executor = Executors.newCachedThreadPool();
dispatcher = Executors.newFixedThreadPool(1);
scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture logSyncLogS = scheduler.scheduleAtFixedRate(logSyncJob, 5, 2, TimeUnit.SECONDS);
Future logDispatcherF = dispatcher.submit( logDispatcher );
Future ratingsF = executor.submit( ratings );
Future cmpF = executor.submit( cmp );
println "Now, fire them up...";
startCountDown.countDown();
executor.awaitTermination(12000, TimeUnit.MILLISECONDS);
logSyncLogS.cancel(true);
logDispatcherF.cancel(true);
ratingsF.cancel(true);
cmpF.cancel(true);
this.stop();
println "Done. ExecutorService isShutdown=" + executor.isShutdown() + " isTerminated=" + executor.isTerminated() + "\n";
}
public void stop() {
executor.shutdown();
dispatcher.shutdown();
scheduler.shutdown();
}
}
def e2e = new E2eSuiteExecutor();
e2e.serve();
import java.util.concurrent.*;
class FileList implements Callable<List<String>> {
private final String dir;
FileList(String dir) { this.dir = dir; }
List<String> call() {
List<String> list = [];
new File(dir).eachFile { file -> list << file.getName(); }
return list;
}
}
ExecutorService executor = Executors.newCachedThreadPool();
Future<List<String>> flist1 = executor.submit( new FileList( "/Users/yangwang/Downloads/prod" ) );
Future<List<String>> flist2 = executor.submit( new FileList( "/Users/yangwang/Downloads/net/jcip/examples" ) );
try {
println "Files under /Users/yangwang/Downloads/prod:";
flist1.get().each { println it }
println "Files under /Users/yangwang/Downloads/net/jcip/examples:";
flist2.get().each { println it }
} catch (InterruptedException e) {
// Re-assert the thread's interrupted status
Thread.currentThread().interrupt();
// We don't need the result, so cancel the task too
flist1.cancel(true);
flist1.cancel(true);
} catch (ExecutionException e) {
println e;
}
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.group.DefaultPGroup
class Message {
def message // End Of Message if message = null
}
abstract class TaskTemplate {
final DataflowQueue inQ // Incoming Queue
DataflowQueue outQ // Outgoing Queue
TaskTemplate() {
inQ = new DataflowQueue()
outQ = null
}
abstract void doWork(Message message)
def run() {
while (true) {
def message = inQ.val
doWork(message)
if (message.message == null) {
println ">>>>>>>>${getClass().name}: Bye!"
break
}
}
return true
}
void sendMessage(msg) { inQ << msg }
DataflowQueue getIncomingQueue() { return inQ }
void setOutgoingQueue(DataflowQueue q) { outQ = q }
boolean isTherePendingWork() { return inQ.length() > 0 }
}
final class ParserTask extends TaskTemplate {
void doWork(Message message) {
if (message.message) {
println "\tParser: ${message.message}"
outQ << new Message(message : "From Parser: " + message.message)
outQ << new Message(message : "And this is from me, the Parser")
(0..3).each {
Thread.sleep(10L)
outQ << new Message(message : "I'm " + "still" * it + " parsing")
}
} else {
outQ << message
}
}
}
final class ProcessorTask extends TaskTemplate {
void doWork(Message message) {
println "\t\tProcessor: ${message.message}"
}
}
// Create tasks
def parser = new ParserTask()
def processor = new ProcessorTask()
// Link them toggether
parser.setOutgoingQueue( processor.getIncomingQueue() )
// Create task group
def group = new DefaultPGroup()
group.with {
def parserTask = task {
parser.run()
}
def processorTask = task {
processor.run()
}
def main = task {
(1..100000).each { n ->
println "Yeah, I'm coming..."
parser.sendMessage( new Message( message : "From Controller: Hey, my number = " + n ) )
//Thread.sleep(300L)
}
parser.sendMessage( new Message( message : null ) )
// Let's wait for these guys to finish their work
while (parser.isTherePendingWork() || processor.isTherePendingWork()) {
Thread.sleep(100L)
}
}.join()
}
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.group.DefaultPGroup
class Message {
def message // End Of Message if message = null
}
class MessageQueue {
DataflowQueue inQ = null // Incoming Queue
DataflowQueue outQ= null // Outgoing Queue
}
abstract class TaskTemplate {
protected static final ThreadLocal queue = new ThreadLocal() {
protected Object initialValue() {
println "${Thread.currentThread().getName()}-created one queue"
return new MessageQueue()
}
}
TaskTemplate() {
}
abstract void doWork(Message message)
def run() {
while (true) {
def message = queue.get().inQ.val
doWork(message)
if (message.message == null) {
println "${Thread.currentThread().getName()}->>>>>>>>${getClass().name}: Bye!"
break
}
}
return true
}
void sendMessage(msg) { queue.get().inQ << msg }
DataflowQueue getIncomingQueue() { return queue.get().inQ }
void setOutgoingQueue(DataflowQueue q) { queue.get().outQ = q }
void setIncomingQueue(DataflowQueue q) { queue.get().inQ = q }
boolean isTherePendingWork() { return queue.get().inQ.length() > 0 }
}
final class ParserTask extends TaskTemplate {
void doWork(Message message) {
if (message.message) {
println "\t${Thread.currentThread().getName()}-Parser: ${message.message}"
queue.get().outQ << new Message(message : "From Parser: " + message.message)
queue.get().outQ << new Message(message : "And this is from me, the Parser")
(1..2).each {
//Thread.sleep(100L)
queue.get().outQ << new Message(message : "I'm " + "still" * it + " parsing")
}
} else {
queue.get().outQ << message
}
}
}
final class ProcessorTask extends TaskTemplate {
void doWork(Message message) {
println "\t\t${Thread.currentThread().getName()}-Processor: ${message.message}"
}
}
// Queues
def parserInQ = new DataflowQueue()
def procInQ = new DataflowQueue()
// Create task group
def group = new DefaultPGroup()
group.with {
def parserTask = task {
def parser = new ParserTask()
parser.setIncomingQueue(parserInQ)
parser.setOutgoingQueue(procInQ)
parser.run()
}
def processorTask = task {
def processor = new ProcessorTask()
processor.setIncomingQueue(procInQ)
processor.run()
}
def main = task {
//Thread.sleep(300L)
(1..3).each { n ->
println "${Thread.currentThread().getName()}-Yeah, I'm coming..."
parserInQ << new Message( message : "From Controller: Hey, my number = " + n )
//Thread.sleep(300L)
}
parserInQ << new Message( message : null )
// Let's wait for these guys to finish their work
//Thread.sleep(1000L)
}.join()
}
import java.util.concurrent.*;
class TestLatch {
public long timeTasks(int nThreads, final Runnable task) {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i=0; i<nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
println Thread.currentThread().getName() + " finished"
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown(); // give a signal to start the threads
endGate.await(); // Wait for all threads to complete
return System.nanoTime() - start
}
}
class Task implements Runnable {
void run() {
println Thread.currentThread().getName()
}
}
def task = new Task()
def test = new TestLatch()
println test.timeTasks( 5, task )
// Producer / Consumer based on Blocking Queue
//
import java.util.concurrent.*;
import java.io.FileFilter;
public class LogAnalysis implements Runnable {
private Thread currentThread;
private final BlockingQueue<String> message;
public LogAnalysis(BlockingQueue<String> message)
{
this.message = message
}
public void run() {
String msg;
while ( (msg = message.take()) != "exit" )
println "Received message: $msg";
}
public Thread start() {
currentThread = new Thread(this);
currentThread.start();
return currentThread;
}
}
BlockingQueue<String> qa = new LinkedBlockingQueue<String>(10);
BlockingQueue<String> qb = new LinkedBlockingQueue<String>(10);
def loga = new LogAnalysis(qa).start();
def logb = new LogAnalysis(qb).start();
println loga.getState();
println logb.getState();
qa.put("Hello, from Yang Wang");
qb.put("Hello, from Jack Wang");
println loga.getState();
println logb.getState();
qa.put("Hello, from Rush Limbaugh");
qb.put("Hello, from Rick Santorum");
//Thread.currentThread().sleep(500);
println loga.getState();
println logb.getState();
qa.put("exit");
qb.put("exit");
Thread.currentThread().sleep(1000);
println loga.getState();
println logb.getState();
/**
A counting semaphore. Conceptually, a semaphore maintains a set of permits.
Each acquire() blocks if necessary until a permit is available, and then takes it.
Each release() adds a permit, potentially releasing a blocking acquirer. However,
no actual permit objects are used; the Semaphore just keeps a count of the number
available and acts accordingly
*/
import java.util.*;
import java.util.concurrent.*;
/**
* BoundedHashSet
* <p/>
* Using Semaphore to bound a collection
*
* @author Brian Goetz and Tim Peierls
*/
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound) //, true); // Fairness set to true
}
public boolean add(T o) throws InterruptedException {
println "Before add - Avaliable permits: ${sem.availablePermits()}"
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
println "After remove - Avaliable permits: ${sem.availablePermits()}"
return wasRemoved;
}
}
class AddElement implements Runnable {
BoundedHashSet<String> bhs = null
List<String> list = null
AddElement(BoundedHashSet<String> set, List<String> list) {
this.bhs = set;
this.list = list;
}
void run() {
list.each {
bhs?.add(it);
println "${Thread.currentThread().getName()}-${System.currentTimeMillis()}-Added: $it";
}
}
}
class RemoveElement implements Runnable {
BoundedHashSet<String> bhs = null
List<String> list = null
RemoveElement(BoundedHashSet<String> set, List<String> list) {
this.bhs = set;
this.list = list;
}
void run() {
list.each {
Thread.sleep(500L); // Give sometime for AddEle to add and be blocked
bhs.remove(it);
println "${Thread.currentThread().getName()}-${System.currentTimeMillis()}-Removed: $it";
}
}
}
List<String> list = ['Jack', 'Yang', 'Jane', 'Wang', 'Daisy', 'Sunflow', 'Butterfly']
def set = new BoundedHashSet<String>(3);
def th1 = new Thread(new AddElement(set, list));
def th2 = new Thread(new RemoveElement(set, list));
[th1, th2]*.start()
[th1, th2]*.join()
class MyInfo {
static int count = 1
String name = "Yes"
Date date = new Date() + count++
}
class Task implements Runnable {
private static final ThreadLocal myInfo = new ThreadLocal() {
protected Object initialValue() {
println "created one queue"
return new MyInfo(name : "Goose")
}
}
Task(String nme) {
myInfo.set(new MyInfo(name : nme))
}
void setName(String name) {
myInfo.get().name = name
}
void run() {
setName("Random No " + (Math.random()))
(0..2).each {
println "${Thread.currentThread().getName()} - My name is '${myInfo.get().name}', date=${myInfo.get().date}"
Thread.sleep(500L)
}
}
}
Task task1 = new Task("Yang")
Task task2 = new Task("Yang")
Task task3 = new Task("Yang")
//task1.setName("Yang") // This won't work - it actually set's the main thread's ThreadLocal variable
//task2.setName("Jack") // This won't work - it actually set's the main thread's ThreadLocal variable
//task3.setName("Jane") // This won't work - it actually set's the main thread's ThreadLocal variable
Thread th1 = new Thread(task1)
Thread th2 = new Thread(task2)
Thread th3 = new Thread(task3)
[th1, th2, th3]*.start()
[th1, th2, th3]*.join()
@wy8162
Copy link
Author

wy8162 commented Jul 17, 2015

These are Groovy files testing Java Concurrency. All are executable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment