Last active
October 6, 2017 12:47
-
-
Save wy8162/e0419aa3bb9838546286 to your computer and use it in GitHub Desktop.
Java Concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Producer / Consumer based on Blocking Queue | |
// | |
import java.util.concurrent.*; | |
import java.io.FileFilter; | |
public class FileCrawler implements Runnable { | |
private final BlockingQueue<File> fileQueue; | |
private final FileFilter fileFilter; | |
private final File root; | |
public FileCrawler(BlockingQueue<File> fileQueue, final FileFilter fileFilter, root) | |
{ | |
this.fileQueue = fileQueue; | |
this.root = root; | |
this.fileFilter = new FileFilter() { | |
public boolean accept(File f) { | |
return f.isDirectory() || fileFilter.accept(f); | |
} | |
}; | |
} | |
private boolean alreadyIndexed(File f) { return false; } | |
public void run() { | |
try { crawl(root); } | |
catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
private void crawl(File root) throws InterruptedException { | |
File[] entries = root.listFiles(fileFilter); | |
entries.each { entry -> | |
if (entry.isDirectory()) crawl(entry); | |
else if (!alreadyIndexed(entry)) { | |
println "(${Thread.currentThread().getName()})queue...file ${entry}"; | |
fileQueue.put(entry); | |
} | |
} | |
} | |
} | |
class Indexer implements Runnable { | |
private final BlockingQueue<File> queue; | |
public Indexer(BlockingQueue<File> queue) { this.queue = queue; } | |
public void run() { | |
try { | |
while(true) { | |
indexFile( queue.take() ); | |
} | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
public void indexFile(File file) { | |
println "(${Thread.currentThread().getName()})Index...file ${file}, size=${file.size()}"; | |
} | |
} | |
final int BOUND = 10; | |
final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); | |
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND); | |
FileFilter filter = new FileFilter() { | |
public boolean accept(File file) { return true; } | |
}; | |
new File("/Users/yangwang/Prog/Groovy/e2esuite").eachFile { root -> | |
new Thread(new FileCrawler(queue, filter, root)).start(); | |
} | |
(1..N_CONSUMERS).each { | |
new Thread(new Indexer(queue)).start(); | |
} | |
// NOTE: need to figure out how to inform indexers to end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.*; | |
class FileInfo { | |
String name; | |
int lines; | |
} | |
class LineCounter implements Callable<FileInfo> { | |
private final File file; | |
LineCounter(File file) { this.file = file; } | |
public FileInfo call() { | |
//Thread.sleep(200L); | |
int count = 0; | |
file.eachLine { count++; } | |
return new FileInfo(name: Thread.currentThread().getName() + ' ' + file.getName(), | |
lines: count); | |
} | |
} | |
ExecutorService executor = Executors.newFixedThreadPool(3); | |
CompletionService<FileInfo> completionService = new ExecutorCompletionService<FileInfo>(executor); | |
def files = [] | |
new File("/Users/yangwang/Downloads/net/jcip/examples").eachFile { | |
if (!it.isDirectory()) files << it; | |
} | |
// Sub the tasks to ExecutorCompletionService for execution | |
files.each { file -> | |
completionService.submit( new LineCounter( file ) ); | |
} | |
/* | |
while (true) { | |
Future<FileInfo> f = completionService.poll(); | |
if (f) { | |
FileInfo fileInfo = f.get(); | |
println "${fileInfo.name} lines=${fileInfo.lines}" | |
} else break; | |
} | |
*/ | |
try { | |
for (int i = 0; i < files.size(); i++) { | |
Future<FileInfo> f = completionService.take(); | |
FileInfo fileInfo = f.get(); | |
println "${fileInfo.name} lines=${fileInfo.lines}" | |
} | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} catch (ExecutionException e) { | |
println e; | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
Using CyclicBarrier to calculate a matrix: total value of all the cells. | |
*/ | |
import java.util.*; | |
import java.util.concurrent.*; | |
class CalculateMatrix { | |
private final List<List<Integer>> matrix; | |
private final CyclicBarrier barrier; | |
private final Worker[] workers; | |
private volatile List<Integer> values; | |
CalculateMatrix(theMatrix) { | |
this.matrix = theMatrix; | |
int count = this.matrix.size(); // number of rows | |
this.values = new Integer[ count ] as List<Integer>; | |
this.barrier = new CyclicBarrier( count, | |
new Runnable() { | |
public void run() { | |
def total = 0; | |
values.each { total += it; } | |
println "Total value: $total" | |
}}); | |
this.workers = new Worker[count]; | |
(0..count-1).each { workers[it] = new Worker(it, values, matrix[it]); } | |
} | |
private class Worker implements Runnable { | |
private final List<Integer> list; | |
private final int idx; | |
private final List<Integer> values; | |
Worker(int idx, List<Integer> values, List<Integer> list) { | |
this.list = list; | |
this.idx = idx; | |
this.values = values; | |
} | |
public void run() { | |
def total = 0; | |
list.each { total += it; } | |
values[idx] = total; | |
println "Done with row $idx, total=$total"; | |
try { barrier.await(); // Wait untill all parties called this barrier. | |
println "Thread ${Thread.currentThread().getName()} finished."; } | |
catch (InterruptedException ex) { return; } | |
catch (BrokenBarrierException ex) { return; } | |
} | |
} | |
public void start() { | |
workers.each { | |
new Thread( it ).start(); | |
} | |
} | |
} | |
List<List<Integer>> matrix = [ | |
[1,2,3,4,5,6,7,8,9,10], | |
[11,12,13,14,15,16,17], | |
[18,19,20,21,22,23,24], | |
[25,26,27,28,29,30,31] | |
]; | |
def calc = new CalculateMatrix(matrix); | |
calc.start(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovyx.gpars.actor.DynamicDispatchActor | |
final class MyActor extends DynamicDispatchActor { | |
def MyActor() {} | |
def counter = 0 | |
def processString(String str) { | |
counter ++ | |
def sb = new StringBuilder(8*1024) | |
def head= ~/^\d{4}-\d{2}-/ | |
def pat = ~/Executed in (\d+) msec/ | |
def callId= ~/(MS|WS)-\d+-\d+/ | |
if (head.matcher(str)) { | |
sb.append(str[0..23] + " ") | |
if (str.contains("jdbc.sqltiming")) sb.append(" SQL ") | |
def m = callId.matcher(str) | |
if (m) sb.append(m[0][1]) | |
return sb.toString() | |
} | |
return null | |
} | |
void onMessage(String message) { | |
processString(message) | |
} | |
void onMessage(boolean message) { | |
println 'Done. Bye.' | |
stop() | |
} | |
} | |
def startTime = new Date() | |
final def actor1 = new MyActor().start() | |
final def actor2 = new MyActor().start() | |
println "Total Heap size: " + Runtime.getRuntime().totalMemory() | |
println "Total Free size: " + Runtime.getRuntime().freeMemory() | |
def f = new File("/Users/yangwang/Downloads/temp/core-dsuat_v1_nysddss15-2011_10_31_15_43_02.log.1") | |
f.each { line -> | |
actor1 line | |
actor2 line | |
} | |
actor1 false | |
actor2 false | |
[actor1, actor2]*.join() | |
println "Total Heap size: " + Runtime.getRuntime().totalMemory() | |
println "Total Free size: " + Runtime.getRuntime().freeMemory() | |
def endTime = new Date() | |
println "Elapsed time: " + (endTime.time - startTime.time) | |
println "Total lines (actor 1): " + actor1.counter | |
println "Total lines (actor 2): " + actor1.counter |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.*; | |
public interface E2eTask { | |
public void run(); | |
} | |
public class LogSyncJob implements Runnable { | |
private static int runCount = 0; | |
private CountDownLatch finishCountDown; | |
private CountDownLatch startCountDown; | |
private BlockingQueue<String> fileQueue = null; | |
public LogSyncJob(BlockingQueue<String> queue) { | |
this.fileQueue = queue; | |
this.startCountDown = startCountDown; | |
this.finishCountDown = finishCountDown; | |
} | |
public void run() { | |
runCount++; | |
println "\t----->LogSyncJob started: " + new Date(); | |
fileQueue.put("File_" + runCount); | |
} | |
} | |
public class LogDispatcher implements Runnable { | |
private static int runCount = 0; | |
private CountDownLatch finishCountDown; | |
private CountDownLatch startCountDown; | |
private final BlockingQueue<String> fileQueue; | |
private final BlockingQueue<String>[] processorQueues; | |
public LogDispatcher(BlockingQueue<String> queue, List<BlockingQueue<String>> processorQueues, | |
CountDownLatch startCountDown) { | |
this.fileQueue = queue; | |
this.processorQueues = processorQueues; | |
this.startCountDown = startCountDown; | |
} | |
public void run() { | |
startCountDown.await(); | |
try { | |
while (true) { | |
println ">>>>LogDispatcher is waiting for file..."; | |
String file = fileQueue.take(); | |
println ">>>>LogDispatcher got file: " + file; | |
runCount++; | |
processorQueues[runCount % 2].put(file); | |
} | |
} catch (InterruptedException ignored) {} | |
} | |
} | |
class Worker implements Runnable { | |
private CountDownLatch startCountDown; | |
private final BlockingQueue<String> fileQueue; | |
private E2eTask task; | |
public Worker(E2eTask task, BlockingQueue<String> queue, CountDownLatch startCountDown) { | |
this.task = task; | |
this.fileQueue = queue; | |
this.startCountDown = startCountDown; | |
} | |
public void run() { | |
this.startCountDown.await(); | |
try { | |
while (true) { | |
//task.run(); | |
println Thread.currentThread().getName() + "Worker is waiting for file..."; | |
String file = fileQueue.take(); | |
println Thread.currentThread().getName() + "Worker got file: " + file + " at " + new Date(); | |
} | |
} catch (InterruptedException ignored) {} | |
} | |
} | |
public class E2eSuiteExecutor { | |
private final CountDownLatch startCountDown; | |
private ExecutorService executor, dispatcher; | |
private ScheduledExecutorService scheduler; | |
private BlockingQueue<String> fileQueue; | |
private List<BlockingQueue<String>> dispatchQueues = []; | |
public E2eSuiteExecutor() { | |
startCountDown = new CountDownLatch(1); | |
} | |
public void serve() { | |
println "\n\nDemonstrating usage of Executor and ScheduledThreadPool..."; | |
Runtime.getRuntime().addShutdownHook(new Thread() { | |
public void run() { | |
try { | |
println "Application is being shutdown..."; | |
E2eSuiteExecutor.this.stop(); | |
} catch (InterruptedException ignored) {} | |
} | |
}); | |
fileQueue = new ArrayBlockingQueue<String>(10); | |
dispatchQueues << new ArrayBlockingQueue<String>(10); | |
dispatchQueues << new ArrayBlockingQueue<String>(10); | |
LogSyncJob logSyncJob = new LogSyncJob(fileQueue); | |
LogDispatcher logDispatcher = new LogDispatcher(fileQueue, dispatchQueues, | |
startCountDown); | |
Worker ratings = new Worker( null, dispatchQueues[0], startCountDown); | |
Worker cmp = new Worker( null, dispatchQueues[1], startCountDown); | |
executor = Executors.newCachedThreadPool(); | |
dispatcher = Executors.newFixedThreadPool(1); | |
scheduler = Executors.newScheduledThreadPool(1); | |
ScheduledFuture logSyncLogS = scheduler.scheduleAtFixedRate(logSyncJob, 5, 2, TimeUnit.SECONDS); | |
Future logDispatcherF = dispatcher.submit( logDispatcher ); | |
Future ratingsF = executor.submit( ratings ); | |
Future cmpF = executor.submit( cmp ); | |
println "Now, fire them up..."; | |
startCountDown.countDown(); | |
executor.awaitTermination(12000, TimeUnit.MILLISECONDS); | |
logSyncLogS.cancel(true); | |
logDispatcherF.cancel(true); | |
ratingsF.cancel(true); | |
cmpF.cancel(true); | |
this.stop(); | |
println "Done. ExecutorService isShutdown=" + executor.isShutdown() + " isTerminated=" + executor.isTerminated() + "\n"; | |
} | |
public void stop() { | |
executor.shutdown(); | |
dispatcher.shutdown(); | |
scheduler.shutdown(); | |
} | |
} | |
def e2e = new E2eSuiteExecutor(); | |
e2e.serve(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.*; | |
class FileList implements Callable<List<String>> { | |
private final String dir; | |
FileList(String dir) { this.dir = dir; } | |
List<String> call() { | |
List<String> list = []; | |
new File(dir).eachFile { file -> list << file.getName(); } | |
return list; | |
} | |
} | |
ExecutorService executor = Executors.newCachedThreadPool(); | |
Future<List<String>> flist1 = executor.submit( new FileList( "/Users/yangwang/Downloads/prod" ) ); | |
Future<List<String>> flist2 = executor.submit( new FileList( "/Users/yangwang/Downloads/net/jcip/examples" ) ); | |
try { | |
println "Files under /Users/yangwang/Downloads/prod:"; | |
flist1.get().each { println it } | |
println "Files under /Users/yangwang/Downloads/net/jcip/examples:"; | |
flist2.get().each { println it } | |
} catch (InterruptedException e) { | |
// Re-assert the thread's interrupted status | |
Thread.currentThread().interrupt(); | |
// We don't need the result, so cancel the task too | |
flist1.cancel(true); | |
flist1.cancel(true); | |
} catch (ExecutionException e) { | |
println e; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovyx.gpars.dataflow.DataflowQueue | |
import static groovyx.gpars.dataflow.Dataflow.task | |
import groovyx.gpars.group.DefaultPGroup | |
class Message { | |
def message // End Of Message if message = null | |
} | |
abstract class TaskTemplate { | |
final DataflowQueue inQ // Incoming Queue | |
DataflowQueue outQ // Outgoing Queue | |
TaskTemplate() { | |
inQ = new DataflowQueue() | |
outQ = null | |
} | |
abstract void doWork(Message message) | |
def run() { | |
while (true) { | |
def message = inQ.val | |
doWork(message) | |
if (message.message == null) { | |
println ">>>>>>>>${getClass().name}: Bye!" | |
break | |
} | |
} | |
return true | |
} | |
void sendMessage(msg) { inQ << msg } | |
DataflowQueue getIncomingQueue() { return inQ } | |
void setOutgoingQueue(DataflowQueue q) { outQ = q } | |
boolean isTherePendingWork() { return inQ.length() > 0 } | |
} | |
final class ParserTask extends TaskTemplate { | |
void doWork(Message message) { | |
if (message.message) { | |
println "\tParser: ${message.message}" | |
outQ << new Message(message : "From Parser: " + message.message) | |
outQ << new Message(message : "And this is from me, the Parser") | |
(0..3).each { | |
Thread.sleep(10L) | |
outQ << new Message(message : "I'm " + "still" * it + " parsing") | |
} | |
} else { | |
outQ << message | |
} | |
} | |
} | |
final class ProcessorTask extends TaskTemplate { | |
void doWork(Message message) { | |
println "\t\tProcessor: ${message.message}" | |
} | |
} | |
// Create tasks | |
def parser = new ParserTask() | |
def processor = new ProcessorTask() | |
// Link them toggether | |
parser.setOutgoingQueue( processor.getIncomingQueue() ) | |
// Create task group | |
def group = new DefaultPGroup() | |
group.with { | |
def parserTask = task { | |
parser.run() | |
} | |
def processorTask = task { | |
processor.run() | |
} | |
def main = task { | |
(1..100000).each { n -> | |
println "Yeah, I'm coming..." | |
parser.sendMessage( new Message( message : "From Controller: Hey, my number = " + n ) ) | |
//Thread.sleep(300L) | |
} | |
parser.sendMessage( new Message( message : null ) ) | |
// Let's wait for these guys to finish their work | |
while (parser.isTherePendingWork() || processor.isTherePendingWork()) { | |
Thread.sleep(100L) | |
} | |
}.join() | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovyx.gpars.dataflow.DataflowQueue | |
import static groovyx.gpars.dataflow.Dataflow.task | |
import groovyx.gpars.group.DefaultPGroup | |
class Message { | |
def message // End Of Message if message = null | |
} | |
class MessageQueue { | |
DataflowQueue inQ = null // Incoming Queue | |
DataflowQueue outQ= null // Outgoing Queue | |
} | |
abstract class TaskTemplate { | |
protected static final ThreadLocal queue = new ThreadLocal() { | |
protected Object initialValue() { | |
println "${Thread.currentThread().getName()}-created one queue" | |
return new MessageQueue() | |
} | |
} | |
TaskTemplate() { | |
} | |
abstract void doWork(Message message) | |
def run() { | |
while (true) { | |
def message = queue.get().inQ.val | |
doWork(message) | |
if (message.message == null) { | |
println "${Thread.currentThread().getName()}->>>>>>>>${getClass().name}: Bye!" | |
break | |
} | |
} | |
return true | |
} | |
void sendMessage(msg) { queue.get().inQ << msg } | |
DataflowQueue getIncomingQueue() { return queue.get().inQ } | |
void setOutgoingQueue(DataflowQueue q) { queue.get().outQ = q } | |
void setIncomingQueue(DataflowQueue q) { queue.get().inQ = q } | |
boolean isTherePendingWork() { return queue.get().inQ.length() > 0 } | |
} | |
final class ParserTask extends TaskTemplate { | |
void doWork(Message message) { | |
if (message.message) { | |
println "\t${Thread.currentThread().getName()}-Parser: ${message.message}" | |
queue.get().outQ << new Message(message : "From Parser: " + message.message) | |
queue.get().outQ << new Message(message : "And this is from me, the Parser") | |
(1..2).each { | |
//Thread.sleep(100L) | |
queue.get().outQ << new Message(message : "I'm " + "still" * it + " parsing") | |
} | |
} else { | |
queue.get().outQ << message | |
} | |
} | |
} | |
final class ProcessorTask extends TaskTemplate { | |
void doWork(Message message) { | |
println "\t\t${Thread.currentThread().getName()}-Processor: ${message.message}" | |
} | |
} | |
// Queues | |
def parserInQ = new DataflowQueue() | |
def procInQ = new DataflowQueue() | |
// Create task group | |
def group = new DefaultPGroup() | |
group.with { | |
def parserTask = task { | |
def parser = new ParserTask() | |
parser.setIncomingQueue(parserInQ) | |
parser.setOutgoingQueue(procInQ) | |
parser.run() | |
} | |
def processorTask = task { | |
def processor = new ProcessorTask() | |
processor.setIncomingQueue(procInQ) | |
processor.run() | |
} | |
def main = task { | |
//Thread.sleep(300L) | |
(1..3).each { n -> | |
println "${Thread.currentThread().getName()}-Yeah, I'm coming..." | |
parserInQ << new Message( message : "From Controller: Hey, my number = " + n ) | |
//Thread.sleep(300L) | |
} | |
parserInQ << new Message( message : null ) | |
// Let's wait for these guys to finish their work | |
//Thread.sleep(1000L) | |
}.join() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.*; | |
class TestLatch { | |
public long timeTasks(int nThreads, final Runnable task) { | |
final CountDownLatch startGate = new CountDownLatch(1); | |
final CountDownLatch endGate = new CountDownLatch(nThreads); | |
for (int i=0; i<nThreads; i++) { | |
Thread t = new Thread() { | |
public void run() { | |
try { | |
startGate.await(); | |
try { | |
task.run(); | |
} finally { | |
endGate.countDown(); | |
println Thread.currentThread().getName() + " finished" | |
} | |
} catch (InterruptedException ignored) { | |
} | |
} | |
}; | |
t.start(); | |
} | |
long start = System.nanoTime(); | |
startGate.countDown(); // give a signal to start the threads | |
endGate.await(); // Wait for all threads to complete | |
return System.nanoTime() - start | |
} | |
} | |
class Task implements Runnable { | |
void run() { | |
println Thread.currentThread().getName() | |
} | |
} | |
def task = new Task() | |
def test = new TestLatch() | |
println test.timeTasks( 5, task ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Producer / Consumer based on Blocking Queue | |
// | |
import java.util.concurrent.*; | |
import java.io.FileFilter; | |
public class LogAnalysis implements Runnable { | |
private Thread currentThread; | |
private final BlockingQueue<String> message; | |
public LogAnalysis(BlockingQueue<String> message) | |
{ | |
this.message = message | |
} | |
public void run() { | |
String msg; | |
while ( (msg = message.take()) != "exit" ) | |
println "Received message: $msg"; | |
} | |
public Thread start() { | |
currentThread = new Thread(this); | |
currentThread.start(); | |
return currentThread; | |
} | |
} | |
BlockingQueue<String> qa = new LinkedBlockingQueue<String>(10); | |
BlockingQueue<String> qb = new LinkedBlockingQueue<String>(10); | |
def loga = new LogAnalysis(qa).start(); | |
def logb = new LogAnalysis(qb).start(); | |
println loga.getState(); | |
println logb.getState(); | |
qa.put("Hello, from Yang Wang"); | |
qb.put("Hello, from Jack Wang"); | |
println loga.getState(); | |
println logb.getState(); | |
qa.put("Hello, from Rush Limbaugh"); | |
qb.put("Hello, from Rick Santorum"); | |
//Thread.currentThread().sleep(500); | |
println loga.getState(); | |
println logb.getState(); | |
qa.put("exit"); | |
qb.put("exit"); | |
Thread.currentThread().sleep(1000); | |
println loga.getState(); | |
println logb.getState(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
A counting semaphore. Conceptually, a semaphore maintains a set of permits. | |
Each acquire() blocks if necessary until a permit is available, and then takes it. | |
Each release() adds a permit, potentially releasing a blocking acquirer. However, | |
no actual permit objects are used; the Semaphore just keeps a count of the number | |
available and acts accordingly | |
*/ | |
import java.util.*; | |
import java.util.concurrent.*; | |
/** | |
* BoundedHashSet | |
* <p/> | |
* Using Semaphore to bound a collection | |
* | |
* @author Brian Goetz and Tim Peierls | |
*/ | |
public class BoundedHashSet <T> { | |
private final Set<T> set; | |
private final Semaphore sem; | |
public BoundedHashSet(int bound) { | |
this.set = Collections.synchronizedSet(new HashSet<T>()); | |
sem = new Semaphore(bound) //, true); // Fairness set to true | |
} | |
public boolean add(T o) throws InterruptedException { | |
println "Before add - Avaliable permits: ${sem.availablePermits()}" | |
sem.acquire(); | |
boolean wasAdded = false; | |
try { | |
wasAdded = set.add(o); | |
return wasAdded; | |
} finally { | |
if (!wasAdded) | |
sem.release(); | |
} | |
} | |
public boolean remove(Object o) { | |
boolean wasRemoved = set.remove(o); | |
if (wasRemoved) | |
sem.release(); | |
println "After remove - Avaliable permits: ${sem.availablePermits()}" | |
return wasRemoved; | |
} | |
} | |
class AddElement implements Runnable { | |
BoundedHashSet<String> bhs = null | |
List<String> list = null | |
AddElement(BoundedHashSet<String> set, List<String> list) { | |
this.bhs = set; | |
this.list = list; | |
} | |
void run() { | |
list.each { | |
bhs?.add(it); | |
println "${Thread.currentThread().getName()}-${System.currentTimeMillis()}-Added: $it"; | |
} | |
} | |
} | |
class RemoveElement implements Runnable { | |
BoundedHashSet<String> bhs = null | |
List<String> list = null | |
RemoveElement(BoundedHashSet<String> set, List<String> list) { | |
this.bhs = set; | |
this.list = list; | |
} | |
void run() { | |
list.each { | |
Thread.sleep(500L); // Give sometime for AddEle to add and be blocked | |
bhs.remove(it); | |
println "${Thread.currentThread().getName()}-${System.currentTimeMillis()}-Removed: $it"; | |
} | |
} | |
} | |
List<String> list = ['Jack', 'Yang', 'Jane', 'Wang', 'Daisy', 'Sunflow', 'Butterfly'] | |
def set = new BoundedHashSet<String>(3); | |
def th1 = new Thread(new AddElement(set, list)); | |
def th2 = new Thread(new RemoveElement(set, list)); | |
[th1, th2]*.start() | |
[th1, th2]*.join() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyInfo { | |
static int count = 1 | |
String name = "Yes" | |
Date date = new Date() + count++ | |
} | |
class Task implements Runnable { | |
private static final ThreadLocal myInfo = new ThreadLocal() { | |
protected Object initialValue() { | |
println "created one queue" | |
return new MyInfo(name : "Goose") | |
} | |
} | |
Task(String nme) { | |
myInfo.set(new MyInfo(name : nme)) | |
} | |
void setName(String name) { | |
myInfo.get().name = name | |
} | |
void run() { | |
setName("Random No " + (Math.random())) | |
(0..2).each { | |
println "${Thread.currentThread().getName()} - My name is '${myInfo.get().name}', date=${myInfo.get().date}" | |
Thread.sleep(500L) | |
} | |
} | |
} | |
Task task1 = new Task("Yang") | |
Task task2 = new Task("Yang") | |
Task task3 = new Task("Yang") | |
//task1.setName("Yang") // This won't work - it actually set's the main thread's ThreadLocal variable | |
//task2.setName("Jack") // This won't work - it actually set's the main thread's ThreadLocal variable | |
//task3.setName("Jane") // This won't work - it actually set's the main thread's ThreadLocal variable | |
Thread th1 = new Thread(task1) | |
Thread th2 = new Thread(task2) | |
Thread th3 = new Thread(task3) | |
[th1, th2, th3]*.start() | |
[th1, th2, th3]*.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
These are Groovy files testing Java Concurrency. All are executable.