Created
August 31, 2016 23:34
-
-
Save pgherveou/12bb639706906c2b4aec664ed8aacc3e to your computer and use it in GitHub Desktop.
pull stream playground in swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/dominictarr/pull-stream-examples/blob/master/pull.js | |
import Foundation | |
import PlaygroundSupport | |
// Ask the playground to keep running after the top-level code has finished.
// NOTE(review): every stream visible in this file calls back synchronously, so this
// presumably only matters once an asynchronous source is added — confirm before removing.
PlaygroundPage.current.needsIndefiniteExecution = true | |
/// Events a source delivers to the callback of whoever is reading from it.
enum Event<T> { | |
/// The source produced a value.
case next(T) | |
/// The source failed; the payload is the error that occurred.
case error(Error) | |
/// The source is finished (the sources in this file send it when exhausted or aborted).
case end | |
} | |
/// Minimal error type that carries a human-readable message.
struct StreamError: Error, CustomStringConvertible { | |
/// Message used as the textual representation of the error (e.g. when it is printed).
let description: String | |
} | |
// Generic aliases for the callback and the three kinds of pull streams
// (source, sink and through).
//
// NOTE(review): function-typed parameters are non-escaping by default since Swift 3,
// yet several through streams in this file capture their source inside a closure they
// return; newer compilers may require `@escaping` in these aliases — confirm against
// the target toolchain.

/// Callback through which a source hands one `Event` to its reader.
typealias CallBack<T> = (Event<T>) -> Void | |
/// Readable stream: the reader calls it with an `abort` flag and a callback.
/// The sources in this file answer `abort == true` with `.end`.
typealias Source<T> = (_ abort: Bool, CallBack<T>) -> Void | |
/// End of a pipeline: receives a source and reads from it.
typealias Sink<T> = (Source<T>) -> Void | |
/// Transformer: wraps a source of `T` and returns a source of `V`.
typealias Through<T,V> = (Source<T>) -> Source<V> | |
/// Creates a source stream that emits the elements of `arr` in order.
///
/// The callback receives `.end` once every element has been emitted, or as soon
/// as the reader asks to abort.
///
/// - Parameter arr: The values to emit.
/// - Returns: A source that yields one element per read.
func fromArray<T> (_ arr: Array<T>) -> Source<T> {
  var cursor = 0
  return { abort, cb in
    if abort || cursor >= arr.count {
      cb(.end)
      return
    }
    let item = arr[cursor]
    // Advance before calling back: the reader may pull again from inside `cb`.
    cursor += 1
    cb(.next(item))
  }
}
/// Creates a source stream that emits the integers 1, 2, 3, … for as long as it is read.
///
/// The only way to finish it is to read with `abort == true`, which yields `.end`.
///
/// - Returns: A source of consecutive integers starting at 1.
func infinite() -> Source<Int> {
  var counter = 0
  return { abort, cb in
    if abort {
      cb(.end)
      return
    }
    // Increment first so the first emitted value is 1, and so that a re-entrant
    // read from inside `cb` sees the updated counter.
    counter += 1
    cb(.next(counter))
  }
}
/// Creates a sink stream that prints every event of its source to the console.
///
/// Values are printed as `next: <value>` and trigger the next read; `.end` and
/// `.error` are printed and stop the reading loop.
///
/// - Returns: A sink that drains the source it is given.
func logger<T> () -> Sink<T> {
  return { (source: Source<T>) in
    // Prints one event and, for values only, pulls the following one.
    func consume(event: Event<T>) {
      switch event {
      case .next(let value):
        print("next: \(value)")
        source(false, consume)
      case .error(let err):
        print("error: \(err)")
      case .end:
        print("end")
      }
    }
    // Kick off the first read.
    source(false, consume)
  }
}
/// Creates a through stream that converts every `T` of its source into a `V`.
///
/// `.end` and `.error` events pass through untouched; the `abort` flag is
/// forwarded to the upstream source as-is.
///
/// - Parameter mapper: Function applied to each value emitted upstream.
/// - Returns: A through stream emitting the mapped values.
func map<T, V>(_ mapper: @escaping (T) -> V) -> (Source<T>) -> Source<V> {
  return { (source: Source<T>) in
    // Translates one upstream event into the matching downstream event.
    func convert(_ event: Event<T>) -> Event<V> {
      switch event {
      case .next(let value):
        return .next(mapper(value))
      case .error(let err):
        return .error(err)
      case .end:
        return .end
      }
    }
    return { (abort: Bool, cb: CallBack<V>) in
      source(abort) { (event: Event<T>) in
        cb(convert(event))
      }
    }
  }
}
/// Creates a through stream that flattens a stream of arrays (`[T]`) into a
/// stream of their elements (`T`).
///
/// Each array pulled from upstream is buffered and handed out one element per
/// read; empty arrays are skipped by pulling again. When the reader aborts, any
/// buffered elements are discarded and the abort is forwarded upstream, so that
/// the source gets the chance to terminate.
///
/// - Returns: A through stream emitting the individual elements.
func flatten<T>() -> (Source<Array<T>>) -> Source<T> {
  return { (source: Source<Array<T>>) in
    // Elements of the most recently pulled array that have not been emitted yet.
    var buffer: [T] = []

    func read (_ abort: Bool, _ cb: CallBack<T>) {
      if abort {
        // The reader is done: drop pending elements instead of emitting them,
        // and let upstream know it should stop.
        buffer.removeAll()
        source(true) { (event: Event<Array<T>>) in
          switch event {
          case .error(let err):
            cb(.error(err))
          case .end, .next:
            // A value produced after an abort is ignored; the stream is over.
            cb(.end)
          }
        }
        return
      }

      guard buffer.isEmpty else {
        return cb(.next(buffer.removeFirst()))
      }

      source(false) { (event: Event<Array<T>>) in
        switch event {
        case .end:
          cb(.end)
        case .error(let err):
          cb(.error(err))
        case .next(let value):
          buffer = value
          // Re-enter to emit the first buffered element (or pull again if
          // the array was empty).
          read(false, cb)
        }
      }
    }
    return read
  }
}
/// Creates a through stream that lets at most `max` reads reach its source and
/// then aborts it.
///
/// The read counter is kept per piped source, so one `take` value can be applied
/// to several sources without them sharing a quota. An abort requested by the
/// reader is always forwarded upstream.
///
/// - Parameter max: Number of reads allowed before the source is aborted.
/// - Returns: A through stream limited to `max` items.
func take<T> (_ max: Int) -> (Source<T>) -> Source<T> {
  return { (read: Source<T>) in
    // Number of reads forwarded to this particular source so far.
    var n = 0
    return { (abort: Bool, cb: CallBack<T>) in
      // The reader asked to stop: pass that on regardless of the quota.
      guard !abort else { return read(true, cb) }
      n += 1
      // Once the quota is exceeded, read with abort so upstream answers `.end`.
      read(n > max, cb)
    }
  }
}
/// Creates a through stream that replaces the value 42 with an error.
///
/// Every other event (values, `.end`, `.error`) is passed along unchanged.
///
/// - Returns: A through stream over `Int` that emits a `StreamError` instead of 42.
func makeError42 () -> (Source<Int>) -> Source<Int> {
  return { (read: Source<Int>) in
    return { (abort: Bool, cb: CallBack<Int>) in
      read(abort) { (event: Event<Int>) in
        // Anything that is not the value 42 goes through untouched.
        guard case .next(let value) = event, value == 42 else {
          return cb(event)
        }
        cb(.error(StreamError(description: "error 42")))
      }
    }
  }
}
/// Creates a sink stream that collects the values of its source into an array.
///
/// The collected values are handed to `completionBlock` when the source reports
/// `.end` or `.error`. An error is not surfaced to the caller: the block simply
/// receives whatever was gathered before it.
///
/// - Parameter completionBlock: Called with the collected values.
/// - Returns: A sink that drains the source it is given.
func toArray<T> (completionBlock: @escaping (Array<T>) -> Void) -> Sink<T> {
  return { (source: Source<T>) in
    var collected: [T] = []
    // Stores one value and pulls the next; finishes on any other event.
    func collect(event: Event<T>) {
      guard case .next(let value) = event else {
        // `.end` and `.error` both finish the collection.
        completionBlock(collected)
        return
      }
      collected.append(value)
      source(false, collect)
    }
    // Kick off the first read.
    source(false, collect)
  }
}
/// Precedence group for the pipe operator.
/// Right associativity makes `a |> b |> c` parse as `a |> (b |> c)`: the stages on
/// the right are combined first, and the leftmost source is plugged in last.
precedencegroup PullPrecedence { | |
associativity: right | |
} | |
/// Declares the pipe operator `|>` used to connect sources, throughs and sinks.
infix operator |>: PullPrecedence | |
/// Pipes a source into a through stream.
///
/// - Parameters:
///   - left: The source to read from.
///   - right: The through stream that wraps it.
/// - Returns: The transformed source produced by `right`.
func |><T, V>(left: Source<T>, right: Through<T, V>) -> Source<V> {
  let piped: Source<V> = right(left)
  return piped
}
/// Pipes two through streams together into a single through stream that applies
/// `left` first and `right` second.
///
/// Both parameters are `@escaping` because the returned closure stores them and
/// only calls them later, once a source is supplied.
///
/// - Parameters:
///   - left: The through stream closest to the source.
///   - right: The through stream applied to the output of `left`.
/// - Returns: The composition of the two through streams.
func |><A, B, C>(left: @escaping Through<A, B>, right: @escaping Through<B, C>) -> (Source<A>) -> Source<C> {
  return { (source: Source<A>) in right(left(source)) }
}
/// Pipes a source into a sink, which starts reading from it.
///
/// - Parameters:
///   - left: The source to drain.
///   - right: The sink that consumes it.
func |><T>(left: Source<T>, right: Sink<T>) -> Void {
  right(left)
}
/// Pipes a through stream into a sink, producing a function that can later be fed a
/// source.
///
/// Both parameters are `@escaping` because the returned closure stores them and
/// only calls them later, once a source is supplied.
///
/// - Parameters:
///   - left: The through stream applied to the source.
///   - right: The sink that consumes the output of `left`.
/// - Returns: A function that drains the source it is given through `left` into `right`.
func |><T, V>(left: @escaping Through<T, V>, right: @escaping Sink<V>) -> (Source<T>) -> Void {
  return { (source: Source<T>) in right(left(source)) }
}
// Example 1: square every element of an array and log the result.
// Logs next: 1, next: 4, next: 9, next: 16 and then end.
print("1/") | |
fromArray([1, 2, 3, 4]) | |
|> map({ (value: Int) in return value * value }) | |
|> logger() | |
// Example 2: cut an endless source after 10 items.
// Logs next: 1 through next: 10 and then end.
print("\n2/") | |
infinite() | |
|> take(10) | |
|> logger() | |
// Example 2 bis: the value 42 is turned into an error before the limit of 100 is hit.
// Logs next: 1 through next: 41 and then error: error 42.
print("\n2 bis/") | |
infinite() | |
|> take(100) | |
|> makeError42() | |
|> logger() | |
// Example 3: split each string into lines, flatten the arrays and log every line.
// Logs hello, world, how, are, you, doing? (one next: per line) and then end.
// NOTE(review): `String.characters` is the Swift 3 spelling; presumably later
// toolchains deprecate or reject it — confirm against the target Swift version.
print("\n3/") | |
fromArray([ "hello\nworld", "how\nare\nyou\ndoing?"]) | |
|> map({ (value: String) -> Array<String> in | |
return value.characters.split(separator: "\n").map { String($0) } | |
}) | |
|> flatten() | |
|> logger() | |
// Example 4: same pipeline as example 3, but the lines are collected into one array
// that is printed when the stream finishes.
print("\n4/") | |
fromArray([ "hello\nworld", "how\nare\nyou\ndoing?"]) | |
|> map({ (value: String) -> Array<String> in | |
return value.characters.split(separator: "\n").map { String($0) } | |
}) | |
|> flatten() | |
|> toArray() { print($0) } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment