Created
October 14, 2013 20:57
-
-
Save elandau/6982077 to your computer and use it in GitHub Desktop.
RxJava-based concurrent threading model for polling data from a synchronous source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.schlep.rx; | |
import java.util.List; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
import rx.Observer; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.Observable.OnSubscribeFunc; | |
import rx.concurrency.Schedulers; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Func1; | |
/** | |
* Specialized operation used to drive the starting point of a stream reader with built | |
* in concurrency for polling a provided function for the next batch from the stream. | |
* This module will scale up to the provided number of concurrent operations. On error, | |
* it will scale down to 1 thread until a successful read. onError will not propagate | |
* down to the observer. | |
* | |
* @author elandau | |
* | |
*/ | |
public class OperationConcurrent { | |
/** | |
* @param source Function providing the input | |
* @param delayFunc Policy for deciding how long to delay retrying the source function | |
* in case it is empty of of an error. The parameter is the ratio | |
* of batch fullness (0.0 none or error, 1.0 full). Return value is | |
* delay in msec. | |
* @param batchSize Number of elements to read from the function | |
* @param maxThreads Maximum number of concurrent threads | |
* @param name Name to give threads (must have a %d for the thread id) | |
* @return | |
*/ | |
public static <T> OnSubscribeFunc<List<T>> concurrent( | |
Func1<Integer, List<T>> source, | |
Func1<Double, Long> delayFunc, | |
final int batchSize, | |
final int maxThreads, | |
final String name) { | |
return concurrent(source, | |
delayFunc, | |
maxThreads, | |
batchSize, | |
Schedulers.executor(Executors.newScheduledThreadPool( | |
maxThreads, | |
new ThreadFactoryBuilder() | |
.setNameFormat(name) | |
.setDaemon(true) | |
.build())) | |
); | |
} | |
/** | |
* Create a concurrent operation with a provided scheduler | |
* | |
* @param source | |
* @param scheduler | |
* @param maxThreads | |
* @param batchSize | |
* @return | |
*/ | |
public static <T> OnSubscribeFunc<List<T>> concurrent( | |
final Func1<Integer, List<T>> source, | |
final Func1<Double, Long> delayFunc, | |
final int maxThreads, | |
final int batchSize, | |
final Scheduler scheduler | |
) { | |
return new OnSubscribeFunc<List<T>>() { | |
@Override | |
public Subscription onSubscribe(final Observer<? super List<T>> observer) { | |
return new ScheduledSubscription<T>(source, delayFunc, observer, scheduler, maxThreads, batchSize); | |
} | |
}; | |
} | |
private final static AtomicInteger idCounter = new AtomicInteger(0); | |
private static class ScheduledSubscription<T> implements Subscription { | |
private final Func1<Integer, List<T>> sourceFunc; | |
private final Func1<Double, Long> delayFunc; | |
private final Scheduler scheduler; | |
private final Observer<? super List<T>> observer; | |
private final int batchSize; | |
private final int maxThreads; | |
private int counter; | |
private final CompositeSubscription subs = new CompositeSubscription(); | |
private final Object lock = new Object(); | |
class ObserverAction implements Action0 { | |
private Subscription scheduledSub; | |
private final int id; | |
public ObserverAction() { | |
id = idCounter.incrementAndGet(); | |
synchronized (lock) { | |
counter++; | |
scheduledSub = scheduler.schedule(this); | |
subs.add(scheduledSub); | |
} | |
} | |
public boolean removeSub() { | |
synchronized(lock) { | |
if (counter > 1) { | |
counter--; | |
subs.remove(scheduledSub); | |
return true; | |
} | |
else { | |
return false; | |
} | |
} | |
} | |
public void addSub() { | |
synchronized (lock) { | |
if (counter < maxThreads) { | |
new ObserverAction(); | |
} | |
} | |
} | |
public void rescheduleSub(int count) { | |
subs.remove(scheduledSub); | |
Long delay = delayFunc.call((double)count / (double)batchSize); | |
if (delay == null || delay == 0) { | |
scheduledSub = scheduler.schedule(this); | |
} | |
else { | |
scheduledSub = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); | |
} | |
subs.add(scheduledSub); | |
} | |
@Override | |
public void call() { | |
try { | |
// Get the next batch and send to the observer | |
List<T> t1 = sourceFunc.call(batchSize); | |
observer.onNext(t1); | |
// Can downsize | |
if (t1.size() < batchSize) { | |
if (!removeSub()) { | |
rescheduleSub(t1.size()); | |
} | |
} | |
// Increase reader pool (if not at max) and reschedule this one | |
else { | |
addSub(); | |
rescheduleSub(t1.size()); | |
} | |
} | |
catch (Throwable t) { | |
// Remove all threads except for one | |
if (!removeSub()) { | |
rescheduleSub(0); | |
} | |
} | |
} | |
} | |
private ScheduledSubscription( | |
Func1<Integer, List<T>> sourceFunc, | |
Func1<Double, Long> delayFunc, | |
Observer<? super List<T>> observer, | |
Scheduler scheduler, | |
int batchSize, | |
int maxThreads) { | |
this.sourceFunc = sourceFunc; | |
this.delayFunc = delayFunc; | |
this.scheduler = scheduler; | |
this.observer = observer; | |
this.batchSize = batchSize; | |
this.maxThreads = maxThreads; | |
this.counter = 0; | |
new ObserverAction(); | |
} | |
@Override | |
public void unsubscribe() { | |
subs.unsubscribe(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment