Created
July 18, 2016 06:15
-
-
Save ykrkn/ccdc907b42518777c42188c101ea1864 to your computer and use it in GitHub Desktop.
CompletableFuture Actor Graph
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CompletionException; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import static java.util.concurrent.CompletableFuture.completedFuture; | |
/**
 * Created by sx on 26.02.16.
 *
 * <p>Small demo that wires {@link Actor}s into a directed {@link Graph} and runs the
 * graph with {@link CompletableFuture}s: every actor starts once one of its
 * predecessors has produced a value, and a batch is finished when all actors without
 * successors have completed.
 */
public class Main {

    /**
     * Runs the ETL demo graph once for every source value and waits for all batches.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("RUN");
        // createGraphCycled() cannot be scheduled; the executor rejects it with an
        // IllegalStateException instead of failing on a missing promise.
        Graph g = createGraphEtl();
        List<String> src = Collections.singletonList("A");
        CountDownLatch latch = new CountDownLatch(src.size());
        for (String c : src) {
            GraphExecutor ge = new GraphExecutor(g);
            ge.execute(c).whenComplete((ignored, failure) -> {
                try {
                    if (failure == null) {
                        System.out.println("BATCH COMPLETE");
                    } else {
                        unwrap(failure).printStackTrace();
                    }
                } finally {
                    // Count down on success and on failure so main can never hang.
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("EXIT");
    }

    /** Returns the cause of a {@link CompletionException} when it has one, else the throwable itself. */
    private static Throwable unwrap(Throwable failure) {
        if (failure instanceof CompletionException && failure.getCause() != null) {
            return failure.getCause();
        }
        return failure;
    }

    /** Builds E0 -> E1 -> E2 -> E3 with an extra back edge E2 -> E1, i.e. a graph with a cycle. */
    private static Graph createGraphCycled() {
        Graph g = new Graph();
        Actor<Object, String> e0 = new Actor<>("E0");
        Actor<Object, String> e1 = new Actor<>("E1");
        Actor<Object, String> e2 = new Actor<>("E2");
        Actor<Object, String> e3 = new Actor<>("E3");
        g.add(e0);
        g.add(e1);
        g.add(e2);
        g.add(e3);
        g.connect(e0, e1);
        g.connect(e1, e2);
        g.connect(e2, e1);
        g.connect(e2, e3);
        return g;
    }

    /** Builds the demo graph: E1, E2 -> T1 -> D1 and E3 -> T2 -> D2, D3. */
    private static Graph createGraphEtl() {
        Graph g = new Graph();
        Actor<Object, String> e1 = new Actor<>("E1");
        Actor<Object, String> e2 = new Actor<>("E2");
        Actor<Object, String> e3 = new Actor<>("E3");
        Actor<Object, String> t1 = new Actor<>("T1");
        Actor<Object, String> t2 = new Actor<>("T2");
        Actor<Object, String> d1 = new Actor<>("D1");
        Actor<Object, String> d2 = new Actor<>("D2");
        Actor<Object, String> d3 = new Actor<>("D3");
        g.add(e1);
        g.add(e2);
        g.add(e3);
        g.add(t1);
        g.add(t2);
        g.add(d1);
        g.add(d2);
        g.add(d3);
        g.connect(e1, t1);
        g.connect(e2, t1);
        g.connect(e3, t2);
        g.connect(t1, d1);
        g.connect(t2, d2);
        g.connect(t2, d3);
        return g;
    }

    /** Builds a linear chain of 100 actors. */
    private static Graph createGraphChain100() {
        return createGraphChain(100);
    }

    /**
     * Builds a linear chain A0 -> A1 -> ... of the given length.
     *
     * @param length number of actors in the chain; values below 1 yield an empty graph
     */
    private static Graph createGraphChain(int length) {
        Graph g = new Graph();
        Actor<Object, String> previous = null;
        for (int index = 0; index < length; index++) {
            Actor<Object, String> current = new Actor<>("A" + index);
            g.add(current);
            if (previous != null) {
                g.connect(previous, current);
            }
            previous = current;
        }
        return g;
    }

    /**
     * Turns a {@link Graph} into a chain of {@link CompletableFuture}s, one per actor.
     * The promise map is a plain {@link HashMap} that is only touched by the thread
     * calling {@link #execute(String)}.
     */
    static class GraphExecutor {
        private final Graph g;
        private final Map<Actor<?, ?>, CompletableFuture<Object>> promises = new HashMap<>();

        public GraphExecutor(Graph g) {
            this.g = Objects.requireNonNull(g, "g");
        }

        void putPromise(Actor<?, ?> v, CompletableFuture<Object> cf) {
            promises.put(v, cf);
        }

        /** Returns the promise registered for {@code v}, or {@code null} if there is none yet. */
        CompletableFuture<Object> getPromiseFor(Actor<?, ?> v) {
            return promises.get(v);
        }

        /**
         * Wires up one promise per actor and returns a future that completes when every
         * actor without outgoing edges has completed.
         *
         * <p>Actors are visited in topological order, so the promise of every predecessor
         * exists before it is needed, regardless of the order in which actors were added.
         *
         * @param c the value handed to every actor that has no incoming edge
         * @return future completing after all tail actors; completes exceptionally if one fails
         * @throws IllegalStateException if the graph contains a cycle
         */
        CompletableFuture<Void> execute(String c) {
            List<Actor<?, ?>> order = g.topologicalOrder();
            for (Actor<?, ?> actor : order) {
                List<Actor<?, ?>> inputs = g.getInwards(actor);
                CompletableFuture<Object> upstream;
                if (inputs.isEmpty()) {
                    upstream = CompletableFuture.<Object>completedFuture(c);
                } else if (inputs.size() == 1) {
                    upstream = getPromiseFor(inputs.get(0));
                } else {
                    // More than one predecessor: continue with whichever completes first.
                    CompletableFuture<?>[] sources = inputs.stream()
                            .map(promises::get)
                            .toArray(CompletableFuture[]::new);
                    upstream = CompletableFuture.anyOf(sources);
                }
                putPromise(actor, upstream.thenComposeAsync(value -> start(actor, value)));
            }
            // Combine the promises of all tail actors into a single "all of" future.
            CompletableFuture<?>[] tails = order.stream()
                    .filter(actor -> g.getOutwards(actor).isEmpty())
                    .map(promises::get)
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(tails);
        }

        /** Starts {@code actor} on {@code input}, viewing its typed result as a plain Object. */
        @SuppressWarnings("unchecked") // only widens the future's element type to Object
        private static CompletableFuture<Object> start(Actor<?, ?> actor, Object input) {
            return (CompletableFuture<Object>) actor.promise(input);
        }
    }

    /** Directed graph of actors; vertices and edges are compared by identity. */
    static class Graph {
        private final Set<Actor<?, ?>> vertices = new LinkedHashSet<>();
        private final List<Edge> edges = new ArrayList<>();

        /**
         * Registers a vertex.
         *
         * @return {@code true} if the vertex was not part of the graph before
         */
        boolean add(Actor<?, ?> v) {
            return vertices.add(Objects.requireNonNull(v, "v"));
        }

        /**
         * Connects {@code from} to {@code to}. Both endpoints are registered as vertices
         * if they are not yet part of the graph, so every edge always refers to known actors.
         *
         * @return the existing edge if the two actors are already connected, else a new one
         */
        Edge connect(Actor<?, ?> from, Actor<?, ?> to) {
            Objects.requireNonNull(from, "from");
            Objects.requireNonNull(to, "to");
            for (Edge existing : edges) {
                if (existing.from == from && existing.to == to) {
                    return existing;
                }
            }
            add(from);
            add(to);
            Edge created = new Edge(from, to);
            edges.add(created);
            return created;
        }

        /** Returns the actors that have an edge pointing at {@code to}. */
        List<Actor<?, ?>> getInwards(Actor<?, ?> to) {
            List<Actor<?, ?>> result = new ArrayList<>();
            for (Edge e : edges) {
                if (e.to == to) {
                    result.add(e.from);
                }
            }
            return result;
        }

        /** Returns the actors that {@code from} has an edge pointing at. */
        List<Actor<?, ?>> getOutwards(Actor<?, ?> from) {
            List<Actor<?, ?>> result = new ArrayList<>();
            for (Edge e : edges) {
                if (e.from == from) {
                    result.add(e.to);
                }
            }
            return result;
        }

        /**
         * Returns all vertices ordered so that every actor comes after all of its
         * predecessors (Kahn's algorithm). Actors that are ready at the same time keep
         * their insertion order.
         *
         * @throws IllegalStateException if the graph contains a cycle
         */
        List<Actor<?, ?>> topologicalOrder() {
            // Number of not-yet-emitted predecessors per vertex.
            Map<Actor<?, ?>, Integer> pending = new LinkedHashMap<>();
            for (Actor<?, ?> vertex : vertices) {
                pending.put(vertex, 0);
            }
            for (Edge e : edges) {
                pending.merge(e.to, 1, Integer::sum);
            }
            Deque<Actor<?, ?>> ready = new ArrayDeque<>();
            for (Map.Entry<Actor<?, ?>, Integer> entry : pending.entrySet()) {
                if (entry.getValue() == 0) {
                    ready.add(entry.getKey());
                }
            }
            List<Actor<?, ?>> order = new ArrayList<>(vertices.size());
            while (!ready.isEmpty()) {
                Actor<?, ?> next = ready.remove();
                order.add(next);
                for (Actor<?, ?> successor : getOutwards(next)) {
                    if (pending.merge(successor, -1, Integer::sum) == 0) {
                        ready.add(successor);
                    }
                }
            }
            if (order.size() != vertices.size()) {
                // Whatever could not be emitted is on a cycle or downstream of one.
                List<Actor<?, ?>> blocked = new ArrayList<>(vertices);
                blocked.removeAll(order);
                throw new IllegalStateException(
                        "Graph contains a cycle; cannot schedule actors: " + blocked);
            }
            return order;
        }
    }

    /** Directed connection between two actors. */
    static class Edge {
        public final Actor<?, ?> from;
        public final Actor<?, ?> to;

        public Edge(Actor<?, ?> from, Actor<?, ?> to) {
            this.from = from;
            this.to = to;
        }
    }

    /**
     * A unit of work: prints its name and input, pauses, and returns the input's
     * string form with a {@code '.'} appended.
     *
     * @param <T> input type
     * @param <R> result type; the value actually produced is always a {@link String}
     */
    static class Actor<T, R> {
        /** Pause applied by {@link #execute(Object)} unless another delay is given. */
        static final long DEFAULT_DELAY_MILLIS = 111;

        /** Actor name, used as a label in the output. */
        String N;
        private final long delayMillis;

        public Actor(String name) {
            this(name, DEFAULT_DELAY_MILLIS);
        }

        /**
         * @param name        label printed by {@link #execute(Object)}
         * @param delayMillis how long {@link #execute(Object)} sleeps; must not be negative
         * @throws IllegalArgumentException if {@code delayMillis} is negative
         */
        public Actor(String name, long delayMillis) {
            if (delayMillis < 0) {
                throw new IllegalArgumentException("delayMillis must be >= 0: " + delayMillis);
            }
            N = name;
            this.delayMillis = delayMillis;
        }

        /** Runs {@link #execute(Object)} asynchronously on {@code x} and returns the pending result. */
        public CompletableFuture<R> promise(Object x) {
            @SuppressWarnings("unchecked") // the graph is untyped; callers pass a compatible value
            T input = (T) x;
            return CompletableFuture.supplyAsync(() -> execute(input));
        }

        /**
         * Prints {@code "<name> <value>"}, sleeps for the configured delay and returns
         * {@code value.toString() + '.'}.
         *
         * @throws NullPointerException if {@code value} is null
         */
        public R execute(T value) {
            System.out.println(N + " " + value);
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the pool thread; the result is still produced.
                Thread.currentThread().interrupt();
            }
            @SuppressWarnings("unchecked") // R is expected to be String (or a supertype)
            R result = (R) (value.toString() + '.');
            return result;
        }

        /** Always returns {@code true}; not called anywhere in this class. */
        boolean is() {
            return true;
        }

        @Override
        public String toString() {
            return String.valueOf(N);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment