Created
June 5, 2021 18:07
-
-
Save gioragutt/9f7658dafbb40c963a9b915b91fa548d to your computer and use it in GitHub Desktop.
I'm an RXJS Pleb please help
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { defer, NEVER, Subject, throwError } from "rxjs"; | |
import { | |
bufferTime, | |
catchError, | |
filter, | |
mapTo, | |
mergeMap, | |
retry, | |
// tap, | |
} from "rxjs/operators"; | |
import type { Risk } from "./types"; | |
import type { upsertInsights as upsertInsightsToGraph } from "./upsertInsights.mutation"; | |
function splitArrayInTwo<T>(arr: T[]): [T[], T[]] { | |
const indexToSplit = arr.length / 2; | |
const firstHalf = arr.slice(0, indexToSplit); | |
const secondHalf = arr.slice(indexToSplit); | |
return [firstHalf, secondHalf]; | |
} | |
export function handleRiskUpdates( | |
updates: Subject<Risk>, | |
upsertInsights: typeof upsertInsightsToGraph | |
) { | |
const graphUpsertRequests$ = new Subject<Risk[]>(); | |
updates.pipe(bufferTime(1000, null, 10)).subscribe(graphUpsertRequests$); | |
// .subscribe((chunks) => graphUpsertRequests$.next(chunks)); | |
return graphUpsertRequests$.pipe( | |
// tap((risks) => { | |
// if (updates.closed && risks.length === 0) { | |
// graphUpsertRequests$.complete(); | |
// } | |
// }), | |
filter((risks) => risks.length > 0), | |
mergeMap((risks) => { | |
// Defer so that the promise will not be reused | |
return defer(() => upsertInsights(risks)).pipe( | |
catchError((err) => { | |
if (err.code === 413) { | |
// If Payload Too Large, make the payload smaller and retry | |
splitArrayInTwo(risks).forEach((chunk) => | |
graphUpsertRequests$.next(chunk) | |
); | |
// NEVER to stop acting on that update | |
return NEVER; | |
} | |
return throwError(err); | |
}), | |
retry(3), | |
catchError(() => NEVER), | |
mapTo(risks) | |
); | |
}) | |
); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { subscribeSpyTo } from "@hirez_io/observer-spy"; | |
import { splitEvery } from "ramda"; | |
import { Subject } from "rxjs"; | |
import { handleRiskUpdates } from "./handleRiskUpdates"; | |
import { asyncFakeTime } from "./testing-utils"; | |
import type { Risk } from "./types"; | |
import type { upsertInsights as upsertInsightsToGraph } from "./upsertInsights.mutation"; | |
let riskId = 0; | |
const createRisk = (): Risk<null> => { | |
riskId++; | |
const risk = `test_risk_${riskId}`; | |
(risk as any).id = risk; | |
return risk as any; | |
}; | |
const upsertInsights: jest.MockedFunction<typeof upsertInsightsToGraph> = | |
jest.fn(); | |
describe("handleRiskUpdates", () => { | |
beforeEach(() => { | |
riskId = 0; | |
upsertInsights.mockClear(); | |
}); | |
it( | |
"happy path", | |
asyncFakeTime(async (flush) => { | |
const updates$ = new Subject<Risk>(); | |
const spy = subscribeSpyTo(handleRiskUpdates(updates$, upsertInsights)); | |
upsertInsights.mockResolvedValue(); | |
const risks = Array.from({ length: 12 }).map(createRisk); | |
const [firstTen, rest] = splitEvery(10, risks); | |
risks.forEach((risk) => updates$.next(risk)); | |
updates$.complete(); | |
await flush(); | |
expect(spy.receivedComplete()).toBeTruthy(); | |
expect(spy.getValues()).toStrictEqual([firstTen, rest]); | |
}) | |
); | |
it( | |
"bla", | |
asyncFakeTime(async (flush) => { | |
const updates$ = new Subject<Risk>(); | |
const spy = subscribeSpyTo(handleRiskUpdates(updates$, upsertInsights)); | |
upsertInsights.mockImplementation(async (_, sentRisks) => { | |
console.log("sending risks", sentRisks); | |
if (sentRisks.length >= 10) { | |
const error = new Error(); | |
Object.assign(error, { code: 413 }); | |
throw error; | |
} | |
}); | |
const risks = Array.from({ length: 12 }).map(createRisk); | |
const firstTen = risks.slice(0, 10); | |
const [firstChunk, secondChunk] = splitEvery(5, firstTen); | |
const rest = risks.slice(10); | |
risks.forEach((risk) => updates$.next(risk)); | |
updates$.complete(); | |
await flush(); | |
expect(upsertInsights).toHaveBeenCalledWith(firstTen); | |
expect(upsertInsights).toHaveBeenCalledWith(firstChunk); | |
expect(upsertInsights).toHaveBeenCalledWith(secondChunk); | |
expect(spy.getValues()).toStrictEqual([rest, firstChunk, secondChunk]); | |
// Should be this with something like concatMap & expand | |
// expect(spy.getValues()).toStrictEqual([firstChunk, secondChunk, rest]); | |
}) | |
); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment