Skip to content

Instantly share code, notes, and snippets.

@koduki
Created March 17, 2016 20:32
Show Gist options
  • Save koduki/f4df79532f90dcbfed67 to your computer and use it in GitHub Desktop.
Save koduki/f4df79532f90dcbfed67 to your computer and use it in GitHub Desktop.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package cn.orz.pascal.example.csv;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Demonstrates printing the results of asynchronously completed futures in id order.
 *
 * <p>Each future produces an {@link Item} and pushes it into a shared priority queue. A
 * dedicated printer thread emits the items strictly in ascending id order (1, 2, 3, ...),
 * regardless of the order in which the futures finish. An item whose {@code data} is
 * {@code null} acts as the end-of-stream marker.
 *
 * @author koduki
 */
public class FutureTest {

    /** Queue shared between the future callbacks (producers) and the printer thread. */
    public static SynchronizedPriorityQueue pq = new SynchronizedPriorityQueue();

    /**
     * Starts three producer/consumer pipelines with different delays, prints their results
     * in id order on a separate thread, and finally pushes the end marker.
     *
     * @param args unused
     * @throws Exception if waiting for the futures fails
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future1 = submit(1, "future1", 1000, 1);
        CompletableFuture<Void> future2 = submit(2, "future2", 500, 2000);
        CompletableFuture<Void> future3 = submit(3, "future3", 1200, 100);
        // Printed as soon as the futures are scheduled; they have not completed yet.
        System.out.println("all future done.");

        new Thread(() -> {
            int prevId = 0;
            System.out.println("thread start.");
            try {
                Item next;
                // Blocks (instead of spinning) until the next id in sequence is available;
                // null means the end marker reached the head of the queue.
                while ((next = pq.awaitNext(prevId + 1)) != null) {
                    System.out.println("orded-print:" + next.data);
                    prevId++;
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            System.out.println("thread close.");
        }).start();

        CompletableFuture.allOf(future1, future2, future3).get();
        // End marker: largest possible id so it sorts after every real item.
        pq.push(new Item(Integer.MAX_VALUE, null));
    }

    /**
     * Schedules one pipeline: produce an item after {@code produceDelay} ms, then hand it to
     * {@link #orderdPrintln(Item)} after a further {@code consumeDelay} ms.
     */
    private static CompletableFuture<Void> submit(int id, String msg, long produceDelay, long consumeDelay) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(produceDelay);
            System.out.println("run:" + msg);
            return new Item(id, msg);
        }).thenAcceptAsync((Item item) -> {
            sleep(consumeDelay);
            orderdPrintln(item);
        });
    }

    /**
     * Registers an item for ordered printing and dumps the current queue contents.
     *
     * @param item the item to enqueue
     */
    public static void orderdPrintln(Item item) {
        System.out.println("receive:" + item.data);
        pq.push(item);
        System.out.println("dump:" + pq);
    }

    /** A payload tagged with the sequence id that determines its print order. */
    public static class Item implements Comparable<Item> {

        int id;
        String data;

        public Item(int id, String data) {
            this.id = id;
            this.data = data;
        }

        public void setData(String data) {
            this.data = data;
        }

        /** Orders items by ascending id. */
        @Override
        public int compareTo(Item o) {
            // Integer.compare avoids the overflow of "id - o.id" for ids of opposite sign.
            return Integer.compare(id, o.id);
        }

        @Override
        public String toString() {
            return "[" + id + ", " + data + "]";
        }
    }

    /**
     * Sleeps for the given number of milliseconds. An interruption is logged and the
     * thread's interrupt flag is restored so callers can still observe it.
     *
     * @param interval time to sleep in milliseconds
     */
    public static void sleep(final long interval) {
        try {
            Thread.sleep(interval);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            Logger.getLogger(FutureTest.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    /** A priority queue of {@link Item}s whose operations are all synchronized on the instance. */
    static class SynchronizedPriorityQueue {

        private final Queue<Item> pq = new PriorityQueue<>();

        /** Adds an item and wakes up any thread blocked in {@link #awaitNext(int)}. */
        public synchronized void push(Item item) {
            pq.offer(item);
            notifyAll();
        }

        /** Removes and returns the item with the smallest id, or {@code null} if empty. */
        public synchronized Item pop() {
            return pq.poll();
        }

        /** Returns the item with the smallest id without removing it, or {@code null} if empty. */
        public synchronized Item peek() {
            return pq.peek();
        }

        public synchronized boolean isEmpty() {
            return pq.isEmpty();
        }

        /**
         * Returns {@code false} once the end marker (an item with {@code null} data) is at the
         * head of the queue, {@code true} otherwise (including when the queue is empty).
         */
        public synchronized boolean isNotEnd() {
            Item head = pq.peek();
            return head == null || head.data != null;
        }

        /**
         * Blocks until the head of the queue carries {@code expectedId} and removes it, or
         * until the end marker is at the head.
         *
         * @param expectedId the id that must be at the head before an item is returned
         * @return the removed item, or {@code null} when the end marker was reached
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized Item awaitNext(int expectedId) throws InterruptedException {
            while (true) {
                Item head = pq.peek();
                if (head != null) {
                    if (head.data == null) {
                        return null;
                    }
                    if (head.id == expectedId) {
                        return pq.poll();
                    }
                }
                // Released by notifyAll() in push(); the loop re-checks after every wake-up.
                wait();
            }
        }

        /** Returns the queue contents (in internal heap order) for diagnostic output. */
        @Override
        public synchronized String toString() {
            return pq.toString();
        }
    }
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package cn.orz.pascal.example.csv;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.SynchronousQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import scala.concurrent.forkjoin.ForkJoinPool;
/**
 * Micro-benchmark comparing several ways of running a simulated producer/consumer
 * workload: sequentially, through a hand-off queue between two threads, on a fixed thread
 * pool, and with {@link CompletableFuture}.
 *
 * <p>Work is simulated by busy-waiting (see {@link #sleep(long)}), so the workers consume
 * CPU for the stated duration.
 *
 * @author koduki
 */
public class Test1 {

    private static final Logger LOG = Logger.getLogger(Test1.class.getName());

    /** Number of messages each benchmark pushes through producer and consumer. */
    static int dataSize = 100_000;

    /** Simulates producing one message (busy-waits 10,000 ns). */
    public static String producer() {
        sleep(10_000);
        return "success";
    }

    /**
     * Simulates producing one message tagged with its index (busy-waits 10,000 ns).
     *
     * @param index value appended to the returned message
     */
    public static String producer(int index) {
        sleep(10_000);
        return "success:" + index;
    }

    /**
     * Simulates consuming one message (busy-waits 150,000 ns, then 140,000 ns).
     *
     * @param msg the message to consume; its content is not inspected
     */
    public static void consumer(String msg) {
        sleep(150_000);
        sleep(140_000);
    }

    /**
     * Runs the selected benchmarks. Other variants available in this class:
     * {@link #sequential()}, {@link #parallel(int, int)}, {@link #parallel2(int, int)}.
     */
    public static void main(String[] args) throws Exception {
        asyncQ();
        asyncFuture();
    }

    /** Produces and consumes every message on the calling thread. */
    public static void sequential() {
        long s = System.nanoTime();
        for (int i = 0; i < dataSize; i++) {
            String msg = producer();
            consumer(msg);
        }
        System.out.println("finish sequential:\t" + elapsedMillis(s) + "\tms");
    }

    /**
     * Runs one producer stage followed by one consumer stage per message using
     * {@link CompletableFuture}, and waits until every consumer stage has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException declared for compatibility; failures of individual
     *     pipelines are logged and do not abort the run
     */
    public static void asyncFuture() throws InterruptedException, ExecutionException {
        long s = System.nanoTime();
        List<CompletableFuture<Void>> pipelines = new ArrayList<>(dataSize);
        for (int i = 0; i < dataSize; i++) {
            final int index = i;
            // Keep the future of the consumer stage: waiting only on the producer stage
            // would stop the clock while consumers may still be running.
            pipelines.add(CompletableFuture
                    .supplyAsync(() -> producer(index))
                    .thenAccept(Test1::consumer));
        }
        for (CompletableFuture<Void> pipeline : pipelines) {
            try {
                pipeline.get();
            } catch (ExecutionException ex) {
                LOG.log(Level.SEVERE, null, ex);
            }
        }
        // supplyAsync without an explicit executor runs on the java.util.concurrent common
        // pool, so that is the pool whose statistics are reported.
        java.util.concurrent.ForkJoinPool pool = java.util.concurrent.ForkJoinPool.commonPool();
        System.out.println("finish asyncFuture(parallelism=" + pool.getParallelism()
                + ", pool=" + pool.getPoolSize() + "):\t" + elapsedMillis(s) + "\tms");
    }

    /**
     * Splits the workload into {@code jobCount} jobs, each of which runs its own
     * producer thread and consumer thread connected by a hand-off queue.
     *
     * @param jobCount number of jobs the workload is split into
     * @param threadCount size of the thread pool that runs the jobs
     */
    public static void parallel2(int jobCount, int threadCount) {
        List<ParallelTask2> tasks = new ArrayList<>();
        for (int i = 0; i < jobCount; i++) {
            tasks.add(new ParallelTask2(chunkSizeFor(i, jobCount)));
        }
        runJobs(tasks, jobCount, threadCount);
    }

    /**
     * Splits the workload into {@code jobCount} jobs, each of which produces and consumes
     * its share of the messages sequentially.
     *
     * @param jobCount number of jobs the workload is split into
     * @param threadCount size of the thread pool that runs the jobs
     */
    public static void parallel(int jobCount, int threadCount) {
        List<ParallelTask> tasks = new ArrayList<>();
        for (int i = 0; i < jobCount; i++) {
            tasks.add(new ParallelTask(chunkSizeFor(i, jobCount)));
        }
        runJobs(tasks, jobCount, threadCount);
    }

    /** Runs one producer thread and one consumer thread connected by a hand-off queue. */
    public static void asyncQ() {
        try {
            long s = System.nanoTime();
            BlockingQueue<String> drop = new SynchronousQueue<>();
            Thread t1 = new Thread(new Producer(drop));
            Thread t2 = new Thread(new Consumer(drop));
            t1.start();
            t2.start();
            // The consumer ends only after it received "DONE", the producer's last message.
            t2.join();
            System.out.println("finish asyncQ:\t" + elapsedMillis(s) + "\tms");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, null, ex);
        }
    }

    /**
     * Busy-waits until more than {@code interval} nanoseconds have elapsed.
     *
     * @param interval duration to spin, in nanoseconds
     */
    public static void sleep(final long interval) {
        final long start = System.nanoTime();
        // Compare the elapsed difference rather than absolute nanoTime values, which may
        // overflow when added to.
        while (System.nanoTime() - start <= interval) {
            // spin
        }
    }

    /** Returns the milliseconds elapsed since {@code startNanos} (a System.nanoTime value). */
    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / 1000 / 1000;
    }

    /** Share of dataSize for one job; the first (dataSize % jobCount) jobs get one extra item. */
    private static int chunkSizeFor(int jobIndex, int jobCount) {
        int base = dataSize / jobCount;
        return jobIndex < dataSize % jobCount ? base + 1 : base;
    }

    /** Runs the tasks on a fixed pool, reports the elapsed time, and always shuts the pool down. */
    private static void runJobs(List<? extends Callable<Void>> tasks, int jobCount, int threadCount) {
        ExecutorService es = Executors.newFixedThreadPool(threadCount);
        try {
            long s = System.nanoTime();
            for (Future<Void> future : es.invokeAll(tasks)) {
                future.get();
            }
            System.out.println("finish parallel(job=" + jobCount + ", thread=" + threadCount + "):\t"
                    + elapsedMillis(s) + "\tms");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, null, ex);
        } catch (ExecutionException ex) {
            LOG.log(Level.SEVERE, null, ex);
        } finally {
            // Without this, a failing task would leave the pool threads alive.
            es.shutdown();
        }
    }

    /** Puts a fixed number of produced messages on a queue, followed by {@code "DONE"}. */
    static class Producer implements Runnable {

        private final BlockingQueue<String> drop;
        private final int count;

        /** Creates a producer that emits {@code dataSize} messages (read at construction). */
        public Producer(BlockingQueue<String> d) {
            this(d, dataSize);
        }

        /**
         * @param d queue that receives the messages
         * @param count number of messages to emit before {@code "DONE"}
         */
        public Producer(BlockingQueue<String> d, int count) {
            this.drop = d;
            this.count = count;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < count; i++) {
                    drop.put(producer());
                }
                drop.put("DONE");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
        }
    }

    /** Takes messages from a queue and consumes each one until {@code "DONE"} arrives. */
    static class Consumer implements Runnable {

        private final BlockingQueue<String> drop;

        public Consumer(BlockingQueue<String> d) {
            this.drop = d;
        }

        @Override
        public void run() {
            try {
                String msg;
                while (!(msg = drop.take()).equals("DONE")) {
                    consumer(msg);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
        }
    }

    /** Job that produces and consumes {@code chunkSize} messages sequentially. */
    public static class ParallelTask implements Callable<Void> {

        private final int chunkSize;

        public ParallelTask(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        @Override
        public Void call() throws Exception {
            for (int i = 0; i < chunkSize; i++) {
                String msg = producer();
                consumer(msg);
            }
            return null;
        }
    }

    /**
     * Job that pushes {@code chunkSize} messages through its own producer thread and
     * consumer thread, connected by a hand-off queue.
     */
    public static class ParallelTask2 implements Callable<Void> {

        private final int chunkSize;

        public ParallelTask2(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        @Override
        public Void call() throws Exception {
            BlockingQueue<String> drop = new SynchronousQueue<>();
            // Limit the producer to this job's share; otherwise every job would process
            // the complete dataSize.
            Thread t1 = new Thread(new Producer(drop, chunkSize));
            Thread t2 = new Thread(new Consumer(drop));
            t1.start();
            t2.start();
            t2.join();
            return null;
        }
    }

    /**
     * Fork/join variant of the consumer: drains the queue until {@code "DONE"} arrives.
     *
     * <p>NOTE(review): unlike {@link Consumer}, this does not call {@code consumer(msg)} for
     * the drained messages - confirm whether that is intended.
     */
    public static class Consumer2 extends RecursiveTask<Void> {

        private final BlockingQueue<String> drop;

        public Consumer2(BlockingQueue<String> d) {
            this.drop = d;
        }

        @Override
        protected Void compute() {
            try {
                while (!drop.take().equals("DONE")) {
                    // discard
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
            return null;
        }
    }

    /** Fork/join variant of the producer: emits {@code chunkSize} messages, then {@code "DONE"}. */
    public static class Producer2 extends RecursiveTask<Void> {

        private final BlockingQueue<String> drop;
        private final int chunkSize;

        public Producer2(BlockingQueue<String> d, int chunkSize) {
            this.drop = d;
            this.chunkSize = chunkSize;
        }

        @Override
        protected Void compute() {
            try {
                for (int i = 0; i < chunkSize; i++) {
                    drop.put(producer());
                }
                drop.put("DONE");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                LOG.log(Level.SEVERE, null, ex);
            }
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment