Skip to content

Instantly share code, notes, and snippets.

@vazexqi
Created May 17, 2012 05:35
Show Gist options
  • Save vazexqi/2716697 to your computer and use it in GitHub Desktop.
Save vazexqi/2716697 to your computer and use it in GitHub Desktop.
Various APIs for Futures/Promises for the JVM. For complete code, see https://github.com/vazexqi/JVMFuturesExploration
override def doAnalysisParallel() = {
implicit val executor = ExecutionContext.fromExecutorService(new ForkJoinPool())
val nyseData = Future(loadNyseData())
val nasdaqData = Future(loadNasdaqData())
val mergedMarketData = for {
nyse <- nyseData
nasdaq <- nasdaqData
} yield mergeMarketData(Arrays.asList(nyse, nasdaq))
val modeledMarketData = for {
m <- mergedMarketData
normalizedData <- Future(normalizeData(m))
analyzedData <- Future(analyzeData(normalizedData))
} yield runModel(analyzedData)
val modeledHistoricalData = for {
fedHistoricalData <- Future(loadFedHistoricalData())
normalizedHistoricalData <- Future(normalizeData(fedHistoricalData))
analyzedHistoricalData <- Future(analyzeData(normalizedHistoricalData))
} yield runModel(analyzedHistoricalData)
val results = for {
marketData <- modeledMarketData
historicalData <- modeledHistoricalData
} yield compareModels(Arrays.asList(marketData, historicalData))
Await.result(results, Duration.Inf)
}
}
void doAnalysisParallel() {
Promise nyseData = pGroup.task {loadNyseData()}
Promise nasdaqData = pGroup.task {loadNasdaqData()}
Promise mergedMarketData = pGroup.whenAllBound([nyseData, nasdaqData]) { nyse, nasdaq ->
mergeMarketData([nyse, nasdaq])
}
Promise modeledMarketData = mergedMarketData.then {it -> normalizeData(it)} then {it -> analyzeData(it)} then {it -> runModel(it)}
Promise modeledHistoricalData = pGroup.task {return loadFedHistoricalData()} then {it -> normalizeData(it)} then {it -> analyzeData(it)} then {it -> runModel(it)}
Promise results = pGroup.whenAllBound([modeledMarketData, modeledHistoricalData]) {marketData, historicalData ->
compareModels([marketData, historicalData])
}
results.get()
}
public void doAnalysisParallel() throws ExecutionException, InterruptedException {
final ListenableFuture<StockDataCollection> nyseData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return loadNyseData();
}
});
final ListenableFuture<StockDataCollection> nasdaqData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return loadNasdaqData();
}
});
final ListenableFuture<StockDataCollection> mergedMarketData = Futures.transform(Futures.successfulAsList(nyseData, nasdaqData), new Function<List<StockDataCollection>, StockDataCollection>() {
@Override
public StockDataCollection apply(final List<StockDataCollection> input) {
return mergeMarketData(input);
}
});
ListenableFuture<StockDataCollection> normalizedMarketData = Futures.transform(mergedMarketData, new Function<StockDataCollection, StockDataCollection>() {
@Override
public StockDataCollection apply(final StockDataCollection input) {
return normalizeData(input);
}
});
final ListenableFuture<StockDataCollection> fedHistoricalData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return loadFedHistoricalData();
}
});
final ListenableFuture<StockDataCollection> normalizedHistoricalData = Futures.transform(fedHistoricalData, new Function<StockDataCollection, StockDataCollection>() {
@Override
public StockDataCollection apply(final StockDataCollection input) {
return normalizeData(input);
}
});
final ListenableFuture<StockAnalysisCollection> analyzedStockData = Futures.transform(normalizedMarketData, new Function<StockDataCollection, StockAnalysisCollection>() {
@Override
public StockAnalysisCollection apply(final StockDataCollection input) {
return analyzeData(input);
}
});
ListenableFuture<MarketModel> modeledMarketData = Futures.transform(analyzedStockData, new Function<StockAnalysisCollection, MarketModel>() {
@Override
public MarketModel apply(final StockAnalysisCollection input) {
return runModel(input);
}
});
final ListenableFuture<StockAnalysisCollection> analyzedHistoricalData = Futures.transform(normalizedHistoricalData, new Function<StockDataCollection, StockAnalysisCollection>() {
@Override
public StockAnalysisCollection apply(final StockDataCollection input) {
return analyzeData(input);
}
});
ListenableFuture<MarketModel> modeledHistoricalData = Futures.transform(analyzedHistoricalData, new Function<StockAnalysisCollection, MarketModel>() {
@Override
public MarketModel apply(final StockAnalysisCollection input) {
return runModel(input);
}
});
ListenableFuture<MarketRecommendation> results = Futures.transform(Futures.successfulAsList(modeledMarketData, modeledHistoricalData), new Function<List<MarketModel>, MarketRecommendation>() {
@Override
public MarketRecommendation apply(final List<MarketModel> input) {
return compareModels(input);
}
});
results.get();
}
public void doAnalysisParallel() throws ExecutionException, InterruptedException {
final Future<StockDataCollection> nyseData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return loadNyseData();
}
});
final Future<StockDataCollection> nasdaqData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return loadNasdaqData();
}
});
final Future<StockDataCollection> mergedMarketData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return mergeMarketData(Arrays.asList(nyseData.get(), nasdaqData.get()));
}
});
final Future<StockDataCollection> normalizedMarketData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return normalizeData(mergedMarketData.get());
}
});
final Future<StockDataCollection> fedHistoricalData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return loadFedHistoricalData();
}
});
final Future<StockDataCollection> normalizedHistoricalData = executor.submit(new Callable<StockDataCollection>() {
@Override
public StockDataCollection call() throws Exception {
return normalizeData(fedHistoricalData.get());
}
});
final Future<StockAnalysisCollection> analyzedStockData = executor.submit(new Callable<StockAnalysisCollection>() {
@Override
public StockAnalysisCollection call() throws Exception {
return analyzeData(normalizedMarketData.get());
}
});
final Future<MarketModel> modeledMarketData = executor.submit(new Callable<MarketModel>() {
@Override
public MarketModel call() throws Exception {
return runModel(analyzedStockData.get());
}
});
final Future<StockAnalysisCollection> analyzedHistoricalData = executor.submit(new Callable<StockAnalysisCollection>() {
@Override
public StockAnalysisCollection call() throws Exception {
return analyzeData(normalizedHistoricalData.get());
}
});
final Future<MarketModel> modeledHistoricalData = executor.submit(new Callable<MarketModel>() {
@Override
public MarketModel call() throws Exception {
return runModel(analyzedHistoricalData.get());
}
});
Future<MarketRecommendation> results = executor.submit(new Callable<MarketRecommendation>() {
@Override
public MarketRecommendation call() throws Exception {
return compareModels(Arrays.asList(modeledMarketData.get(), modeledHistoricalData.get()));
}
});
results.get();
}
override def doAnalysisParallel() = {
val executor = FuturePool(new ForkJoinPool())
val nyseData = executor(loadNyseData())
val nasdaqData = executor(loadNasdaqData())
val mergedMarketData = for {
nyse <- nyseData
nasdaq <- nasdaqData
} yield mergeMarketData(Arrays.asList(nyse, nasdaq))
val modeledMarketData = for {
m <- mergedMarketData
normalizedData <- executor(normalizeData(m))
analyzedData <- executor(analyzeData(normalizedData))
} yield runModel(analyzedData)
val modeledHistoricalData = for {
fedHistoricalData <- executor(loadFedHistoricalData())
normalizedHistoricalData <- executor(normalizeData(fedHistoricalData))
analyzedHistoricalData <- executor(analyzeData(normalizedHistoricalData))
} yield runModel(analyzedHistoricalData)
val results = for {
marketData <- modeledMarketData
historicalData <- modeledHistoricalData
} yield compareModels(Arrays.asList(marketData, historicalData))
results.get()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment