Skip to content

Instantly share code, notes, and snippets.

@caiorss
Forked from rizo/pipes.ml
Last active June 7, 2018 08:53
Show Gist options
  • Save caiorss/aceffb85337a8126ab9d938fd7a6f166 to your computer and use it in GitHub Desktop.
Save caiorss/aceffb85337a8126ab9d938fd7a6f166 to your computer and use it in GitHub Desktop.
Quick self-contained demonstration of coroutine pipes in OCaml. - Talk given by Rizo Isrof
(**
Talk given by Rizo Isrof
- https://pusher.com/sessions/meetup/the-realtime-guild/realtime-stream-processing-with-coroutines
- https://www.reddit.com/r/ocaml/comments/66pe0s/stream_processing_with_coroutines_from_c_to_ocaml/
*)
# empty ;;
- : ('a, 'b, unit) pipe = Ready ()
#
Ready 10 ;;
- : ('a, 'b, int) pipe = Ready 10
#
yield 5 >> lazy (yield 10) ;;
- : ('_a, int, unit) pipe = Yield (5, <lazy>)
#
yield 5 >> lazy (yield 10) >> lazy (yield 20) >> lazy (Ready 10) ;;
- : ('_a, int, int) pipe = Yield (5, <lazy>)
#
let expr = yield 5 >> lazy (yield 10) >> lazy (yield 20) >> lazy (Ready 10) ;;
val expr : ('_a, int, int) pipe = Yield (5, <lazy>)
#
collect expr ;;
- : int list = [5; 10; 20]
#
collect ;;
- : ('a, 'b, 'c) pipe -> 'b list = <fun>
#
count() >>> take 10 |> collect ;;
- : int list = [0; 1; 2; 3; 4; 5; 6; 7; 8; 9]
#
count() ;;
- : ('_a, int, 'b) pipe = Yield (0, <lazy>)
#
count() >>> take 10 ;;
- : ('_a, int, unit) pipe = Yield (0, <lazy>)
#
count() >>> take 10 |> collect ;;
- : int list = [0; 1; 2; 3; 4; 5; 6; 7; 8; 9]
#
# list [1; 2; 3; 4; 5; 6] ;;
- : ('_a, int, unit) pipe = Yield (1, <lazy>)
#
list [1; 2; 3; 4; 5; 6] |> collect ;;
- : int list = [1; 2; 3; 4; 5; 6]
#
# file "/etc/lsb-release" ;;
- : ('_a, string, unit) pipe = Yield ("DISTRIB_ID=ManjaroLinux", <lazy>)
#
file "/etc/lsb-release" |> collect ;;
- : string list =
["DISTRIB_ID=ManjaroLinux"; "DISTRIB_RELEASE=17.0.1";
"DISTRIB_CODENAME=Gellivara"; "DISTRIB_DESCRIPTION=\"Manjaro Linux\""]
#
# file "/etc/lsb-release" |> iter printLine ;;
DISTRIB_ID=ManjaroLinux
DISTRIB_RELEASE=17.0.1
DISTRIB_CODENAME=Gellivara
DISTRIB_DESCRIPTION="Manjaro Linux"
- : unit = ()
#
(**
Talk given by Rizo Isrof
- https://pusher.com/sessions/meetup/the-realtime-guild/realtime-stream-processing-with-coroutines
- https://www.reddit.com/r/ocaml/comments/66pe0s/stream_processing_with_coroutines_from_c_to_ocaml/
*)
type ('a, 'b, 'r) pipe =
| Await of ('a -> ('a, 'b, 'r) pipe)
| Yield of ('b * ('a, 'b, 'r) pipe lazy_t)
| Ready of 'r
let empty = Ready ()
let yield b = Yield (b, lazy empty)
let await = Await (fun b -> Ready b)
let return r = Ready r
let rec (>>=) n f =
match n with
| Yield (b, lazy n') -> Yield (b, lazy (n' >>= f))
| Await k -> Await (fun a -> k a >>= f)
| Ready r -> f r
let (>>) n1 n2 =
n1 >>= fun _ -> Lazy.force n2
let rec compose d u =
match d, u with
| Ready r , _ -> Ready r
| Yield (b, lazy d') , _ -> Yield (b, lazy (compose d' u))
| Await k , Yield (b, lazy u') -> compose (k b) u'
| Await _ , Await k -> Await (fun a -> compose d (k a))
| _ , Ready r -> Ready r
let (<<<) d u = compose d u
let (>>>) u d = compose d u
let next pipe =
match pipe with
| Ready _ -> None
| Yield (a, lazy p') -> Some (a, p')
| Await k -> failwith "stream requires more input."
let count () =
let rec loop n =
yield n >> lazy (loop (n + 1)) in
loop 0
let rec map f =
await >>= fun a -> yield (f a) >> lazy (map f)
let rec filter p =
await >>= fun a ->
if p a then yield a >> lazy (filter p)
else filter p
let rec take n =
if n = 0 then empty
else await >>= fun a -> yield a >> lazy (take (n - 1))
let collect pipe =
let rec loop pipe' acc =
match next pipe' with
| Some (a, rest) -> loop rest (a::acc)
| None -> List.rev acc in
loop pipe []
let rec reduce f acc p =
match next p with
| Some (a, pnext) -> reduce f (f acc a) pnext
| None -> acc
let iter fn p = reduce (fun acc x -> fn x) () p
let rec list input =
match input with
| x::xs -> yield x >> lazy (list xs)
| [] -> empty
let file path =
let rec loop chan =
match input_line chan with
| line -> yield line >> lazy (loop chan)
| exception End_of_file -> return (close_in chan)
in
loop (open_in path)
let printLine line = print_string line ; print_newline()
let () =
let stream = count ()
>>> take 10
>>> filter (fun n -> n mod 2 = 0)
>>> map Int.to_string in
assert (collect stream = ["0"; "2"; "4"; "6"; "8"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment