Skip to content

Instantly share code, notes, and snippets.

@podhmo
Last active April 14, 2025 20:34
Show Gist options
  • Save podhmo/9641a904a65a6b5785f24fffd25cbe55 to your computer and use it in GitHub Desktop.
Save podhmo/9641a904a65a6b5785f24fffd25cbe55 to your computer and use it in GitHub Desktop.
deno streams APIを使ってみたい
import { TextLineStream } from "jsr:@std/[email protected]";
const filename = "README.md";
// cat like
{
using input = await Deno.open(filename);
await input.readable
.pipeTo(Deno.stdout.writable, {
preventClose: true,
});
}
console.log("\n\n");
// cat like with line number
{
using input = await Deno.open(filename);
let i = 1;
for await (
const line of input
.readable.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream())
) {
console.log(`${i++}: ${line}`);
}
}
console.log("\n\n");
// cat like with line number and pipe
{
let i = 1;
const pipe = new TransformStream({
transform(chunk, controller) {
controller.enqueue(`${i++}: ${chunk}\n`);
},
});
using input = await Deno.open(filename);
await input.readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream())
.pipeThrough(pipe)
.pipeThrough(new TextEncoderStream())
.pipeTo(Deno.stdout.writable, {
preventClose: true,
});
}
import {
concatReadableStreams,
mergeReadableStreams,
toText,
toTransformStream,
} from "jsr:@std/[email protected]";
// manual writable stream with ReadableStream.from
{
const redable = ReadableStream.from([1, 2, 3, 4, 5]);
const writable = new WritableStream({
write: (chunk) => {
console.log(chunk);
},
});
// use it
await redable.pipeTo(writable);
}
console.log("");
// manual pipe
{
const readable = new ReadableStream({
pull: (controller) => {
for (let i = 0; i < 10; i++) {
controller.enqueue(i);
}
controller.close();
},
});
const writable = new WritableStream({
write: (chunk) => {
console.log(chunk);
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
},
});
// use it
await readable.pipeThrough(transform).pipeTo(writable);
}
console.log("");
// using toText, toTransformStream
{
// concat
{
const stream = concatReadableStreams(
ReadableStream.from([1, 2, 3, 4, 5]),
ReadableStream.from([10, 20, 30, 40, 50]),
).pipeThrough(toTransformStream(
async function* (src) {
for await (const chunk of src) {
yield `${chunk * 2}, `;
}
},
));
// use it
console.log("%o", await toText(stream));
}
// merge
{
const stream = mergeReadableStreams(
ReadableStream.from([1, 2, 3, 4, 5]),
ReadableStream.from([10, 20, 30, 40, 50]),
).pipeThrough(toTransformStream(
async function* (src) {
for await (const chunk of src) {
yield `${chunk * 2}, `;
}
},
));
// use it
console.log("%o", await toText(stream));
}
}

deno streams APIを使ってみたい

  • ReadableStream
    • ReadableStream.from() で array,mapやasync iteratorから変換
    • for awaitでループ出来る
  • TransformStream
    • pipeThrough() で接続
  • WritableStream
    • pipeTo() で接続

references

00 とりあえずファイル入出力

行区切りのstreamが手に入ればうれしい。 stdoutをcloseしないでpipeTo()に渡す方法はないものか?(追記: preventCloseオプションを使えば良いことが分かった)

01 自作でstreamを作ってみる

それぞれのstreamのconstructorを自分で使って作ってみる。

00:
deno run -A $@*.ts
01:
deno run -A $@*.ts

できてなさそうなこと

バックプレッシャーとかenqueueを何回もやるとかはやれていない。あとメモリ効率は意識していないかも?

なぜ直接的な .map() がないのか?

  1. 非同期変換への対応:

    • 配列の map は基本的に同期的な処理を想定しています。各要素に対して関数を適用し、すぐに結果を返します。
    • しかし、ストリーム処理では、チャンクの変換処理自体が非同期になる場合があります(例: チャンクごとに非同期APIを叩く、重い計算をする)。
    • TransformStreamtransform メソッドは Promise を返すことができるため、非同期的な変換処理を自然に組み込めます。単純な map 関数では、この非同期性をどう扱うかという設計上の課題が生じます。
  2. バックプレッシャーの必要性:

    • 配列の map では、すべての要素を処理し終えてから新しい配列が完成します。途中で処理が追いつかなくなる、ということは(メモリ不足を除けば)あまり考えません。
    • ストリームでは、データの生成速度と消費速度が異なる場合があります。変換処理が速すぎると後続の処理が追いつかずメモリを圧迫し、遅すぎると全体の処理が滞ります。
    • TransformStream は内部に独自のバッファ(キュー)と highWaterMark を持ち、ReadableStreamWritableStream の両方の側面を持つことで、パイプライン全体のバックプレッシャー機構に正しく組み込まれるように設計されています。つまり、後続のストリームが詰まっている場合、TransformStream は入力を受け付けなくなり、それがさらに上流の ReadableStream に伝播します。
    • 単純な .map() 関数だけでは、このバックプレッシャーをどう実装・管理するかが不明確になります。TransformStream を使うことで、この複雑な制御を標準化された方法で実現しています。
  3. 1対多、多対1、フィルタリングなどの柔軟な変換:

    • 配列の map は、入力要素1つに対して出力要素1つを返します(1対1変換)。
    • ストリーム処理では、より柔軟な変換が必要になることがあります。
      • 1対多: 1つの入力チャンクから複数の出力チャンクを生成する(例: 大きなJSONオブジェクトを個別のレコードに分割する)。
      • 多対1: 複数の入力チャンクをまとめて1つの出力チャンクを生成する(例: チャンクをバッファリングして一定サイズになったら出力する)。
      • フィルタリング: 条件に合わないチャンクは出力しない(0個の出力)。
    • TransformStreamtransform メソッド内では、controller.enqueue() を複数回呼んだり、一度も呼ばなかったり、内部状態にデータを溜め込んで flush でまとめて出力したりすることで、これらの複雑な変換パターンに対応できます。単純な map 関数ではこれらの表現が困難です。
  4. 状態管理:

    • 変換処理が状態を持つ必要がある場合(例: 行番号を付与する)、TransformStream のインスタンス(または transformer オブジェクト)内で状態を保持するのが自然です。
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment