From that @dorus's comment and after talking a while with him about it, I think we can merge the implementation of all the flatten map operators: mergeMap, concatMap, switchMap, exhaustMap, debounce/audiMap (see #1777) into a single operator flexible enough to cover all scenarios. Then each of those operators would just be an alias of that main operator. Plus it may open new possibilities like the debounce/auditMap that doesn't exist yet.
I'll use flatMap
and Queue
because it's hard to find new meaningful names and it's not worth spending too much time on it at this stage.
flatMap<T>(project: (value: T, index: number) => ObservableInput<R>, queue: Queue): OperatorFunction<T, R>;
The new flatMap
operator becomes just some kind of shell doing operator related stuff like managing inner/outer observable but it won't take any decision anymore about what to subscribe to or cancel. Those decisions will be delegated to queue
which is a simple interface decorrelated from rxjs internals to make it accessible for users.
Queue will be composed of 2 functions, analog to push & pop but specific to our case.
Those functions will be hooks called :
- when the source observable emits
- when an inner observable complete.
The algorithm of flatMap will then be as follow :
- subscribe to source
- on
source emits
, depending on the queue implementation, it'll do zero or more of :- run project on an item and subscribe
- cancel a running subscription
- on
inner completion
, depending on the queue implementation, it'll either :- run project on an item and subscribe
- do nothing
- on
source completion
,inner emits
,inner error
andouter unsubscribe
:- current behavior
export interface Queue<T> {
/**
*
* @param item new item emitted by the source
* @param actives list of items corresponding to actives subscriptions running
* @return a tuple with 2 values :
* 1. optional(arguable) item to run project and subscribe to
* 2. optional index of subscription to cancel
*/
onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | undefined;
/**
*
* @param completed item corresponding to the completed observable
* @return optional, an item (to run project and subscribe to)
*/
onSubComplete(completed: T): T | void;
}
As you can see it's fairly simple (except the tuple maybe), straightforward and easy for users to implements. It's all just about items, and nothing about rx stuff like subscriptions. We can easily build a concurrent queue, buffer queue, priority queue etc...
With those queue
implementations :
const NoQueue = {
onNewItem: item => item,
onSubComplete: () => { }
}
class ConcurrentFifoQueue<T> implements Queue<T> {
private buffer: T[] = [];
constructor(private concurrent = Number.POSITIVE_INFINITY,
private bufferSize = Number.POSITIVE_INFINITY,
private dropRunning = false) {}
onNewItem(item: T, actives: T[]): [T | undefined, number | undefined] | void {
// didn't reach maximum concurrent we can subscribe to item
if (actives.length < this.concurrent)
return [item];
// max concurrent reached, save item on buffer nothing else
if (this.buffer.length < this.bufferSize) {
this.buffer.push(item);
return;
}
// buffer overflow, remove latest item and add the new item
this.buffer.push(item);
const dropItem = this.buffer.shift();
// drop latest running subscription and subscribe to latest buffered item
if (this.dropRunning) {
return [dropItem, actives.length - 1];
}
// drop latest item and do nothing else.
}
onSubComplete(): T | void {
if (this.buffer.length > 0)
return this.buffer.shift();
}
}
We can now express all existings flatten operators as an alias :
- mergeMap:
flatMap(project, NoQueue)
- concatMap:
flatMap(project, new ConcurrentFifoQueue(1))
(default buffersize being infinity). - exhaustMap:
flatMap(project, new ConcurrentFifoQueue(1, 0))
(default drop being drop item). - switchMap:
flatMap(project, new ConcurrentFifoQueue(1, 0, true))
- debounce/auditMap:
flatMap(project, new ConcurrentFifoQueue(1, 1))
Pro :
- Add flexibility and offers new possibilities (debounceMap, priority queue...)
- reduce lib code size
- easier to maintain
Con:
- obviously some perf penality due to the flexibility but should be very minimal (only a bunch of conditonals and array read/write).
onNewItem
signature is not the sexiest API cause of the tuple, I tried removing it but ended up with even more complicated API.
Thanks @Dorus for all your time spend :)