Last active
December 10, 2023 22:31
-
-
Save hubgit/021228f33c5938be71e4346415553866 to your computer and use it in GitHub Desktop.
Reader and Writer web streams for Deno
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { TextLineStream } from 'https://deno.land/[email protected]/streams/mod.ts' | |
// const input = await jsonLinesReader('input.jsonl.gz')
// const output = await jsonLinesWriter('output.jsonl.gz')
// for await (const item of input) {
//   // do something
//   await output.write(item)
// }
// await output.close()
/**
 * Opens a gzip-compressed JSON Lines file and returns a stream of parsed records.
 *
 * The file is decompressed, decoded as text, split into lines, and each
 * non-blank line is passed through `JSON.parse`.
 *
 * @typeParam T - Type the caller expects each record to have. The parsed
 *   values are NOT validated against it.
 * @param path - Path of the `.jsonl.gz` file to read.
 * @returns A `ReadableStream<T>` yielding one parsed value per non-blank line.
 * @throws The stream errors with a `SyntaxError` if a line is not valid JSON.
 */
export const jsonLinesReader = async <T>(path: string) => {
  const file = await Deno.open(path, { read: true })

  return file.readable
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream())
    .pipeThrough(
      new TransformStream<string, T>({
        transform(line, controller) {
          // TextLineStream removes the line terminator, so a blank line
          // arrives as '' (never '\n'). Skip blank / whitespace-only lines
          // instead of letting JSON.parse throw on them.
          if (line.trim() === '') {
            return
          }
          controller.enqueue(JSON.parse(line))
        },
      })
    )
}
/**
 * Opens a gzip-compressed JSON document and returns a stream of its items.
 *
 * The whole document is buffered in memory, parsed once the input ends, and
 * then each element is enqueued individually. Two document shapes are
 * accepted: a top-level array, or an object with an `items` array.
 *
 * @typeParam T - Type the caller expects each item to have. The parsed
 *   values are NOT validated against it.
 * @param path - Path of the `.json.gz` file to read.
 * @returns A `ReadableStream<T>` yielding the items in document order.
 * @throws The stream errors with a `SyntaxError` if the document is not valid
 *   JSON, or a `TypeError` if it contains no array of items.
 */
export const jsonArrayReader = async <T>(path: string) => {
  const file = await Deno.open(path, { read: true })

  // Decoded text fragments, joined and parsed when the input is exhausted.
  const chunks: string[] = []

  return file.readable
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(
      new TransformStream<string, T>({
        transform: (chunk) => {
          chunks.push(chunk)
        },
        flush: (controller) => {
          const data: unknown = JSON.parse(chunks.join(''))

          // Accept either `[...]` or `{ "items": [...] }`.
          const items: unknown = Array.isArray(data)
            ? data
            : (data as { items?: unknown } | null)?.items

          if (!Array.isArray(items)) {
            throw new TypeError(
              `jsonArrayReader: expected a JSON array or an object with an "items" array in ${path}`
            )
          }

          for (const item of items) {
            controller.enqueue(item as T)
          }
        },
      })
    )
}
/**
 * Creates (or truncates) a gzip-compressed JSON Lines file and returns a
 * writer for it.
 *
 * Every value written is serialized with `JSON.stringify`, followed by a
 * newline, then UTF-8 encoded, gzip-compressed and written to the file.
 *
 * `close()` on the returned writer resolves only after the whole pipeline,
 * including the final write to the file, has completed, and rejects if any
 * stage failed.
 *
 * @param path - Path of the `.jsonl.gz` file to write.
 * @returns A `WritableStreamDefaultWriter` accepting JSON-serializable values.
 * @throws `write()` rejects with a `TypeError` for values that have no JSON
 *   representation (e.g. `undefined`, functions, symbols).
 */
export const jsonLinesWriter = async (path: string) => {
  const file = await Deno.open(path, {
    create: true,
    write: true,
    truncate: true,
  })

  const serializer = new TransformStream({
    transform(chunk, controller) {
      const line = JSON.stringify(chunk)
      // JSON.stringify returns undefined for values with no JSON form; the
      // text encoder would turn that into the literal line "undefined".
      if (line === undefined) {
        throw new TypeError('jsonLinesWriter: value cannot be serialized to JSON')
      }
      controller.enqueue(line)
      controller.enqueue('\n')
    },
  })

  const finished = serializer.readable
    .pipeThrough(new TextEncoderStream())
    .pipeThrough(new CompressionStream('gzip'))
    .pipeTo(file.writable)

  // Mark the promise as handled so a pipeline failure is not reported as an
  // unhandled rejection; close() below awaits it again, so the error still
  // reaches the caller.
  finished.catch(() => {})

  const upstream = serializer.writable.getWriter()

  return new WritableStream({
    write: (chunk) => upstream.write(chunk),
    close: async () => {
      await upstream.close()
      // Wait until the compressed data has actually reached the file.
      await finished
    },
    abort: async (reason) => {
      await upstream.abort(reason)
      await finished.catch(() => {})
    },
  }).getWriter()
}
/**
 * Creates (or truncates) a gzip-compressed TSV file and returns a writer
 * for it.
 *
 * Every row written is an array of strings; its fields are joined with a tab
 * and terminated with a newline, then UTF-8 encoded, gzip-compressed and
 * written to the file. Fields are written verbatim: tabs or newlines inside a
 * field are NOT escaped.
 *
 * `close()` on the returned writer resolves only after the whole pipeline,
 * including the final write to the file, has completed, and rejects if any
 * stage failed.
 *
 * @param path - Path of the `.tsv.gz` file to write.
 * @returns A `WritableStreamDefaultWriter` accepting one `string[]` per row.
 */
export const tsvLinesWriter = async (path: string) => {
  const file = await Deno.open(path, {
    create: true,
    write: true,
    truncate: true,
  })

  const rowFormatter = new TransformStream<string[], string>({
    transform: (items, controller) => {
      controller.enqueue(items.join('\t'))
      controller.enqueue('\n')
    },
  })

  const finished = rowFormatter.readable
    .pipeThrough(new TextEncoderStream())
    .pipeThrough(new CompressionStream('gzip'))
    .pipeTo(file.writable)

  // Mark the promise as handled so a pipeline failure is not reported as an
  // unhandled rejection; close() below awaits it again, so the error still
  // reaches the caller.
  finished.catch(() => {})

  const upstream = rowFormatter.writable.getWriter()

  return new WritableStream({
    write: (chunk) => upstream.write(chunk),
    close: async () => {
      await upstream.close()
      // Wait until the compressed data has actually reached the file.
      await finished
    },
    abort: async (reason) => {
      await upstream.abort(reason)
      await finished.catch(() => {})
    },
  }).getWriter()
}
/**
 * Creates (or truncates) a gzip-compressed text file and returns a writer
 * for it.
 *
 * Every chunk written is UTF-8 encoded, gzip-compressed and written to the
 * file. No separators are inserted between chunks.
 *
 * `close()` on the returned writer resolves only after the whole pipeline,
 * including the final write to the file, has completed, and rejects if any
 * stage failed.
 *
 * @param path - Path of the gzip file to write.
 * @returns A `WritableStreamDefaultWriter` accepting text chunks.
 */
export const textWriter = async (path: string) => {
  const file = await Deno.open(path, {
    create: true,
    write: true,
    truncate: true,
  })

  // Identity stage whose writable end feeds the encode -> gzip -> file pipeline.
  const entry = new TransformStream()

  const finished = entry.readable
    .pipeThrough(new TextEncoderStream())
    .pipeThrough(new CompressionStream('gzip'))
    .pipeTo(file.writable)

  // Mark the promise as handled so a pipeline failure is not reported as an
  // unhandled rejection; close() below awaits it again, so the error still
  // reaches the caller.
  finished.catch(() => {})

  const upstream = entry.writable.getWriter()

  return new WritableStream({
    write: (chunk) => upstream.write(chunk),
    close: async () => {
      await upstream.close()
      // Wait until the compressed data has actually reached the file.
      await finished
    },
    abort: async (reason) => {
      await upstream.abort(reason)
      await finished.catch(() => {})
    },
  }).getWriter()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment