Created
March 15, 2016 19:09
-
-
Save koduki/5ef85fceb170d3da41f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* To change this license header, choose License Headers in Project Properties. | |
* To change this template file, choose Tools | Templates | |
* and open the template in the editor. | |
*/ | |
package cn.orz.pascal.example.csv; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.RecursiveTask; | |
import java.util.concurrent.SynchronousQueue; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
/** | |
* | |
* @author koduki | |
*/ | |
public class Test1 { | |
static int dataSize = 100_000; | |
public static String producer() { | |
sleep(10_000); | |
return "success"; | |
} | |
public static void consumer(String msg) { | |
sleep(150_000); | |
sleep(140_000); | |
} | |
public static void main(String[] args) { | |
sequential(); | |
asyncQ(); | |
parallel(6, 6); | |
parallel(8, 8); | |
parallel(16, 16); | |
parallel2(8, 8); | |
} | |
public static void parallel2(int jobCount, int threadCount) { | |
try { | |
long s = System.nanoTime(); | |
ExecutorService es = Executors.newFixedThreadPool(threadCount); | |
List<ParallelTask2> tasks = new ArrayList<>(); | |
for (int i = 0; i < jobCount; i++) { | |
tasks.add(new ParallelTask2(dataSize / jobCount)); | |
} | |
List<Future<Void>> futures = es.invokeAll(tasks); | |
for (Future<Void> future : futures) { | |
future.get(); | |
} | |
es.shutdown(); | |
long e = System.nanoTime(); | |
System.out.println("finish parallel(job=" + jobCount + ", thread=" + threadCount + "):\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} catch (ExecutionException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
public static void parallel(int jobCount, int threadCount) { | |
try { | |
ExecutorService es = Executors.newFixedThreadPool(threadCount); | |
List<ParallelTask> tasks = new ArrayList<>(); | |
for (int i = 0; i < jobCount; i++) { | |
tasks.add(new ParallelTask(dataSize / jobCount)); | |
} | |
long s = System.nanoTime(); | |
List<Future<Void>> futures = es.invokeAll(tasks); | |
for (Future<Void> future : futures) { | |
future.get(); | |
} | |
es.shutdown(); | |
long e = System.nanoTime(); | |
System.out.println("finish parallel(job=" + jobCount + ", thread=" + threadCount + "):\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} catch (ExecutionException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
public static void asyncQ() { | |
try { | |
long s = System.nanoTime(); | |
BlockingQueue<String> drop = new SynchronousQueue<>(); | |
Thread t1 = new Thread(new Producer(drop)); | |
Thread t2 = new Thread(new Consumer(drop)); | |
t1.start(); | |
t2.start(); | |
t2.join(); | |
long e = System.nanoTime(); | |
System.out.println("finish asyncQ:\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
public static void sequential() { | |
long s = System.nanoTime(); | |
for (int i = 0; i < dataSize; i++) { | |
String msg = producer(); | |
consumer(msg); | |
} | |
long e = System.nanoTime(); | |
System.out.println("finish sequential:\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} | |
public static void sleep(final long interval) { | |
long start = System.nanoTime(); | |
long end = 0; | |
do { | |
end = System.nanoTime(); | |
} while (start + interval >= end); | |
} | |
static class Producer implements Runnable { | |
private BlockingQueue<String> drop; | |
public Producer(BlockingQueue<String> d) { | |
this.drop = d; | |
} | |
public void run() { | |
try { | |
for (int i = 0; i < dataSize; i++) { | |
String msg = producer(); | |
drop.put(msg); | |
} | |
drop.put("DONE"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
} | |
static class Consumer implements Runnable { | |
private BlockingQueue<String> drop; | |
public Consumer(BlockingQueue<String> d) { | |
this.drop = d; | |
} | |
public void run() { | |
String msg = null; | |
try { | |
while (!((msg = drop.take()).equals("DONE"))); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
} | |
public static class ParallelTask implements Callable<Void> { | |
private int chunkSize = 0; | |
public ParallelTask(int chunkSize) { | |
this.chunkSize = chunkSize; | |
} | |
@Override | |
public Void call() throws Exception { | |
for (int i = 0; i < chunkSize; i++) { | |
String msg = producer(); | |
consumer(msg); | |
} | |
return null; | |
} | |
} | |
public static class ParallelTask2 implements Callable<Void> { | |
private int chunkSize = 0; | |
public ParallelTask2(int chunkSize) { | |
this.chunkSize = chunkSize; | |
} | |
@Override | |
public Void call() throws Exception { | |
BlockingQueue<String> drop = new SynchronousQueue<>(); | |
// Producer2 producer = new Producer2(drop, chunkSize); | |
// Consumer2 consumer = new Consumer2(drop); | |
// | |
// producer.fork(); | |
// consumer.fork(); | |
// consumer.join(); | |
Thread t1 = new Thread(new Producer(drop)); | |
Thread t2 = new Thread(new Consumer(drop)); | |
t1.start(); | |
t2.start(); | |
t2.join(); | |
return null; | |
} | |
} | |
public static class Consumer2 extends RecursiveTask<Void> { | |
private BlockingQueue<String> drop; | |
public Consumer2(BlockingQueue<String> d) { | |
this.drop = d; | |
} | |
@Override | |
protected Void compute() { | |
String msg = null; | |
try { | |
while (!((msg = drop.take()).equals("DONE"))); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
return null; | |
} | |
} | |
public static class Producer2 extends RecursiveTask<Void> { | |
private BlockingQueue<String> drop; | |
private int chunkSize; | |
public Producer2(BlockingQueue<String> d, int chunkSize) { | |
this.drop = d; | |
this.chunkSize = chunkSize; | |
} | |
@Override | |
protected Void compute() { | |
try { | |
for (int i = 0; i < chunkSize; i++) { | |
String msg = producer(); | |
drop.put(msg); | |
} | |
drop.put("DONE"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
return null; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment