Skip to content

Instantly share code, notes, and snippets.

@koduki
Created March 15, 2016 19:09
Show Gist options
  • Save koduki/5ef85fceb170d3da41f7 to your computer and use it in GitHub Desktop.
Save koduki/5ef85fceb170d3da41f7 to your computer and use it in GitHub Desktop.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package cn.orz.pascal.example.csv;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.SynchronousQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Micro-benchmark that times several ways of running a simulated
 * producer/consumer workload over {@link #dataSize} messages:
 * a plain sequential loop, a two-thread pipeline connected by a
 * {@link SynchronousQueue}, and fixed thread pools running either
 * sequential chunks or one queue pipeline per chunk.
 *
 * <p>The "work" done by {@link #producer()} and {@link #consumer(String)} is
 * simulated by busy-waiting, see {@link #sleep(long)}.
 *
 * @author koduki
 */
public class Test1 {

    /** Total number of messages each benchmark variant is asked to process. */
    static int dataSize = 100_000;

    /** Sentinel message a producer puts on the queue to tell its consumer to stop. */
    private static final String DONE = "DONE";

    private static final Logger LOG = Logger.getLogger(Test1.class.getName());

    /**
     * Simulates producing one message by busy-waiting for 10,000 ns.
     *
     * @return the constant message {@code "success"}
     */
    public static String producer() {
        sleep(10_000);
        return "success";
    }

    /**
     * Simulates consuming one message by busy-waiting in two steps
     * (150,000 ns, then 140,000 ns). The message content is not inspected.
     *
     * @param msg the message to consume
     */
    public static void consumer(String msg) {
        sleep(150_000);
        sleep(140_000);
    }

    /**
     * Runs every benchmark variant once and prints one timing line per variant.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        sequential();
        asyncQ();
        parallel(6, 6);
        parallel(8, 8);
        parallel(16, 16);
        parallel2(8, 8);
    }

    /**
     * Splits the workload into {@code jobCount} chunks and runs each chunk as a
     * {@link ParallelTask2} (its own producer/consumer thread pair) on a fixed
     * pool of {@code threadCount} threads, then prints the elapsed time.
     *
     * @param jobCount number of chunks the workload is split into
     * @param threadCount size of the thread pool executing the chunks
     */
    public static void parallel2(int jobCount, int threadCount) {
        int[] chunks = splitEvenly(dataSize, jobCount);
        List<ParallelTask2> tasks = new ArrayList<>(chunks.length);
        for (int chunk : chunks) {
            tasks.add(new ParallelTask2(chunk));
        }
        // Own label so the output can be told apart from parallel(job, thread).
        runAndReport("parallel2", tasks, jobCount, threadCount);
    }

    /**
     * Splits the workload into {@code jobCount} chunks and runs each chunk as a
     * {@link ParallelTask} (produce-then-consume loop) on a fixed pool of
     * {@code threadCount} threads, then prints the elapsed time.
     *
     * @param jobCount number of chunks the workload is split into
     * @param threadCount size of the thread pool executing the chunks
     */
    public static void parallel(int jobCount, int threadCount) {
        int[] chunks = splitEvenly(dataSize, jobCount);
        List<ParallelTask> tasks = new ArrayList<>(chunks.length);
        for (int chunk : chunks) {
            tasks.add(new ParallelTask(chunk));
        }
        runAndReport("parallel", tasks, jobCount, threadCount);
    }

    /**
     * Divides {@code total} items into {@code parts} chunk sizes that differ by
     * at most one, so that no item is lost to integer division.
     *
     * @param total number of items to distribute (expected to be non-negative)
     * @param parts number of chunks; values {@code <= 0} yield an empty array
     * @return the size of each chunk; the first {@code total % parts} chunks
     *         carry one extra item
     */
    static int[] splitEvenly(int total, int parts) {
        if (parts <= 0) {
            return new int[0];
        }
        int[] sizes = new int[parts];
        int base = total / parts;
        int extra = total % parts;
        for (int i = 0; i < parts; i++) {
            sizes[i] = (i < extra) ? base + 1 : base;
        }
        return sizes;
    }

    /**
     * Runs all tasks on a new fixed-size pool, waits for them to finish and
     * prints the elapsed wall-clock time in milliseconds.
     */
    private static void runAndReport(String label, List<? extends Callable<Void>> tasks,
            int jobCount, int threadCount) {
        ExecutorService es = Executors.newFixedThreadPool(threadCount);
        try {
            long s = System.nanoTime();
            List<Future<Void>> futures = es.invokeAll(tasks);
            for (Future<Void> future : futures) {
                future.get();
            }
            long e = System.nanoTime();
            System.out.println("finish " + label + "(job=" + jobCount + ", thread=" + threadCount + "):\t"
                    + (e - s) / 1000 / 1000 + "\tms");
        } catch (InterruptedException ex) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, null, ex);
        } catch (ExecutionException ex) {
            LOG.log(Level.SEVERE, null, ex);
        } finally {
            // Always release the pool threads, even when a task failed.
            es.shutdown();
        }
    }

    /**
     * Runs one {@link Producer} thread and one {@link Consumer} thread connected
     * by a {@link SynchronousQueue}, waits for the consumer to finish and prints
     * the elapsed time.
     */
    public static void asyncQ() {
        try {
            long s = System.nanoTime();
            BlockingQueue<String> drop = new SynchronousQueue<>();
            Thread t1 = new Thread(new Producer(drop));
            Thread t2 = new Thread(new Consumer(drop));
            t1.start();
            t2.start();
            // The consumer only returns after it has received the DONE sentinel.
            t2.join();
            long e = System.nanoTime();
            System.out.println("finish asyncQ:\t" + (e - s) / 1000 / 1000 + "\tms");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Produces and consumes {@link #dataSize} messages one after another on the
     * calling thread and prints the elapsed time.
     */
    public static void sequential() {
        long s = System.nanoTime();
        for (int i = 0; i < dataSize; i++) {
            String msg = producer();
            consumer(msg);
        }
        long e = System.nanoTime();
        System.out.println("finish sequential:\t" + (e - s) / 1000 / 1000 + "\tms");
    }

    /**
     * Busy-waits (spins on {@link System#nanoTime()}) until more than
     * {@code interval} nanoseconds have elapsed. The thread is never parked.
     *
     * @param interval time to spin, in nanoseconds
     */
    public static void sleep(final long interval) {
        final long start = System.nanoTime();
        long elapsed;
        do {
            // Compare differences, not absolute values: nanoTime may overflow.
            elapsed = System.nanoTime() - start;
        } while (elapsed <= interval);
    }

    /**
     * Puts a fixed number of produced messages on a queue, followed by the
     * {@code "DONE"} sentinel.
     */
    static class Producer implements Runnable {

        private final BlockingQueue<String> drop;
        private final int count;

        /**
         * Creates a producer that emits {@link Test1#dataSize} messages
         * (the value is read when the producer is constructed).
         *
         * @param d queue the messages are put on
         */
        public Producer(BlockingQueue<String> d) {
            this(d, dataSize);
        }

        /**
         * Creates a producer that emits {@code count} messages.
         *
         * @param d queue the messages are put on
         * @param count number of messages to emit before the sentinel
         */
        public Producer(BlockingQueue<String> d, int count) {
            this.drop = d;
            this.count = count;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < count; i++) {
                    String msg = producer();
                    drop.put(msg);
                }
                drop.put(DONE);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
        }
    }

    /**
     * Takes messages from a queue and consumes each one until the
     * {@code "DONE"} sentinel arrives.
     */
    static class Consumer implements Runnable {

        private final BlockingQueue<String> drop;

        /**
         * @param d queue the messages are taken from
         */
        public Consumer(BlockingQueue<String> d) {
            this.drop = d;
        }

        @Override
        public void run() {
            try {
                String msg = drop.take();
                while (!DONE.equals(msg)) {
                    // Do the same simulated work per message as the other variants.
                    consumer(msg);
                    msg = drop.take();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
        }
    }

    /**
     * Pool task that produces and then consumes {@code chunkSize} messages in a
     * plain loop on the executing thread.
     */
    public static class ParallelTask implements Callable<Void> {

        private final int chunkSize;

        /**
         * @param chunkSize number of messages this task processes
         */
        public ParallelTask(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        @Override
        public Void call() throws Exception {
            for (int i = 0; i < chunkSize; i++) {
                String msg = producer();
                consumer(msg);
            }
            return null;
        }
    }

    /**
     * Pool task that processes {@code chunkSize} messages through its own
     * producer thread and consumer thread connected by a
     * {@link SynchronousQueue}.
     */
    public static class ParallelTask2 implements Callable<Void> {

        private final int chunkSize;

        /**
         * @param chunkSize number of messages this task processes
         */
        public ParallelTask2(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        @Override
        public Void call() throws Exception {
            BlockingQueue<String> drop = new SynchronousQueue<>();
            // Limit the producer to this task's chunk, not the whole data set.
            Thread t1 = new Thread(new Producer(drop, chunkSize));
            Thread t2 = new Thread(new Consumer(drop));
            t1.start();
            t2.start();
            // The consumer only returns after it has received the DONE sentinel.
            t2.join();
            return null;
        }
    }

    /**
     * {@link RecursiveTask} counterpart of {@link Consumer}: takes and consumes
     * messages until the {@code "DONE"} sentinel arrives. Not used by any
     * benchmark variant in this class.
     */
    public static class Consumer2 extends RecursiveTask<Void> {

        private final BlockingQueue<String> drop;

        /**
         * @param d queue the messages are taken from
         */
        public Consumer2(BlockingQueue<String> d) {
            this.drop = d;
        }

        @Override
        protected Void compute() {
            try {
                String msg = drop.take();
                while (!DONE.equals(msg)) {
                    consumer(msg);
                    msg = drop.take();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
            return null;
        }
    }

    /**
     * {@link RecursiveTask} counterpart of {@link Producer}: puts
     * {@code chunkSize} produced messages on the queue, followed by the
     * {@code "DONE"} sentinel. Not used by any benchmark variant in this class.
     */
    public static class Producer2 extends RecursiveTask<Void> {

        private final BlockingQueue<String> drop;
        private final int chunkSize;

        /**
         * @param d queue the messages are put on
         * @param chunkSize number of messages to emit before the sentinel
         */
        public Producer2(BlockingQueue<String> d, int chunkSize) {
            this.drop = d;
            this.chunkSize = chunkSize;
        }

        @Override
        protected Void compute() {
            try {
                for (int i = 0; i < chunkSize; i++) {
                    String msg = producer();
                    drop.put(msg);
                }
                drop.put(DONE);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment