Last active
February 8, 2019 17:09
-
-
Save ggalmazor/b7394c7e8dd95d5d943ed9c1aa63d224 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Job<T> { | |
private final JobAwareSupplier<T> block; | |
private Job(JobAwareSupplier<T> block) { | |
this.block = block; | |
} | |
public static <U> Job<U> supply(JobAwareSupplier<U> supplier) { | |
return new Job<>(supplier); | |
} | |
public static Job<Void> run(JobAwareRunnable runnable) { | |
return new Job<>(jobStatus -> { | |
runnable.run(jobStatus); | |
return null; | |
}); | |
} | |
public static <T, U, V> Job<Triple<T, U, V>> allOf(Job<T> t, Job<U> u, Job<V> v) { | |
return new Job<>(jobStatus -> new Triple<>( | |
t.block.get(jobStatus), | |
u.block.get(jobStatus), | |
v.block.get(jobStatus) | |
)); | |
} | |
public <U> Job<U> thenApply(JobAwareFunction<T, U> function) { | |
return new Job<>(jobStatus -> function.apply(jobStatus, block.get(jobStatus))); | |
} | |
public CompletableFuture<T> launch(ExecutorService executor) { | |
return CompletableFuture.supplyAsync(() -> block.get(new JobStatus(executor::isShutdown)), executor); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@FunctionalInterface | |
public interface JobAwareFunction<T, U> { | |
U apply(JobStatus jobStatus, T t); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@FunctionalInterface | |
public interface JobAwareRunnable { | |
void run(JobStatus jobStatus); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@FunctionalInterface | |
public interface JobAwareSupplier<T> { | |
T get(JobStatus jobStatus); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class JobsRunner<T> { | |
private static final Logger log = LoggerFactory.getLogger(JobsRunner.class); | |
private final List<Consumer<List<T>>> successCallbacks = new ArrayList<>(); | |
private final List<Consumer<Throwable>> errorCallbacks = new ArrayList<>(); | |
private final ExecutorService executor; | |
private final List<Job<T>> jobs; | |
private JobsRunner(List<Job<T>> jobs) { | |
this.jobs = jobs; | |
executor = new ForkJoinPool( | |
ForkJoinPool.commonPool().getParallelism(), | |
ForkJoinPool.commonPool().getFactory(), | |
(thread, throwable) -> errorCallbacks.forEach(c -> c.accept(throwable)), | |
ForkJoinPool.commonPool().getAsyncMode() | |
); | |
} | |
@SafeVarargs | |
public static <T> JobsRunner<T> of(Job<T>... jobs) { | |
return JobsRunner.of(Stream.of(jobs)); | |
} | |
public static <T> JobsRunner<T> of(Stream<Job<T>> jobs) { | |
return JobsRunner.of(jobs.collect(toList())); | |
} | |
public static <T> JobsRunner<T> of(List<Job<T>> jobs1) { | |
return new JobsRunner<>(jobs1); | |
} | |
public JobsRunner<T> onError(Consumer<Throwable> errorCallback) { | |
errorCallbacks.add(errorCallback); | |
return this; | |
} | |
public JobsRunner<T> onSuccess(Consumer<List<T>> successCallback) { | |
successCallbacks.add(successCallback); | |
return this; | |
} | |
public JobsRunner<T> launch() { | |
CompletableFuture.runAsync(() -> { | |
try { | |
List<T> results = jobs.stream().map(job -> job.launch(executor)).collect(collectResult()).get(); | |
successCallbacks.forEach(c -> c.accept(results)); | |
} catch (InterruptedException | ExecutionException e) { | |
log.info("Job cancelled", e); | |
} | |
}, executor); | |
return this; | |
} | |
public void cancel() { | |
executor.shutdownNow(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class JobStatus extends TerminationFuture { | |
private final Supplier<Boolean> isCancelledGetter; | |
JobStatus(Supplier<Boolean> isCancelledGetter) { | |
this.isCancelledGetter = isCancelledGetter; | |
} | |
public boolean notCancelled() { | |
return !isCancelledGetter.get(); | |
} | |
@Override | |
public boolean isCancelled() { | |
return isCancelledGetter.get(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public JobsRunner pull(TransferForms forms) { | |
Http reusableHttp = http.reusingConnections(); | |
Stream<Job<PullResult>> pullJobs = forms.getSelectedForms().stream() | |
.map(form -> PullForm.pullOne(reusableHttp, form)); | |
return JobsRunner.of(pullJobs) | |
.onError(e -> { | |
log.error("Error pulling forms", e); | |
EventBus.publish(new PullEvent.Failure()); | |
}) | |
.onSuccess(results -> { | |
results.forEach(result -> forms.setLastPullCursor(result.getForm(), result.getLastCursor())); | |
EventBus.publish(new PullEvent.Success()); | |
}) | |
.launch(); | |
} | |
private Job<PullResult> pullOne(Http http, FormStatus form) { | |
PullTracker tracker = new PullTracker(form); | |
return allOf( | |
supply(jobStatus -> downloadBlankForm(form, tracker)), | |
supply(jobStatus -> getInstanceIdBatches(form, tracker, jobStatus)), | |
run(jobStatus -> downloadBlankFormAttachments(form, tracker)) | |
).thenApply((jobStatus, t) -> { | |
// Build the submission key generator with the blank form XML | |
SubmissionKeyGenerator subKeyGen = SubmissionKeyGenerator.from(t.get1()); | |
// Extract all the instance IDs from all the batches and download each instance | |
t.get2().stream() | |
.flatMap(batch -> batch.getInstanceIds().stream()) | |
.forEach(instanceId -> { | |
if (jobStatus.notCancelled()) | |
downloadSubmissionAndMedia(form, tracker, instanceId, subKeyGen); | |
}); | |
// Return the pull result with the last cursor | |
return PullResult.of(form, getLastCursor(t.get2())); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment