Created
May 17, 2012 05:35
-
-
Save vazexqi/2716697 to your computer and use it in GitHub Desktop.
Various APIs for Futures/Promises for the JVM. For complete code, see https://github.com/vazexqi/JVMFuturesExploration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Runs the analysis pipeline on `scala.concurrent` futures and blocks until
 * the final comparison is available.
 *
 * Two chains are built: NYSE + NASDAQ data is loaded, merged, normalized,
 * analyzed and modeled; Fed historical data is loaded, normalized, analyzed
 * and modeled. The two models are then passed to `compareModels`.
 *
 * @return the value produced by `compareModels`
 */
override def doAnalysisParallel() = {
  // A fresh pool is created for every call, so it must also be released by
  // this call; otherwise its worker threads outlive the analysis.
  implicit val executor = ExecutionContext.fromExecutorService(new ForkJoinPool())
  try {
    // Both loads are created before the for-comprehension so that they are
    // already running when the comprehension starts waiting on them.
    val nyseData = Future(loadNyseData())
    val nasdaqData = Future(loadNasdaqData())
    val mergedMarketData = for {
      nyse <- nyseData
      nasdaq <- nasdaqData
    } yield mergeMarketData(Arrays.asList(nyse, nasdaq))

    val modeledMarketData = for {
      m <- mergedMarketData
      normalizedData <- Future(normalizeData(m))
      analyzedData <- Future(analyzeData(normalizedData))
    } yield runModel(analyzedData)

    // The first generator is evaluated as soon as this value is defined, so
    // the historical chain starts without waiting for the market chain.
    val modeledHistoricalData = for {
      fedHistoricalData <- Future(loadFedHistoricalData())
      normalizedHistoricalData <- Future(normalizeData(fedHistoricalData))
      analyzedHistoricalData <- Future(analyzeData(normalizedHistoricalData))
    } yield runModel(analyzedHistoricalData)

    val results = for {
      marketData <- modeledMarketData
      historicalData <- modeledHistoricalData
    } yield compareModels(Arrays.asList(marketData, historicalData))

    Await.result(results, Duration.Inf)
  } finally {
    // Runs on success and on failure (Await.result rethrows a failed future).
    executor.shutdown()
  }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Runs the analysis pipeline as a graph of promises created through pGroup
 * and blocks on the final promise.
 *
 * Market chain: load NYSE and NASDAQ, merge, normalize, analyze, model.
 * Historical chain: load Fed data, normalize, analyze, model.
 * The two models are finally handed to compareModels.
 */
void doAnalysisParallel() {
    // Independent loads, each started as its own task.
    Promise nyseLoad = pGroup.task { loadNyseData() }
    Promise nasdaqLoad = pGroup.task { loadNasdaqData() }

    // Merge once both market loads are bound.
    Promise mergedMarket = pGroup.whenAllBound([nyseLoad, nasdaqLoad]) { nyse, nasdaq ->
        mergeMarketData([nyse, nasdaq])
    }

    // Market chain, one named promise per stage.
    Promise normalizedMarket = mergedMarket.then { merged -> normalizeData(merged) }
    Promise analyzedMarket = normalizedMarket.then { normalized -> analyzeData(normalized) }
    Promise marketModel = analyzedMarket.then { analyzed -> runModel(analyzed) }

    // Historical chain, same stages applied to the Fed data.
    Promise historicalLoad = pGroup.task { loadFedHistoricalData() }
    Promise normalizedHistorical = historicalLoad.then { loaded -> normalizeData(loaded) }
    Promise analyzedHistorical = normalizedHistorical.then { normalized -> analyzeData(normalized) }
    Promise historicalModel = analyzedHistorical.then { analyzed -> runModel(analyzed) }

    // Compare once both models are bound, then wait for the comparison.
    Promise comparison = pGroup.whenAllBound([marketModel, historicalModel]) { market, historical ->
        compareModels([market, historical])
    }
    comparison.get()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void doAnalysisParallel() throws ExecutionException, InterruptedException { | |
final ListenableFuture<StockDataCollection> nyseData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return loadNyseData(); | |
} | |
}); | |
final ListenableFuture<StockDataCollection> nasdaqData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return loadNasdaqData(); | |
} | |
}); | |
final ListenableFuture<StockDataCollection> mergedMarketData = Futures.transform(Futures.successfulAsList(nyseData, nasdaqData), new Function<List<StockDataCollection>, StockDataCollection>() { | |
@Override | |
public StockDataCollection apply(final List<StockDataCollection> input) { | |
return mergeMarketData(input); | |
} | |
}); | |
ListenableFuture<StockDataCollection> normalizedMarketData = Futures.transform(mergedMarketData, new Function<StockDataCollection, StockDataCollection>() { | |
@Override | |
public StockDataCollection apply(final StockDataCollection input) { | |
return normalizeData(input); | |
} | |
}); | |
final ListenableFuture<StockDataCollection> fedHistoricalData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return loadFedHistoricalData(); | |
} | |
}); | |
final ListenableFuture<StockDataCollection> normalizedHistoricalData = Futures.transform(fedHistoricalData, new Function<StockDataCollection, StockDataCollection>() { | |
@Override | |
public StockDataCollection apply(final StockDataCollection input) { | |
return normalizeData(input); | |
} | |
}); | |
final ListenableFuture<StockAnalysisCollection> analyzedStockData = Futures.transform(normalizedMarketData, new Function<StockDataCollection, StockAnalysisCollection>() { | |
@Override | |
public StockAnalysisCollection apply(final StockDataCollection input) { | |
return analyzeData(input); | |
} | |
}); | |
ListenableFuture<MarketModel> modeledMarketData = Futures.transform(analyzedStockData, new Function<StockAnalysisCollection, MarketModel>() { | |
@Override | |
public MarketModel apply(final StockAnalysisCollection input) { | |
return runModel(input); | |
} | |
}); | |
final ListenableFuture<StockAnalysisCollection> analyzedHistoricalData = Futures.transform(normalizedHistoricalData, new Function<StockDataCollection, StockAnalysisCollection>() { | |
@Override | |
public StockAnalysisCollection apply(final StockDataCollection input) { | |
return analyzeData(input); | |
} | |
}); | |
ListenableFuture<MarketModel> modeledHistoricalData = Futures.transform(analyzedHistoricalData, new Function<StockAnalysisCollection, MarketModel>() { | |
@Override | |
public MarketModel apply(final StockAnalysisCollection input) { | |
return runModel(input); | |
} | |
}); | |
ListenableFuture<MarketRecommendation> results = Futures.transform(Futures.successfulAsList(modeledMarketData, modeledHistoricalData), new Function<List<MarketModel>, MarketRecommendation>() { | |
@Override | |
public MarketRecommendation apply(final List<MarketModel> input) { | |
return compareModels(input); | |
} | |
}); | |
results.get(); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void doAnalysisParallel() throws ExecutionException, InterruptedException { | |
final Future<StockDataCollection> nyseData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return loadNyseData(); | |
} | |
}); | |
final Future<StockDataCollection> nasdaqData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return loadNasdaqData(); | |
} | |
}); | |
final Future<StockDataCollection> mergedMarketData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return mergeMarketData(Arrays.asList(nyseData.get(), nasdaqData.get())); | |
} | |
}); | |
final Future<StockDataCollection> normalizedMarketData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return normalizeData(mergedMarketData.get()); | |
} | |
}); | |
final Future<StockDataCollection> fedHistoricalData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return loadFedHistoricalData(); | |
} | |
}); | |
final Future<StockDataCollection> normalizedHistoricalData = executor.submit(new Callable<StockDataCollection>() { | |
@Override | |
public StockDataCollection call() throws Exception { | |
return normalizeData(fedHistoricalData.get()); | |
} | |
}); | |
final Future<StockAnalysisCollection> analyzedStockData = executor.submit(new Callable<StockAnalysisCollection>() { | |
@Override | |
public StockAnalysisCollection call() throws Exception { | |
return analyzeData(normalizedMarketData.get()); | |
} | |
}); | |
final Future<MarketModel> modeledMarketData = executor.submit(new Callable<MarketModel>() { | |
@Override | |
public MarketModel call() throws Exception { | |
return runModel(analyzedStockData.get()); | |
} | |
}); | |
final Future<StockAnalysisCollection> analyzedHistoricalData = executor.submit(new Callable<StockAnalysisCollection>() { | |
@Override | |
public StockAnalysisCollection call() throws Exception { | |
return analyzeData(normalizedHistoricalData.get()); | |
} | |
}); | |
final Future<MarketModel> modeledHistoricalData = executor.submit(new Callable<MarketModel>() { | |
@Override | |
public MarketModel call() throws Exception { | |
return runModel(analyzedHistoricalData.get()); | |
} | |
}); | |
Future<MarketRecommendation> results = executor.submit(new Callable<MarketRecommendation>() { | |
@Override | |
public MarketRecommendation call() throws Exception { | |
return compareModels(Arrays.asList(modeledMarketData.get(), modeledHistoricalData.get())); | |
} | |
}); | |
results.get(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Runs the analysis pipeline on futures produced by a `FuturePool` and blocks
 * until the final comparison is available.
 *
 * Two chains are built: NYSE + NASDAQ data is loaded, merged, normalized,
 * analyzed and modeled; Fed historical data is loaded, normalized, analyzed
 * and modeled. The two models are then passed to `compareModels`.
 *
 * @return the value produced by `compareModels`
 */
override def doAnalysisParallel() = {
  // Keep a handle on the underlying pool: it is created per call, so it is
  // shut down before returning to avoid leaking its worker threads.
  val pool = new ForkJoinPool()
  val executor = FuturePool(pool)
  try {
    // Both loads are submitted before the for-comprehension so that they are
    // already running when the comprehension starts waiting on them.
    val nyseData = executor(loadNyseData())
    val nasdaqData = executor(loadNasdaqData())
    val mergedMarketData = for {
      nyse <- nyseData
      nasdaq <- nasdaqData
    } yield mergeMarketData(Arrays.asList(nyse, nasdaq))

    val modeledMarketData = for {
      m <- mergedMarketData
      normalizedData <- executor(normalizeData(m))
      analyzedData <- executor(analyzeData(normalizedData))
    } yield runModel(analyzedData)

    // The first generator is evaluated as soon as this value is defined, so
    // the historical chain starts without waiting for the market chain.
    val modeledHistoricalData = for {
      fedHistoricalData <- executor(loadFedHistoricalData())
      normalizedHistoricalData <- executor(normalizeData(fedHistoricalData))
      analyzedHistoricalData <- executor(analyzeData(normalizedHistoricalData))
    } yield runModel(analyzedHistoricalData)

    val results = for {
      marketData <- modeledMarketData
      historicalData <- modeledHistoricalData
    } yield compareModels(Arrays.asList(marketData, historicalData))

    // Block the caller until the comparison is done.
    results.get()
  } finally {
    // Runs whether the wait above returned normally or threw.
    pool.shutdown()
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment