@alanshaw
Last active September 22, 2023 23:49
Streaming iterables WAT?

Streaming iterables

Your friends from pull stream, but in terms of async iterators.

source it

A "source" is something that can be consumed. It is an iterable object.

const ints = {
  [Symbol.asyncIterator] () {
    let i = 0
    return {
      async next () {
        return { done: false, value: i++ }
      }
    }
  }
}

// or, more succinctly using an async generator:

const ints = (async function * () {
  let i = 0
  while (true) yield i++
})()

In Node.js stream terms this is a "readable".
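Both sources above are infinite. A source can also end: a hand-written iterator signals this by returning `{ done: true }`, and an async generator ends when its function body returns. A minimal sketch of a finite source, where `fromArray` and `fromArrayGen` are hypothetical helper names used only for illustration:

```javascript
// Hypothetical helper: turn an array into a finite async source.
const fromArray = values => ({
  [Symbol.asyncIterator] () {
    let i = 0
    return {
      async next () {
        // Once the array is exhausted, tell the consumer we are done.
        if (i >= values.length) return { done: true, value: undefined }
        return { done: false, value: values[i++] }
      }
    }
  }
})

// The same idea as an async generator; it ends when the function returns.
const fromArrayGen = values => (async function * () {
  for (const value of values) yield value
})()
```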

sink it

A "sink" is something that consumes (or drains) a source. It is a function that takes a source and iterates over it. It optionally returns a value.

const logger = async source => {
  const it = source[Symbol.asyncIterator]()
  while (true) {
    const { done, value } = await it.next()
    if (done) break
    console.log(value) // prints 0, 1, 2, 3...
  }
}

// or, more succinctly using for/await:

const logger = async source => {
  for await (const chunk of source) {
    console.log(chunk) // prints 0, 1, 2, 3...
  }
}

In Node.js stream terms this is a "writable".
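The loggers above return nothing, but a sink can also resolve to a value once the source is drained. A small sketch, assuming a finite source; `collect` and `letters` are hypothetical names chosen for this example:

```javascript
// Hypothetical sink: drains the source and resolves to an array of its values.
const collect = async source => {
  const values = []
  for await (const value of source) values.push(value)
  return values
}

// A small finite source to drain.
const letters = (async function * () {
  yield 'a'
  yield 'b'
  yield 'c'
})()

collect(letters).then(values => console.log(values)) // prints [ 'a', 'b', 'c' ]
```

Note that `collect` never resolves for an infinite source such as `ints`.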

transform it

A "transform" is both a sink and a source where the values it consumes and the values that can be consumed from it are connected in some way. It is a function that takes a source and returns a source.

const doubler = source => {
  return {
    [Symbol.asyncIterator] () {
      const it = source[Symbol.asyncIterator]()
      return {
        async next () {
          const { done, value } = await it.next()
          if (done) return { done }
          return { done, value: value * 2 }
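        },
        // Forward early termination (e.g. a `break` in for/await) to the wrapped source
        async return () {
          if (it.return) await it.return()
          return { done: true, value: undefined }
        }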
      }
    }
  }
}

// or, more succinctly using a generator and for/await:

const doubler = source => (async function * () {
  for await (const chunk of source) {
    yield chunk * 2
  }
})()
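A transform does not have to emit exactly one value per value consumed. As a sketch, the hypothetical `take` transform below passes through at most `n` values and then stops, which also makes an infinite source like `ints` safe to drain. The `ints` and `doubler` definitions are repeated so the example runs on its own:

```javascript
const ints = (async function * () {
  let i = 0
  while (true) yield i++
})()

const doubler = source => (async function * () {
  for await (const chunk of source) yield chunk * 2
})()

// Hypothetical transform: pass through at most n values, then stop.
const take = n => source => (async function * () {
  if (n <= 0) return
  let taken = 0
  for await (const chunk of source) {
    yield chunk
    // Returning from inside for/await calls source.return() if it exists,
    // giving the upstream source a chance to clean up.
    if (++taken >= n) return
  }
})()

const main = async () => {
  for await (const chunk of take(3)(doubler(ints))) {
    console.log(chunk) // prints 0, 2, 4
  }
}
main()
```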

duplex it

A "duplex" is similar to a transform but the values it consumes are not necessarily connected to the values that can be consumed from it. It is an object with two properties, sink and source.

const duplex = {
  sink: async source => {/* ... */},
  source: { [Symbol.asyncIterator] () {/* ... */} }
}
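To make the "not necessarily connected" part concrete, here is a small sketch of a duplex whose output does not depend on its input. `createDuplex` and the `received` property are hypothetical and exist only so the example can be inspected:

```javascript
// Hypothetical duplex: what it receives and what it emits are unrelated.
const createDuplex = () => {
  const received = []
  return {
    received, // exposed only so the example can inspect it
    // Sink side: record everything written to the duplex.
    sink: async source => {
      for await (const chunk of source) received.push(chunk)
    },
    // Source side: emit values that do not depend on what was written.
    source: (async function * () {
      yield 'hello'
      yield 'world'
    })()
  }
}

const main = async () => {
  const duplex = createDuplex()
  // Write to one side...
  await duplex.sink((async function * () { yield 1; yield 2 })())
  // ...and read from the other.
  const emitted = []
  for await (const chunk of duplex.source) emitted.push(chunk)
  console.log(duplex.received, emitted) // prints [ 1, 2 ] [ 'hello', 'world' ]
}
main()
```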

pipelining

To thread together multiple streaming iterables, you can just call them:

logger(doubler(ints))

This can look a bit back-to-front so you could use a convenience function to "pipe" them together and make the code more readable:

const pipe = (...fns) => {
  let res
  while (fns.length)
    res = fns.shift()(res)
  return res
}

pipe(() => ints, doubler, logger)
// Note, we can await on pipe for the return value of logger:
// const result = await pipe(...)

https://github.com/alanshaw/it-pipe is a utility module that does this.
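As a sketch of awaiting the sink's return value, the example below runs the `pipe` function from above over a finite source. `oneToThree` and `collect` are hypothetical names introduced for this example:

```javascript
const pipe = (...fns) => {
  let res
  while (fns.length) res = fns.shift()(res)
  return res
}

// A function that returns a finite source; pipe calls it with undefined.
const oneToThree = () => (async function * () {
  yield 1
  yield 2
  yield 3
})()

const doubler = source => (async function * () {
  for await (const chunk of source) yield chunk * 2
})()

// Hypothetical sink that resolves to everything it consumed.
const collect = async source => {
  const values = []
  for await (const value of source) values.push(value)
  return values
}

const main = async () => {
  // pipe returns whatever the last function returns: here, collect's promise.
  const result = await pipe(oneToThree, doubler, collect)
  console.log(result) // prints [ 2, 4, 6 ]
}
main()
```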

@alanshaw
Author

alanshaw commented Mar 29, 2019

Some notes:

  • The streaming-iterables module is ace, we should totally use it

  • The pipeline function from streaming-iterables does exactly what is mentioned here

  • pipeline doesn't work with duplex objects - pull has magic that allows you to use them in a pipeline. Right now the workaround is:

    // Instead of
    pull(source, duplex, sink)
    
    // We can:
    pipeline(() => duplex.source, sink)
    pipeline(() => source, duplex.sink)
    
    // OR:
    
    pipeline(
      () => source,
      source => {
        duplex.sink(source) // Missing some error propagation here
        return duplex.source
      },
      sink
    )

    We can build a smarter pipeline that works with duplex and source (so the first item in the pipeline doesn't need a function wrapper).

    EDIT: I did! https://github.com/alanshaw/it-pipe
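For illustration only, a duplex-aware pipe could look roughly like the sketch below. This is not the it-pipe implementation; `duplexPipe`, `isDuplex` and `isIterable` are hypothetical names, and error propagation for a duplex in the middle of the pipeline is still missing, as in the workaround above.

```javascript
// All names here are hypothetical; this is not the it-pipe implementation.
const isDuplex = obj =>
  obj != null && typeof obj.sink === 'function' && obj.source != null

const isIterable = obj =>
  obj != null && (
    typeof obj[Symbol.asyncIterator] === 'function' ||
    typeof obj[Symbol.iterator] === 'function'
  )

const duplexPipe = (first, ...rest) => {
  // First item: a duplex contributes its source, a bare iterable is used
  // as is, and anything else is assumed to be a function returning a source.
  let res
  if (isDuplex(first)) res = first.source
  else if (isIterable(first)) res = first
  else res = first()

  rest.forEach((item, i) => {
    if (!isDuplex(item)) {
      res = item(res) // plain transform or sink
    } else if (i === rest.length - 1) {
      res = item.sink(res) // a duplex in last position acts as the sink
    } else {
      item.sink(res) // NOTE: errors from this sink are not propagated
      res = item.source
    }
  })
  return res
}

const main = async () => {
  const doubler = source => (async function * () {
    for await (const chunk of source) yield chunk * 2
  })()
  const collect = async source => {
    const values = []
    for await (const value of source) values.push(value)
    return values
  }
  // The first item is a plain iterable, no function wrapper needed.
  console.log(await duplexPipe([1, 2, 3], doubler, collect)) // prints [ 2, 4, 6 ]
}
main()
```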

@dirkmc

dirkmc commented Mar 29, 2019

This is super helpful! +1 to using streaming-iterables, I've been using it a bunch, really nice library

@vasco-santos

Great insights @alanshaw! Thanks for putting this together!

@alanshaw
Author

alanshaw commented Apr 2, 2019

Pipeline that works with duplex objects: https://github.com/alanshaw/it-pipe

@achingbrain

What's the value in having duplex objects as well as transforms? Couldn't they just be generators that take an iterable as an arg but then don't yield values in a way that has a 1-1 relation to how they consume values (i.e. that's an implementation detail)?

@alanshaw
Author

alanshaw commented Aug 8, 2019

@achingbrain that's a transform, which is a special case of a duplex where the two sides are connected. That technique works for outgoing streams, but it doesn't work when you have an incoming stream that you want to read from, transform and then write back to:

e.g.

pipe(duplex, duplex) // echo server
pipe(duplex, transform, duplex) // save some data, send a response

If duplex was source => newSource (i.e. a transform) my mind breaks thinking about how you'd make an echo server with that or compose it to make a pipeline.
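As a rough sketch of the echo case, `pipe(duplex, duplex)` amounts to feeding a connection's source into its own sink. The in-memory `createConnection` below is hypothetical and only stands in for a real network connection:

```javascript
// Hypothetical in-memory "connection": source yields incoming messages,
// sink records whatever gets written back.
const createConnection = incoming => {
  const written = []
  return {
    written, // exposed only so the example can inspect it
    source: (async function * () {
      for (const msg of incoming) yield msg
    })(),
    sink: async source => {
      for await (const msg of source) written.push(msg)
    }
  }
}

const main = async () => {
  const conn = createConnection(['ping', 'hello'])
  // Echo: everything read from the connection is written straight back to it.
  await conn.sink(conn.source)
  console.log(conn.written) // prints [ 'ping', 'hello' ]
}
main()
```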

There are also situations where you want to write without being blocked by a read, which could happen when using a transform.

Some other reasons too? I'm happy to be corrected here, this just seemed most natural. Both pull streams and Node.js streams have the concept of duplex so I feel like it's necessary!

@jacobheun

The duplex is necessary, otherwise it's going to be super painful to do anything. When you start getting into handshake scenarios the logic can get pretty convoluted very quickly.

@alanshaw
Author

@bmordan yes 😁
