-
-
Save alxhub/0e947fd8ec67854e1bf18b300d2a7b74 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Period handed to Observable.interval when polling for the lock, i.e. how
// often a new acquisition attempt is started (milliseconds, as interpreted by
// Observable.interval).
const ACQUIRE_FREQ = 1000;
// Attempt to acquire the lock. Returns an Observable that emits the relative
// time of the lease if acquired, and undefined if the attempt fails.
//
// The currentLease parameter gives the current lease time (used to compare
// when re-acquiring an already held lock) or is undefined when acquiring for
// the first time.
//
// NOTE: this is a placeholder. The actual acquisition logic is not part of
// this file, so calling it throws instead of silently returning undefined
// (which would only surface later as an opaque error inside switchMap).
function tryAcquire(currentLease?: number): Observable<number> {
  // TODO: attempt to acquire the lock and return the resulting Observable.
  throw new Error('tryAcquire is not implemented');
}
// Poll for the lock until one attempt succeeds, then emit that attempt's lease
// time exactly once and complete.
function acquire(): Observable<number> {
  // One tick per ACQUIRE_FREQ; every tick starts a fresh attempt and, because
  // of switchMap, drops whichever attempt was still pending from the tick before.
  const attempts = Observable
    .interval(ACQUIRE_FREQ)
    .switchMap(() => tryAcquire());

  // A failed attempt yields a falsy value, so only truthy lease times pass the
  // filter; the first one that does ends the polling.
  return attempts
    .filter(leaseTime => Boolean(leaseTime))
    .take(1);
}
// Hold on to an acquired lock. Requires the current lease time, and emits
// false (once, then completes) when the lock is lost. Nothing is emitted while
// the lock keeps being renewed successfully.
function hold(lease: number): Observable<boolean> {
  // Each value pushed into leaseTimes is the most recently acquired lease time.
  // It is seeded with the lease passed in by the caller.
  const leaseTimes = new BehaviorSubject(lease);
  return leaseTimes
    .switchMap(current => Observable
      // Wait until the halfway point of the lease. This must be a one-shot
      // timer: a numeric period argument (even a negative one) is coerced to a
      // 1 ms repeating period by some RxJS versions, which would restart the
      // re-acquire attempt below every millisecond.
      .timer(current / 2)
      .switchMap(() => tryAcquire(current))) // then try to re-acquire
    .map(renewed => {
      if (!renewed) {
        return false; // failed to re-acquire; let the failure through the filter
      }
      // Successful re-acquire: feed the new lease back in to start the next
      // wait/renew cycle.
      leaseTimes.next(renewed);
      return true;
    })
    .filter(stillHeld => !stillHeld) // only emit on failure
    .take(1); // give up after the first failure
}
// Acquire the lock and keep it for as long as possible, starting over every
// time it is lost. Emits true each time the lock is acquired and false each
// time it is lost.
function lock(): Observable<boolean> {
  // One acquire/hold cycle: wait for a lease, report success, then watch for loss.
  const oneCycle = acquire().switchMap(lease => {
    const acquired = Observable.of(true); // announce that the lock is now held
    const lost = hold(lease);             // emits false once the lock is lost
    return Observable.merge(acquired, lost);
  });

  // When a cycle completes (the lock was lost), start the next one, indefinitely.
  return oneCycle.repeat();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment