Created
June 9, 2020 10:07
-
-
Save reznord/128594a5ccbb8923c603c3444d8fddb4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fetch from "isomorphic-unfetch";
import {
  makeSubject,
  share,
  pipe,
  merge,
  filter,
  buffer,
  take,
  empty,
  fromArray,
  fromPromise,
  fromValue,
  concat,
  delay,
  mergeMap,
  takeUntil,
} from "wonka";
/**
 * Builds an exchange that, when an operation result carries a retryable
 * error, requests a new session and re-issues the operation after a backoff
 * delay.
 *
 * NOTE(review): the `({ forward, dispatchDebug }) => (ops$) => result$` shape
 * looks like an urql exchange — confirm against the client this is wired into.
 *
 * @param {object} [options]
 * @param {number} [options.initialDelayMs] - First retry delay in ms; falls back to 1000.
 * @param {number} [options.maxDelayMs] - Upper bound the jittered delay must stay under; falls back to 15000.
 * @param {boolean} [options.randomDelay] - Set to `false` to disable the random backoff factor; enabled by default.
 * @param {number} [options.maxNumberAttempts] - Attempt budget; falls back to 2.
 * @param {(error: object) => any} [options.retryIf] - Predicate deciding whether an error is retried;
 *   defaults to "the error has a `networkError`".
 * @returns {Function} `({ forward, dispatchDebug }) => (ops$) => result$`
 */
export const authExchange = ({
  initialDelayMs,
  maxDelayMs,
  randomDelay,
  maxNumberAttempts,
  retryIf: retryIfOption,
} = {}) => {
  const MIN_DELAY = initialDelayMs || 1000;
  const MAX_DELAY = maxDelayMs || 15000;
  const MAX_ATTEMPTS = maxNumberAttempts || 2;
  // `randomDelay || true` was always true; only fall back to the default when
  // the option is omitted so that `randomDelay: false` is honoured.
  const RANDOM_DELAY = randomDelay == null ? true : Boolean(randomDelay);
  const retryIf = retryIfOption || ((err) => err && err.networkError);

  const fetchOpts = {
    method: "PUT",
    body: JSON.stringify({}),
  };

  // This is where the API call to get a new session lives.
  // Resolves with the parsed JSON body, or with `undefined` when the request
  // fails (the failure is logged rather than left as an unhandled rejection).
  const getNewSession = (opCtx) => {
    console.log(opCtx);
    return fetch(process.env.api_authentication_base_url, fetchOpts)
      .then((r) => r.json())
      .then((res) => {
        console.log(res);
        return res;
      })
      .catch((err) => {
        console.error(err);
      });
  };

  return ({ forward, dispatchDebug }) => (ops$) => {
    const sharedOps$ = pipe(ops$, share);

    // `concat` takes sources, so start from an empty source rather than a
    // plain array.
    let bufferedOps$ = empty;
    const inputOps$ = pipe(concat([bufferedOps$, sharedOps$]), share);

    const { source: retry$, next: nextRetryOperation } = makeSubject();

    const retryWithBackoff$ = pipe(
      retry$,
      mergeMap((op) => {
        const { key, context } = op;
        const retryCount = (context.retryCount || 0) + 1;
        let delayAmount = context.retryDelay || MIN_DELAY;

        // Kick off the session refresh for this failed operation.
        // NOTE(review): `inputOps$` was already built from the previous value
        // of `bufferedOps$`, so nothing visible here subscribes to this new
        // stream; only the `getNewSession(op)` request itself takes effect.
        bufferedOps$ = pipe(
          sharedOps$,
          buffer(fromPromise(getNewSession(op))),
          take(1),
          mergeMap(fromArray),
        );

        const backoffFactor = Math.random() + 1.5;
        // If randomDelay is enabled and it won't exceed the max delay, apply a
        // random amount to the delay to avoid the thundering herd problem.
        if (RANDOM_DELAY && delayAmount * backoffFactor < MAX_DELAY) {
          delayAmount *= backoffFactor;
        }

        // We stop the retries if a teardown event for this operation comes in.
        // But if this event comes through regularly we also stop the retries,
        // since it's basically the query retrying itself; no backoff should
        // be added!
        const teardown$ = pipe(
          inputOps$,
          filter((opt) => {
            return (
              (opt.operationName === "query" ||
                opt.operationName === "teardown") &&
              opt.key === key
            );
          }),
        );

        dispatchDebug({
          type: "retryAttempt",
          message: `The operation has failed and a retry has been triggered (${retryCount} / ${MAX_ATTEMPTS})`,
          operation: op,
          data: {
            retryCount,
          },
        });

        // Re-emit the operation with the new retryDelay and retryCount on its
        // context, after the computed delay.
        return pipe(
          fromValue({
            ...op,
            context: {
              ...op.context,
              retryDelay: delayAmount,
              retryCount,
            },
          }),
          delay(delayAmount),
          // Stop the retry if a teardown comes in.
          takeUntil(teardown$),
        );
      }),
    );

    const result$ = pipe(
      merge([inputOps$, retryWithBackoff$]),
      forward,
      share,
      filter((res) => {
        // Only retry if the error passes the retryIf predicate (by default:
        // the error contains a networkError); everything else passes through.
        if (!res.error || !retryIf(res.error)) {
          return true;
        }

        const maxNumberAttemptsExceeded =
          (res.operation.context.retryCount || 0) >= MAX_ATTEMPTS - 1;

        if (!maxNumberAttemptsExceeded) {
          // Send the failed operation to the retry$ subject and hold back its
          // result; operations past the attempt budget are not retried.
          nextRetryOperation(res.operation);
          return false;
        }

        dispatchDebug({
          type: "retryExhausted",
          message:
            "Maximum number of retries has been reached. No further retries will be performed.",
          operation: res.operation,
        });

        return true;
      }),
    );

    return result$;
  };
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment