Skip to content

Instantly share code, notes, and snippets.

@Swoorup
Created May 23, 2022 17:09
Show Gist options
  • Save Swoorup/a59dd15b713f78b285c5c2993da5e0e6 to your computer and use it in GitHub Desktop.
Save Swoorup/a59dd15b713f78b285c5c2993da5e0e6 to your computer and use it in GitHub Desktop.
Fetch ndjson stream idea...
import * as S from '@typed/fp/Stream';
import * as E from 'fp-ts/Either';
import { pipe } from 'fp-ts/function';
import * as TE from 'fp-ts/TaskEither';
export function streamFromTaskEither<A>(taskEither: TE.TaskEither<Error, A>): S.Stream<A> {
return pipe(
taskEither,
S.fromTask,
S.chain(E.match(S.throwError, S.of)),
)
}
export function createNdJsonStream<T>(url: string): S.Stream<T> {
const ndjsonDecoder: (raw: Uint8Array) => T[] =
raw =>
new TextDecoder('utf-8')
.decode(raw)
.split('\n')
.filter((v) => v.trim() !== '')
.map((v) => JSON.parse(v)) as T[]
return pipe(
TE.tryCatchK(() => fetch(url, { method: 'GET' }), ((eee: unknown) => eee as Error))(),
TE.map(response => response.body?.getReader()),
TE.chainEitherK(E.fromNullable(() => new Error("failed to initialize streams reader"))),
streamFromTaskEither,
S.chain(reader => {
const readS = S.fromTask(() => reader.read())
return pipe(
false,
S.chainRec((done) => {
if (done) {
return S.empty() as S.Stream<E.Right<Uint8Array>>;
}
else
return pipe(
readS,
S.map(({ done, value }) => done ? E.left(done) : E.right(value))
)
}
)
)
}
),
S.chain(raw => S.mergeArray(ndjsonDecoder(raw).map(S.of))),
S.tap(console.log),
)
}
@Swoorup
Copy link
Author

Swoorup commented May 25, 2022

Requires @typed/fp but it's borked atm.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment