Skip to content

Instantly share code, notes, and snippets.

@pgherveou
Created August 31, 2016 23:34
Show Gist options
  • Save pgherveou/12bb639706906c2b4aec664ed8aacc3e to your computer and use it in GitHub Desktop.
Save pgherveou/12bb639706906c2b4aec664ed8aacc3e to your computer and use it in GitHub Desktop.
pull stream playground in swift
// https://github.com/dominictarr/pull-stream-examples/blob/master/pull.js
import Foundation
import PlaygroundSupport
// Ask the playground page to keep executing after the top-level code below has run.
PlaygroundPage.current.needsIndefiniteExecution = true
/// An event handed by a pull-stream source to the callback of a single read.
enum Event<T> {
    /// The source produced a value.
    case next(T)

    /// The source reported a failure.
    case error(Error)

    /// The source has no more values to offer.
    case end
}
/// Minimal error type whose printed form is the message it was created with.
struct StreamError: Error, CustomStringConvertible {
    /// Human-readable message; also used as the `CustomStringConvertible` description.
    let description: String

    init(description: String) {
        self.description = description
    }
}
// Generic aliases for source, sink and through streams.
/// Callback a source invokes with the outcome (`Event`) of one read.
typealias CallBack<T> = (Event<T>) -> Void
/// A readable stream: each call is one read request. `abort` tells the source the reader
/// wants to stop; the sources in this file answer an aborted read with `.end`.
// NOTE(review): the callback parameter is non-escaping by default here — confirm whether
// sources that need to store it should declare it `@escaping`.
typealias Source<T> = (_ abort: Bool, CallBack<T>) -> Void
/// A consumer that is handed a source and reads from it.
typealias Sink<T> = (Source<T>) -> Void
/// A transformer: takes a source of `T` and returns a source of `V`.
typealias Through<T,V> = (Source<T>) -> Source<V>
/// Source stream that emits the elements of `arr` in order, followed by `.end`.
///
/// A read with `abort` set is answered with `.end` and does not consume an element.
func fromArray<T> (_ arr: Array<T>) -> Source<T> {
    var remaining = arr.makeIterator()
    return { abort, cb in
        if !abort, let element = remaining.next() {
            cb(.next(element))
        } else {
            cb(.end)
        }
    }
}
/// Source stream that emits the integers 1, 2, 3, … for as long as it is read.
///
/// A read with `abort` set is answered with `.end` and leaves the counter untouched.
func infinite() -> Source<Int> {
    var counter = 0
    return { abort, cb in
        if abort {
            cb(.end)
            return
        }
        counter += 1
        cb(.next(counter))
    }
}
/// Sink stream that prints every event it reads to the console.
///
/// Requests the next item after each value and stops reading at the first
/// `.end` or `.error`.
func logger<T> () -> Sink<T> {
    return { (source: Source<T>) in
        func report(_ event: Event<T>) {
            switch event {
            case .next(let value):
                print("next: \(value)")
                // Only ask for the following item once this one has been printed.
                source(false, report)
            case .end:
                print("end")
            case .error(let err):
                print("error: \(err)")
            }
        }
        // Kick off the first read; `report` drives every subsequent one.
        source(false, report)
    }
}
/// Through stream that transforms every value of type `T` into a `V` using `mapper`.
///
/// `.end` and `.error` events pass through unchanged, and the `abort` flag of each
/// read is forwarded to the upstream source as-is.
func map<T, V>(_ mapper: @escaping (T) -> V) -> (Source<T>) -> Source<V> {
    return { (source: Source<T>) in
        return { (abort: Bool, cb: CallBack<V>) in
            source(abort) { (event: Event<T>) in
                // Translate the upstream event first, then deliver it with a single call.
                let mapped: Event<V>
                switch event {
                case .next(let value):
                    mapped = .next(mapper(value))
                case .error(let err):
                    mapped = .error(err)
                case .end:
                    mapped = .end
                }
                cb(mapped)
            }
        }
    }
}
/// Through stream that flattens a source of arrays (`[T]`) into a source of their elements (`T`).
///
/// Elements of the most recently read array are handed out one per read; when they run
/// out, the next array is pulled from upstream. Empty arrays are skipped.
///
/// An aborted read discards any buffered elements and is forwarded to the upstream
/// source immediately, so the upstream learns that the reader is done.
func flatten<T>() -> (Source<Array<T>>) -> Source<T> {
    return { (source: Source<Array<T>>) in
        // Slice of the current array still waiting to be emitted; `popFirst()` on a
        // slice is O(1), unlike `removeFirst()` on an array.
        var pending = ArraySlice<T>()
        func read (_ abort: Bool, _ cb: CallBack<T>) {
            if abort {
                // The reader is done: drop leftovers and let the upstream know below.
                pending.removeAll()
            } else if let value = pending.popFirst() {
                return cb(.next(value))
            }
            source(abort) { (event: Event<Array<T>>) in
                switch event {
                case .end:
                    cb(.end)
                case .error(let err):
                    cb(.error(err))
                case .next(let chunk):
                    // A source should not emit after an abort; if it does, end the
                    // stream rather than handing out more values.
                    guard !abort else { return cb(.end) }
                    pending = ArraySlice(chunk)
                    // Re-enter to emit the first element, or to pull again if `chunk` is empty.
                    read(abort, cb)
                }
            }
        }
        return read
    }
}
/// Through stream that passes along at most `max` items and then aborts the upstream.
///
/// The first `max` reads are forwarded unchanged; every later read is forwarded with
/// `abort` set. A read that the downstream itself aborts is also forwarded as an abort.
///
/// The item counter is created per source the through is applied to, so reusing one
/// `take(n)` value on several sources gives each of them its own quota.
func take<T> (_ max: Int) -> (Source<T>) -> Source<T> {
    return { (read: Source<T>) in
        var taken = 0
        return { (abort: Bool, cb: CallBack<T>) in
            // Abort upstream once the quota is used up, or when downstream asks to stop.
            guard !abort, taken < max else { return read(true, cb) }
            taken += 1
            read(false, cb)
        }
    }
}
/// Through stream that replaces the value 42 with a `StreamError`.
///
/// Every other event, including `.end` and upstream errors, is passed along untouched.
func makeError42 () -> (Source<Int>) -> Source<Int> {
    return { (read: Source<Int>) in
        return { (abort: Bool, cb: CallBack<Int>) in
            read(abort) { (event: Event<Int>) in
                switch event {
                case .next(42):
                    cb(.error(StreamError(description: "error 42")))
                case .next, .error, .end:
                    cb(event)
                }
            }
        }
    }
}
/// Sink stream that gathers every value it reads into an array.
///
/// - Parameter completionBlock: Called once reading stops, with the values collected so far.
///   Both `.end` and `.error` stop the reading; the error itself is not reported.
func toArray<T> (completionBlock: @escaping (Array<T>) -> Void) -> Sink<T> {
    return { (source: Source<T>) in
        var collected: Array<T> = []
        func collect(_ event: Event<T>) {
            if case .next(let value) = event {
                collected.append(value)
                // Keep pulling until the source ends or fails.
                source(false, collect)
            } else {
                completionBlock(collected)
            }
        }
        source(false, collect)
    }
}
/// Precedence group for the pull operator. It is right-associative, so
/// `a |> b |> c` groups as `a |> (b |> c)`.
precedencegroup PullPrecedence {
associativity: right
}
/// Declares the pull operator `|>` as an infix operator in `PullPrecedence`.
infix operator |>: PullPrecedence
/// Pipes a source into a through and returns the transformed source.
func |><T, V>(left: Source<T>, right: Through<T, V>) -> Source<V> {
    let transformed: Source<V> = right(left)
    return transformed
}
/// Composes two throughs into one: values flow through `left` first, then through `right`.
func |><A, B, C>(left: Through<A, B>, right: Through<B, C>) -> (Source<A>) -> Source<C> {
    return { (source: Source<A>) in
        let intermediate: Source<B> = left(source)
        return right(intermediate)
    }
}
/// Connects a source to a sink, which starts reading from it.
func |><T>(left: Source<T>, right: Sink<T>) -> Void {
    right(left)
}
/// Attaches a through in front of a sink, producing a sink-like function that accepts
/// the through's input source.
func |><T, V>(left: Through<T, V>, right: Sink<V>) -> (Source<T>) -> Void {
    return { (source: Source<T>) in
        let transformed: Source<V> = left(source)
        right(transformed)
    }
}
// 1/ Square each element of an array and log it: 1, 4, 9, 16, then "end".
print("1/")
fromArray([1, 2, 3, 4])
    |> map({ (value: Int) in return value * value })
    |> logger()

// 2/ Take the first ten integers of an unbounded source: 1 through 10, then "end".
print("\n2/")
infinite()
    |> take(10)
    |> logger()

// 2 bis/ Same idea, but the stream fails when 42 comes up: 1 through 41, then the error.
print("\n2 bis/")
infinite()
    |> take(100)
    |> makeError42()
    |> logger()

// 3/ Split each string into lines, flatten the arrays of lines, and log every line.
print("\n3/")
fromArray([ "hello\nworld", "how\nare\nyou\ndoing?"])
    |> map({ (value: String) -> Array<String> in
        // Foundation split; empty pieces are dropped, as `split(separator:)` does by default.
        return value.components(separatedBy: "\n").filter { !$0.isEmpty }
    })
    |> flatten()
    |> logger()

// 4/ Same pipeline as 3/, but collect the lines into one array and print it.
print("\n4/")
fromArray([ "hello\nworld", "how\nare\nyou\ndoing?"])
    |> map({ (value: String) -> Array<String> in
        // Foundation split; empty pieces are dropped, as `split(separator:)` does by default.
        return value.components(separatedBy: "\n").filter { !$0.isEmpty }
    })
    |> flatten()
    |> toArray() { print($0) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment