Created
March 17, 2016 20:32
-
-
Save koduki/f4df79532f90dcbfed67 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* To change this license header, choose License Headers in Project Properties. | |
* To change this template file, choose Tools | Templates | |
* and open the template in the editor. | |
*/ | |
package cn.orz.pascal.example.csv; | |
import java.util.PriorityQueue; | |
import java.util.Queue; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
/** | |
* | |
* @author koduki | |
*/ | |
public class FutureTest { | |
public static void main(String[] args) throws Exception { | |
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { | |
String msg = "future1"; | |
sleep(1000); | |
System.out.println("run:" + msg); | |
return new Item(1, msg); | |
}).thenAcceptAsync((Item item) -> { | |
sleep(1); | |
orderdPrintln(item); | |
}); | |
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> { | |
String msg = "future2"; | |
sleep(500); | |
System.out.println("run:" + msg); | |
return new Item(2, msg); | |
}).thenAcceptAsync((Item item) -> { | |
sleep(2000); | |
orderdPrintln(item); | |
}); | |
CompletableFuture<Void> future3 = CompletableFuture.supplyAsync(() -> { | |
String msg = "future3"; | |
sleep(1200); | |
System.out.println("run:" + msg); | |
return new Item(3, msg); | |
}).thenAcceptAsync((Item item) -> { | |
sleep(100); | |
orderdPrintln(item); | |
}); | |
System.out.println("all future done."); | |
new Thread(() -> { | |
int prevId = 0; | |
System.out.println("thread start."); | |
while (pq.isNotEnd()) { | |
if (!pq.isEmpty() && pq.peek().id == (prevId + 1)) { | |
System.out.println("orded-print:" + pq.pop().data); | |
prevId++; | |
} | |
} | |
System.out.println("thread close."); | |
}).start(); | |
CompletableFuture.allOf(future1, future2, future3).get(); | |
pq.push(new Item(Integer.MAX_VALUE, null)); | |
} | |
public static SynchronizedPriorityQueue pq = new SynchronizedPriorityQueue(); | |
public static void orderdPrintln(Item item) { | |
System.out.println("receive:" + item.data); | |
pq.push(item); | |
System.out.println("dump:" + pq); | |
} | |
public static class Item implements Comparable<Item> { | |
int id; | |
String data; | |
public Item(int id, String data) { | |
this.id = id; | |
this.data = data; | |
} | |
public void setData(String data) { | |
this.data = data; | |
} | |
@Override | |
public int compareTo(Item o) { | |
return id - o.id; | |
} | |
@Override | |
public String toString() { | |
return "[" + id + ", " + data + "]"; | |
} | |
} | |
public static void sleep(final long interval) { | |
try { | |
Thread.sleep(interval); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(FutureTest.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
static class SynchronizedPriorityQueue { | |
private Queue<Item> pq = new PriorityQueue<>(); | |
synchronized public void push(Item item) { | |
pq.offer(item); | |
} | |
synchronized public Item pop() { | |
return pq.poll(); | |
} | |
synchronized public Item peek() { | |
return pq.peek(); | |
} | |
synchronized public boolean isEmpty() { | |
return pq.isEmpty(); | |
} | |
synchronized public boolean isNotEnd() { | |
if (pq.isEmpty()) { | |
return true; | |
} else { | |
if (pq.peek().data == null) { | |
return false; | |
} else { | |
return true; | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* To change this license header, choose License Headers in Project Properties. | |
* To change this template file, choose Tools | Templates | |
* and open the template in the editor. | |
*/ | |
package cn.orz.pascal.example.csv; | |
import java.util.ArrayList; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Queue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.RecursiveTask; | |
import java.util.concurrent.SynchronousQueue; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
import scala.concurrent.forkjoin.ForkJoinPool; | |
/** | |
* | |
* @author koduki | |
*/ | |
public class Test1 { | |
static int dataSize = 100_000; | |
public static String producer() { | |
sleep(10_000); | |
return "success"; | |
} | |
public static String producer(int index) { | |
sleep(10_000); | |
return "success:" + index; | |
} | |
public static void consumer(String msg) { | |
sleep(150_000); | |
sleep(140_000); | |
// System.out.println(msg); | |
} | |
public static void main(String[] args) throws Exception { | |
// sequential(); | |
asyncQ(); | |
// parallel(6, 6); | |
// parallel(8, 8); | |
// parallel(16, 16); | |
// parallel2(8, 8); | |
asyncFuture(); | |
} | |
public static void sequential() { | |
long s = System.nanoTime(); | |
for (int i = 0; i < dataSize; i++) { | |
String msg = producer(); | |
consumer(msg); | |
} | |
long e = System.nanoTime(); | |
System.out.println("finish sequential:\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} | |
public static void asyncFuture() throws InterruptedException, ExecutionException { | |
long s = System.nanoTime(); | |
Queue<CompletableFuture<String>> futures = new LinkedList<>(); | |
for (int i = 0; i < dataSize; i++) { | |
final int index = i; | |
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { | |
// System.out.println("index=" + index); | |
return producer(index); | |
}); | |
future.thenAccept((String x) -> { | |
consumer(x); | |
}); | |
futures.offer(future); | |
} | |
// CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); | |
// future.get(); | |
for (Future<String> future = futures.poll(); future != null; future = futures.poll()) { | |
try { | |
future.get(); | |
} catch (Exception ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
long e = System.nanoTime(); | |
System.out.println("finish asyncFuture(parallelism=" + ForkJoinPool.commonPool().getParallelism() + ", pool=" + ForkJoinPool.commonPool().getPoolSize() + "):\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} | |
public static void parallel2(int jobCount, int threadCount) { | |
try { | |
long s = System.nanoTime(); | |
ExecutorService es = Executors.newFixedThreadPool(threadCount); | |
List<ParallelTask2> tasks = new ArrayList<>(); | |
for (int i = 0; i < jobCount; i++) { | |
tasks.add(new ParallelTask2(dataSize / jobCount)); | |
} | |
List<Future<Void>> futures = es.invokeAll(tasks); | |
for (Future<Void> future : futures) { | |
future.get(); | |
} | |
es.shutdown(); | |
long e = System.nanoTime(); | |
System.out.println("finish parallel(job=" + jobCount + ", thread=" + threadCount + "):\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} catch (ExecutionException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
public static void parallel(int jobCount, int threadCount) { | |
try { | |
ExecutorService es = Executors.newFixedThreadPool(threadCount); | |
List<ParallelTask> tasks = new ArrayList<>(); | |
for (int i = 0; i < jobCount; i++) { | |
tasks.add(new ParallelTask(dataSize / jobCount)); | |
} | |
long s = System.nanoTime(); | |
List<Future<Void>> futures = es.invokeAll(tasks); | |
for (Future<Void> future : futures) { | |
future.get(); | |
} | |
es.shutdown(); | |
long e = System.nanoTime(); | |
System.out.println("finish parallel(job=" + jobCount + ", thread=" + threadCount + "):\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} catch (ExecutionException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
public static void asyncQ() { | |
try { | |
long s = System.nanoTime(); | |
BlockingQueue<String> drop = new SynchronousQueue<>(); | |
Thread t1 = new Thread(new Producer(drop)); | |
Thread t2 = new Thread(new Consumer(drop)); | |
t1.start(); | |
t2.start(); | |
t2.join(); | |
long e = System.nanoTime(); | |
System.out.println("finish asyncQ:\t" + (e - s) / 1000 / 1000 + "\tms"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
public static void sleep(final long interval) { | |
long start = System.nanoTime(); | |
long end = 0; | |
do { | |
end = System.nanoTime(); | |
} while (start + interval >= end); | |
} | |
static class Producer implements Runnable { | |
private BlockingQueue<String> drop; | |
public Producer(BlockingQueue<String> d) { | |
this.drop = d; | |
} | |
public void run() { | |
try { | |
for (int i = 0; i < dataSize; i++) { | |
String msg = producer(); | |
drop.put(msg); | |
} | |
drop.put("DONE"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
} | |
static class Consumer implements Runnable { | |
private BlockingQueue<String> drop; | |
public Consumer(BlockingQueue<String> d) { | |
this.drop = d; | |
} | |
public void run() { | |
String msg = null; | |
try { | |
while (!((msg = drop.take()).equals("DONE"))){ | |
consumer(msg); | |
}; | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
} | |
} | |
public static class ParallelTask implements Callable<Void> { | |
private int chunkSize = 0; | |
public ParallelTask(int chunkSize) { | |
this.chunkSize = chunkSize; | |
} | |
@Override | |
public Void call() throws Exception { | |
for (int i = 0; i < chunkSize; i++) { | |
String msg = producer(); | |
consumer(msg); | |
} | |
return null; | |
} | |
} | |
public static class ParallelTask2 implements Callable<Void> { | |
private int chunkSize = 0; | |
public ParallelTask2(int chunkSize) { | |
this.chunkSize = chunkSize; | |
} | |
@Override | |
public Void call() throws Exception { | |
BlockingQueue<String> drop = new SynchronousQueue<>(); | |
// Producer2 producer = new Producer2(drop, chunkSize); | |
// Consumer2 consumer = new Consumer2(drop); | |
// | |
// producer.fork(); | |
// consumer.fork(); | |
// consumer.join(); | |
Thread t1 = new Thread(new Producer(drop)); | |
Thread t2 = new Thread(new Consumer(drop)); | |
t1.start(); | |
t2.start(); | |
t2.join(); | |
return null; | |
} | |
} | |
public static class Consumer2 extends RecursiveTask<Void> { | |
private BlockingQueue<String> drop; | |
public Consumer2(BlockingQueue<String> d) { | |
this.drop = d; | |
} | |
@Override | |
protected Void compute() { | |
String msg = null; | |
try { | |
while (!((msg = drop.take()).equals("DONE"))); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
return null; | |
} | |
} | |
public static class Producer2 extends RecursiveTask<Void> { | |
private BlockingQueue<String> drop; | |
private int chunkSize; | |
public Producer2(BlockingQueue<String> d, int chunkSize) { | |
this.drop = d; | |
this.chunkSize = chunkSize; | |
} | |
@Override | |
protected Void compute() { | |
try { | |
for (int i = 0; i < chunkSize; i++) { | |
String msg = producer(); | |
drop.put(msg); | |
} | |
drop.put("DONE"); | |
} catch (InterruptedException ex) { | |
Logger.getLogger(Test1.class.getName()).log(Level.SEVERE, null, ex); | |
} | |
return null; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment