Last active
March 4, 2020 02:16
-
-
Save pcurrivan/73872aa87d41fcaeda804b962041d61d to your computer and use it in GitHub Desktop.
Replacement for removed RxJS operator inspectTime
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from 'rxjs'; | |
/**
 * Custom RxJS operator that limits how often values from the source are
 * forwarded downstream.
 *
 * Intended as a stand-in for the `inspectTime` operator that was removed from
 * RxJS (NOTE(review): exact equivalence with the removed operator is not
 * verified here).
 *
 * Behaviour, as implemented:
 * - The first source value is forwarded immediately.
 * - Values arriving less than `delay` ms after the previous emission are not
 *   forwarded right away; only the most recent one is remembered and it is
 *   emitted once the `delay` window ends.
 * - When the source errors or completes, or the consumer unsubscribes, any
 *   value still waiting to be emitted is discarded.
 *
 * @param delay Minimum time, in milliseconds, between two emissions.
 * @returns A function that maps a source observable to the rate-limited one.
 */
export function inspectTime<T>(delay: number): (source: Observable<T>) => Observable<T> {
  return (source: Observable<T>): Observable<T> => {
    return new Observable<T>(subscriber => {
      // Most recent value seen from the source; only read after `next` has
      // assigned it, because the throttle is only enqueued from `next`.
      let lastValue: T;
      const action = (): void => subscriber.next(lastValue);
      const throttle = ActionThrottle(action, delay);

      const subscription = source.subscribe({
        next(value: T) {
          lastValue = value;
          throttle.enqueue();
        },
        error(err: unknown) {
          // Drop the pending emission so nothing is delivered after the error.
          throttle.dequeue();
          subscriber.error(err);
        },
        complete() {
          // Drop the pending emission so nothing is delivered after completion.
          throttle.dequeue();
          subscriber.complete();
        }
      });

      // Teardown: cancel the pending emission, then release the source.
      return () => {
        throttle.dequeue();
        subscription.unsubscribe();
      };
    });
  };
}
/**
 * Limits the frequency at which the provided action is executed.
 *
 * Call `enqueue` to request an execution: the action runs immediately, or, if
 * it last ran less than `minPeriod_ms` ago, once that period has elapsed.
 * Several `enqueue` calls made during the waiting period collapse into a
 * single deferred execution.
 *
 * Call `dequeue` to cancel a deferred execution. This only clears the pending
 * request; the waiting period that is already running is not shortened.
 *
 * @param action Callback to run, at most once per `minPeriod_ms`.
 * @param minPeriod_ms Minimum time, in milliseconds, between two executions.
 * @returns An object exposing the `enqueue` and `dequeue` controls.
 */
function ActionThrottle(
  action: () => void,
  minPeriod_ms: number
): { enqueue: () => void; dequeue: () => void } {
  // True while the waiting period after an execution is still running.
  let blocked = false;
  // True when an execution was requested during the waiting period.
  let queued = false;

  function enqueue(): void {
    if (!blocked) {
      blockAndExecute();
    } else {
      queued = true;
    }
  }

  function dequeue(): void {
    queued = false;
  }

  function blockAndExecute(): void {
    blocked = true;
    // The timer is armed before the action runs, so the throttle still
    // unblocks later even if the action throws.
    setTimeout(unblock, minPeriod_ms);
    action();
  }

  function unblock(): void {
    if (queued) {
      // Run the deferred request and start a new waiting period.
      dequeue();
      blockAndExecute();
    } else {
      blocked = false;
    }
  }

  return {
    enqueue: enqueue,
    dequeue: dequeue
  };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment