-
-
Save caiorss/aceffb85337a8126ab9d938fd7a6f166 to your computer and use it in GitHub Desktop.
Quick self-contained demonstration of coroutine pipes in OCaml. - Talk given by Rizo Isrof
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(** | |
Talk given by Rizo Isrof | |
- https://pusher.com/sessions/meetup/the-realtime-guild/realtime-stream-processing-with-coroutines | |
- https://www.reddit.com/r/ocaml/comments/66pe0s/stream_processing_with_coroutines_from_c_to_ocaml/ | |
*) | |
# empty ;; | |
- : ('a, 'b, unit) pipe = Ready () | |
# | |
Ready 10 ;; | |
- : ('a, 'b, int) pipe = Ready 10 | |
# | |
yield 5 >> lazy (yield 10) ;; | |
- : ('_a, int, unit) pipe = Yield (5, <lazy>) | |
# | |
yield 5 >> lazy (yield 10) >> lazy (yield 20) >> lazy (Ready 10) ;; | |
- : ('_a, int, int) pipe = Yield (5, <lazy>) | |
# | |
let expr = yield 5 >> lazy (yield 10) >> lazy (yield 20) >> lazy (Ready 10) ;; | |
val expr : ('_a, int, int) pipe = Yield (5, <lazy>) | |
# | |
collect expr ;; | |
- : int list = [5; 10; 20] | |
# | |
collect ;; | |
- : ('a, 'b, 'c) pipe -> 'b list = <fun> | |
# | |
count() >>> take 10 |> collect ;; | |
- : int list = [0; 1; 2; 3; 4; 5; 6; 7; 8; 9] | |
# | |
count() ;; | |
- : ('_a, int, 'b) pipe = Yield (0, <lazy>) | |
# | |
count() >>> take 10 ;; | |
- : ('_a, int, unit) pipe = Yield (0, <lazy>) | |
# | |
count() >>> take 10 |> collect ;; | |
- : int list = [0; 1; 2; 3; 4; 5; 6; 7; 8; 9] | |
# | |
# list [1; 2; 3; 4; 5; 6] ;; | |
- : ('_a, int, unit) pipe = Yield (1, <lazy>) | |
# | |
list [1; 2; 3; 4; 5; 6] |> collect ;; | |
- : int list = [1; 2; 3; 4; 5; 6] | |
# | |
# file "/etc/lsb-release" ;; | |
- : ('_a, string, unit) pipe = Yield ("DISTRIB_ID=ManjaroLinux", <lazy>) | |
# | |
file "/etc/lsb-release" |> collect ;; | |
- : string list = | |
["DISTRIB_ID=ManjaroLinux"; "DISTRIB_RELEASE=17.0.1"; | |
"DISTRIB_CODENAME=Gellivara"; "DISTRIB_DESCRIPTION=\"Manjaro Linux\""] | |
# | |
# file "/etc/lsb-release" |> iter printLine ;; | |
DISTRIB_ID=ManjaroLinux | |
DISTRIB_RELEASE=17.0.1 | |
DISTRIB_CODENAME=Gellivara | |
DISTRIB_DESCRIPTION="Manjaro Linux" | |
- : unit = () | |
# |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(** | |
Talk given by Rizo Isrof | |
- https://pusher.com/sessions/meetup/the-realtime-guild/realtime-stream-processing-with-coroutines | |
- https://www.reddit.com/r/ocaml/comments/66pe0s/stream_processing_with_coroutines_from_c_to_ocaml/ | |
*) | |
type ('a, 'b, 'r) pipe = | |
| Await of ('a -> ('a, 'b, 'r) pipe) | |
| Yield of ('b * ('a, 'b, 'r) pipe lazy_t) | |
| Ready of 'r | |
let empty = Ready () | |
let yield b = Yield (b, lazy empty) | |
let await = Await (fun b -> Ready b) | |
let return r = Ready r | |
let rec (>>=) n f = | |
match n with | |
| Yield (b, lazy n') -> Yield (b, lazy (n' >>= f)) | |
| Await k -> Await (fun a -> k a >>= f) | |
| Ready r -> f r | |
let (>>) n1 n2 = | |
n1 >>= fun _ -> Lazy.force n2 | |
let rec compose d u = | |
match d, u with | |
| Ready r , _ -> Ready r | |
| Yield (b, lazy d') , _ -> Yield (b, lazy (compose d' u)) | |
| Await k , Yield (b, lazy u') -> compose (k b) u' | |
| Await _ , Await k -> Await (fun a -> compose d (k a)) | |
| _ , Ready r -> Ready r | |
let (<<<) d u = compose d u | |
let (>>>) u d = compose d u | |
let next pipe = | |
match pipe with | |
| Ready _ -> None | |
| Yield (a, lazy p') -> Some (a, p') | |
| Await k -> failwith "stream requires more input." | |
let count () = | |
let rec loop n = | |
yield n >> lazy (loop (n + 1)) in | |
loop 0 | |
let rec map f = | |
await >>= fun a -> yield (f a) >> lazy (map f) | |
let rec filter p = | |
await >>= fun a -> | |
if p a then yield a >> lazy (filter p) | |
else filter p | |
let rec take n = | |
if n = 0 then empty | |
else await >>= fun a -> yield a >> lazy (take (n - 1)) | |
let collect pipe = | |
let rec loop pipe' acc = | |
match next pipe' with | |
| Some (a, rest) -> loop rest (a::acc) | |
| None -> List.rev acc in | |
loop pipe [] | |
let rec reduce f acc p = | |
match next p with | |
| Some (a, pnext) -> reduce f (f acc a) pnext | |
| None -> acc | |
let iter fn p = reduce (fun acc x -> fn x) () p | |
let rec list input = | |
match input with | |
| x::xs -> yield x >> lazy (list xs) | |
| [] -> empty | |
let file path = | |
let rec loop chan = | |
match input_line chan with | |
| line -> yield line >> lazy (loop chan) | |
| exception End_of_file -> return (close_in chan) | |
in | |
loop (open_in path) | |
let printLine line = print_string line ; print_newline() | |
let () = | |
let stream = count () | |
>>> take 10 | |
>>> filter (fun n -> n mod 2 = 0) | |
>>> map Int.to_string in | |
assert (collect stream = ["0"; "2"; "4"; "6"; "8"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment