Last active
September 26, 2018 13:30
-
-
Save rizo/681cdf1bebf89bf49ab1d9eba628705a to your computer and use it in GitHub Desktop.
Coroutine pipes in OCaml. Slides: <http://odis.io/talks/ocaml-coro.pdf>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type void | |
(* Core pipe type *) | |
type ('a, 'b, 'r) pipe = | |
| Yield of ('b * (unit -> ('a, 'b, 'r) pipe)) | |
| Await of ('a -> ('a, 'b, 'r) pipe) | |
| Ready of 'r | |
(* Derived pipe types *) | |
type ('b, 'r) producer = (void, 'b, 'r) pipe | |
type ('a, 'r) consumer = ( 'a, void, 'r) pipe | |
type ( 'r) pipeline = (void, void, 'r) pipe | |
(* Monad instance *) | |
let return r = Ready r | |
let rec (>>=) p f = | |
match p with | |
| Yield (b, p') -> Yield (b, fun () -> p' () >>= f) | |
| Await k -> Await (fun a -> k a >>= f) | |
| Ready r -> f r | |
let (>>) p1 p2' = | |
p1 >>= fun _ -> p2' () | |
let rec forever m = | |
m >> fun () -> forever m | |
let rec replicate n m = | |
if n = 0 then return () | |
else m >> fun () -> replicate (n - 1) m | |
(* Pipes creation *) | |
let empty = Ready () | |
let yield b = Yield (b, fun () -> empty) | |
let await = Await (fun b -> Ready b) | |
(* Category instance *) | |
let id = Await (fun a -> yield a) | |
let rec compose d u = | |
match d, u with | |
| Ready r , _ -> Ready r | |
| Yield (b, d') , _ -> Yield (b, fun () -> compose (d' ()) u) | |
| Await k , Yield (b, u') -> compose (k b) (u' ()) | |
| Await _ , Await k -> Await (fun a -> compose d (k a)) | |
| _ , Ready r -> Ready r | |
let (<-<) d u = compose d u | |
let (>->) u d = compose d u | |
(* Consumers *) | |
let next pipe = | |
match pipe with | |
| Yield (a, p') -> Some (a, p' ()) | |
| _ -> None | |
let collect pipe = | |
let rec loop pipe' acc = | |
match next pipe' with | |
| Some (a, rest) -> loop rest (a::acc) | |
| None -> List.rev acc in | |
loop pipe [] | |
let rec fold f acc source = | |
match next source with | |
| Some (a, rest) -> fold f (f acc a) rest | |
| None -> acc | |
let nth n source = | |
let rec loop n source = | |
match next source with | |
| Some (a, rest) -> | |
if n = 0 then Some a | |
else loop (n - 1) rest | |
| None -> None | |
in loop n source | |
let sum source = fold (+) 0 source | |
let length source = fold (fun c _ -> c + 1) 0 source | |
(* Transformers *) | |
let rec map f = | |
await >>= fun a -> yield (f a) >> fun () -> map f | |
let rec filter p = | |
await >>= fun a -> | |
if p a then yield a >> fun () -> filter p | |
else filter p | |
let rec take n = | |
if n = 0 then empty | |
else await >>= fun a -> yield a >> fun () -> take (n - 1) | |
let rec take_while pred = | |
await >>= fun a -> | |
if pred a then yield a >> fun () -> take_while pred | |
else empty | |
let rec drop n = | |
if n = 0 then id | |
else await >>= fun a -> drop (n - 1) | |
let rec drop_while pred = | |
await >>= fun a -> | |
if pred a then drop_while pred | |
else id | |
let slice i j = | |
drop i >-> take (j - i) | |
(* Producers *) | |
let count () = | |
let rec loop n = | |
yield n >> fun () -> loop (n + 1) in | |
loop 0 | |
let rec repeat ?n x = | |
match n with | |
| Some n -> replicate n (yield x) | |
| None -> forever (yield x) | |
let rec iota stop = | |
count () >-> take stop | |
let range start stop = | |
count () >-> take stop >-> drop start | |
let rec file path = | |
let chan = open_in path in | |
let rec loop () = | |
match input_line chan with | |
| line -> yield line >> fun () -> loop () | |
| exception End_of_file -> close_in chan; empty | |
in | |
loop () | |
(* Examples *) | |
let () = | |
let stream = count () | |
>-> take 10 | |
>-> filter (fun n -> n mod 2 = 0) | |
>-> map string_of_int in | |
assert (collect stream = ["0"; "2"; "4"; "6"; "8"]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* Pipes usage example | |
* | |
* Problem: | |
* Compute the total bytes sent to a particular host for | |
* successful responses until a given date. | |
* | |
* Build: | |
* $ ocamlbuild successful_bytes_sent.native | |
* | |
* Source: http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html *) | |
(* String extension *) | |
module String = struct | |
include String | |
let split ?(on=' ') str = | |
let rec indices acc i = | |
try | |
let i = succ (String.index_from str i on) in | |
indices (i::acc) i | |
with Not_found -> | |
(String.length str + 1) :: acc | |
in | |
let is = indices [0] 0 in | |
let rec aux acc = function | |
| last::start::tl -> | |
let w = String.sub str start (last - start - 1) in | |
aux (w::acc) (start::tl) | |
| _ -> acc | |
in | |
aux [] is | |
let starts_with s2 s1 = | |
let len1 = String.length s1 | |
and len2 = String.length s2 in | |
if len1 < len2 then false else | |
let sub = String.sub s1 0 len2 in | |
(sub = s2) | |
end | |
(* Total bytes sent to a host until a given date *) | |
let successful_bytes_sent input ~host ~until:date = | |
let bytes = | |
file input | |
>-> map (String.split ~on:' ') | |
>-> filter (fun entries -> List.nth entries 0 = host) | |
>-> filter (fun entries -> List.nth entries 8 = "200") | |
>-> take_while (fun entries -> List.nth entries 3 |> String.starts_with ("[" ^ date)) | |
>-> map (fun entries -> List.nth entries 9) | |
>-> map int_of_string | |
in | |
fold (+) 0 bytes | |
open Printf | |
let () = | |
let host = "port26.annex2.nwlink.com" in | |
let date = "01/Jul" in | |
let res = successful_bytes_sent "NASA_access_log_Jul95" ~host ~until:date | |
in | |
printf "Total successful bytes sent to %s until %s: %d" host date res |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment