Skip to content

Instantly share code, notes, and snippets.

@zhirzh
Last active August 10, 2020 18:30
Show Gist options
  • Save zhirzh/faff8e33e9e620bc1a6aca4baf00d657 to your computer and use it in GitHub Desktop.
Save zhirzh/faff8e33e9e620bc1a6aca4baf00d657 to your computer and use it in GitHub Desktop.
action queue 500 retry
/**
* run on https://rxviz.com
* Time window must be 10+ seconds
*/
// rxviz setup
console.clear();
const { BehaviorSubject, EMPTY, from, merge, of, Subject } = Rx;
Object.keys(RxOperators).forEach(k => {
if (k !== "window") window[k] = RxOperators[k];
});
const actions = "abcdef";
const action$ = from(actions).pipe(
concatMap(action => of(action).pipe(delay(200)))
);
const sec = 1000;
const retryDelays = [0, 1 * sec, 3 * sec];
// "retry" stream from SyncLeadsAlarm
const retry$ = new Subject();
const output$ = action$.pipe(
concatMap(action =>
of(action).pipe(
mergeMap(async action => {
try {
const x = await req(action);
console.log("RESULT:", x);
return x;
} catch (e) {
if (e.status >= 500) {
throw e;
}
}
}),
retryWhen(err$ =>
merge(
retry$,
err$.pipe(
mergeMap((err, index) =>
index >= retryDelays.length
? EMPTY
: of(err).pipe(delay(retryDelays[index]))
)
)
)
)
)
)
);
// mock SyncLeadsAlarm
setInterval(() => {
retry$.next("retry");
}, 2 * sec);
// mock server status
let serverRunning = false;
setTimeout(() => {
serverRunning = true;
}, 6 * sec);
// mock server request
const req = action => {
return new Promise((resolve, reject) => {
setTimeout(() => {
const x = action === "c" && !serverRunning;
console.log(action, !x);
if (x) {
reject({ status: 500 });
return;
}
resolve(action.toUpperCase());
}, 300);
});
};
output$; // eval output for rxviz
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment