Last active
February 11, 2019 12:24
-
-
Save sgrove/b2903ed1012f0270f156801f034c7d0c to your computer and use it in GitHub Desktop.
Bad interaction between Async wrapper around ocurl and httpaf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(** Async support for Curl, see
    https://github.com/ygrek/ocurl/blob/master/curl_lwt.ml *)

module M = Curl.Multi

(** An ivar that is filled with [()] to stop a running fd watcher. *)
type interrupt = unit Async.Ivar.t

(** State shared by every transfer driven through one libcurl multi handle. *)
type multi = {
  mt : Curl.Multi.mt;
      (** The libcurl multi handle. *)
  all_events : (Unix.file_descr, interrupt list) Hashtbl.t;
      (** Per file descriptor: the interrupts of the watchers currently
          installed for it. *)
  wakeners : (Curl.t, Curl.curlCode Async.Ivar.t) Hashtbl.t;
      (** Per easy handle: the ivar that receives the result code once the
          transfer is reported as finished. *)
}

(** When [true], the per-fd bookkeeping is traced through the logger. *)
let debug = false
(** [create ()] allocates a libcurl multi handle together with the tables used
    to track it, installs the timer and socket callbacks on the handle, and
    returns the resulting [multi] record. *)
let create () =
  let mt = M.create () in
  (* Filled with () to cancel whichever timer is currently armed. *)
  let timer_cancel = ref (Async.Ivar.create ()) in
  (* Every table starts with the same initial capacity. *)
  let fresh_table () = Hashtbl.create 32 in
  (* fd -> interrupts of the watchers installed for that fd. *)
  let all_events = fresh_table () in
  (* easy handle -> ivar receiving the transfer result code. *)
  let wakeners = fresh_table () in
  (* fd -> result of createAsyncFd for that fd. *)
  let fdMap = fresh_table () in
  (* fd -> ivar filled once an in-flight Async close has completed. *)
  let fdClosing = fresh_table () in
  (* fd -> ivar that will carry the wrapper recreated after a pending close. *)
  let fdCreateAfterClose = fresh_table () in
  (* fd -> `Watching or `Closed, as last reported by the socket callback. *)
  let fdState = fresh_table () in
(* Drains the multi handle: for every transfer M.remove_finished hands back,
   the matching wakener is removed from the table and filled with the result
   code. A handle without a wakener is logged as an orphan. The argument is a
   caller tag and is not used. *)
let finished _s =
  let notify (handle, code) =
    match Hashtbl.find_opt wakeners handle with
    | Some waiter ->
      Hashtbl.remove wakeners handle;
      Async.Ivar.fill waiter code
    | None -> Core.Log.error "curl_async: orphan handle"
  in
  let rec drain () =
    match M.remove_finished mt with
    | None -> ()
    | Some finished_transfer ->
      notify finished_transfer;
      drain ()
  in
  drain () in
(* Reports [event] on [fd] to libcurl, discards the integer it returns, then
   completes whatever transfers finished as a consequence. *)
let socket_ready event tag fd =
  ignore (M.action mt fd event : int);
  finished tag in
(* Callback for a readable fd; the second argument is ignored. *)
let on_readable fd _ = socket_ready M.EV_IN "on_readable" fd in
(* Callback for a writable fd; the second argument is ignored. *)
let on_writable fd _ = socket_ready M.EV_OUT "on_writable" fd in
(* Callback for an expired timer; the argument is ignored. *)
let on_timer _ =
  M.action_timeout mt;
  finished "on_timer" in
(* Calls [f ()] every [interval] milliseconds until [interrupt] becomes
   determined. Each round races the interrupt against the delay; when the
   delay wins, [f] runs and the timer re-arms itself.
   NOTE(review): the libcurl timer callback contract asks for a single-shot
   timer, whereas this one keeps firing until interrupted. Confirm that the
   repetition is intended. *)
let rec repeating_timer ~interval ~interrupt ~f =
  let open Async in
  let delay = Core.Time.Span.of_ms (float_of_int interval) in
  let tick () =
    f ();
    repeating_timer ~interval ~interrupt ~f
  in
  let cancelled = choice interrupt (fun () -> ()) in
  let elapsed = choice (after delay) tick in
  Deferred.don't_wait_for (choose [ cancelled; elapsed ]) in
(* Timer callback handed to libcurl. [timeout] is in milliseconds:
   -1 removes the timer, 0 asks for an immediate timeout action, and any
   other value arms a timer with that interval. The previously armed timer
   is always cancelled first. *)
let start_new_timer timeout =
  let open Async in
  let new_timer_cancel = Ivar.create () in
  let old_timer_cancel = !timer_cancel in
  timer_cancel := new_timer_cancel;
  Ivar.fill old_timer_cancel ();
  (match timeout with
   | (-1) -> ()
   | 0 ->
     (* libcurl warns against calling back into the library from inside the
        timer callback when the timeout is zero, because of the recursion it
        can trigger. The action is therefore queued as a separate Async job,
        and skipped if a newer timer request has superseded it by then. *)
     upon (return ())
       (fun () -> if Ivar.is_empty new_timer_cancel then on_timer ())
   | interval ->
     repeating_timer ~interval ~interrupt:(Ivar.read new_timer_cancel)
       ~f:on_timer) in
M.set_timer_function mt start_new_timer;
(* Integer value of a file descriptor, used for logging. *)
(let getFdId fd = Core.Unix.File_descr.to_int fd in
 (* Debug trace of the bookkeeping kept for [fd]; does nothing unless [debug]
    is set. An fd without a recorded state is reported as unknown instead of
    raising Not_found, so that turning tracing on cannot abort a libcurl
    callback. *)
 let logFdState msg fd =
   if debug
   then
     let state =
       match Hashtbl.find_opt fdState fd with
       | Some `Closed -> "closed"
       | Some `Watching -> "watching"
       | None -> "unknown" in
     let inMap = Hashtbl.mem fdMap fd in
     let closing = Hashtbl.mem fdClosing fd in
     let creatingAfterClose = Hashtbl.mem fdCreateAfterClose fd in
     Core.Log.infof
       "fd id=%d msg=%s state=%s, inMap=%b, closing=%b, creatingAfterClose=%b"
       (getFdId fd) msg state inMap closing creatingAfterClose in
(* Wraps [fd] in an Async fd of kind active socket when the fd is currently
   marked `Watching, and answers Error `Closed when it is marked `Closed.
   Raises Not_found when no state has been recorded for [fd]. *)
let createAsyncFd fd =
  match Hashtbl.find fdState fd with
  | `Closed ->
    logFdState "createAsyncFd `Closed" fd;
    Error `Closed
  | `Watching ->
    logFdState "createAsyncFd `Watching" fd;
    let info = Core.Info.of_string "curl_async" in
    let kind = Async.Unix.Fd.Kind.Socket `Active in
    Ok (Async.Unix.Fd.create ~avoid_nonblock_if_possible:false kind fd info) in
(* Returns a deferred holding the Async wrapper for [fd], or Error `Closed.
   - No wrapper yet: one is created now, unless a recreation is already
     pending behind a close, in which case that one is awaited.
   - Wrapper present and not closing: it is returned as is.
   - Wrapper present but closing: a replacement is created once the close has
     completed, and every caller arriving in the meantime shares it.
   Only successful wrappers are remembered in fdMap. Remembering an
   Error `Closed would make a later socket reusing the same fd number look
   closed, so it would never be watched. *)
let getAsyncFd fd =
  let open Async in
  let createAndCache () =
    let asyncFd = createAsyncFd fd in
    (match asyncFd with
     | Ok _ -> Hashtbl.replace fdMap fd asyncFd
     | Error _ -> ());
    asyncFd in
  match Hashtbl.find_opt fdMap fd with
  | None ->
    (match Hashtbl.find_opt fdCreateAfterClose fd with
     | Some deferredAsyncFdIvar ->
       logFdState "waiting for createAfterClose" fd;
       Ivar.read deferredAsyncFdIvar
     | None ->
       logFdState "creating asyncFd" fd;
       Deferred.return (createAndCache ()))
  | Some asyncFd ->
    (match Hashtbl.find_opt fdClosing fd with
     | None ->
       logFdState "returning found asyncFd" fd;
       Deferred.return asyncFd
     | Some closingIvar ->
       logFdState "waiting for close" fd;
       (match Hashtbl.find_opt fdCreateAfterClose fd with
        | Some deferredAsyncFdIvar ->
          logFdState "found createAfterClose" fd;
          Ivar.read deferredAsyncFdIvar
        | None ->
          logFdState "creating after close finishes" fd;
          let deferredAsyncFdIvar = Ivar.create () in
          Hashtbl.add fdCreateAfterClose fd deferredAsyncFdIvar;
          upon (Ivar.read closingIvar)
            (fun () ->
               Hashtbl.remove fdCreateAfterClose fd;
               Ivar.fill deferredAsyncFdIvar (createAndCache ()));
          Ivar.read deferredAsyncFdIvar)) in
(* Releases the Async wrapper kept for [unixFd], leaving the underlying file
   descriptor open (Do_not_close_file_descriptor is passed to the close).
   While the close is in flight the fd is listed in fdClosing; when it
   completes, the fdMap and fdClosing entries are removed and the closing
   ivar is filled so that waiters in getAsyncFd can proceed.
   The closing marker is registered only after the close has actually been
   started, and never for an Error entry: a marker whose ivar is never filled
   would make every later getAsyncFd call for this fd number wait forever. *)
let closeFd unixFd =
  let open Async in
  match Hashtbl.find_opt fdMap unixFd with
  | None -> logFdState "closeFd, not in map" unixFd
  | Some asyncFd ->
    (match Hashtbl.find_opt fdClosing unixFd with
     | Some _ -> logFdState "closeFd, already closing" unixFd
     | None ->
       logFdState "closeFd closing" unixFd;
       (match asyncFd with
        | Error _ ->
          (* There is no wrapper to close, so just forget the stale entry. *)
          Hashtbl.remove fdMap unixFd;
          logFdState "closeFd, unixFd is closing" unixFd
        | Ok asyncFd ->
          let closingDeferred =
            try
              Unix.Fd.close
                ~file_descriptor_handling:Unix.Fd.Do_not_close_file_descriptor
                asyncFd
            with
            | exn ->
              logFdState "close failed" unixFd;
              Core.Log.errorf "Error closing fd exn=%s"
                (Core.Exn.to_string exn);
              raise exn in
          let closingIvar = Ivar.create () in
          Hashtbl.replace fdClosing unixFd closingIvar;
          upon closingDeferred
            (fun () ->
               Hashtbl.remove fdMap unixFd;
               Hashtbl.remove fdClosing unixFd;
               Ivar.fill closingIvar ();
               logFdState "closeFd finished" unixFd))) in
(* Starts watching [unixFd] for the readiness given by [readWrite] and returns
   the ivar that stops the watcher once it is filled. The watcher runs in the
   background: it is skipped when the fd is marked `Closed or when no wrapper
   can be obtained, and otherwise invokes on_readable or on_writable each time
   the fd is ready. The way the watcher ended is logged. *)
let createSocketWatchEventHandler ~readWrite ~unixFd =
  let open Async in
  let interrupt = Ivar.create () in
  let reportOutcome = function
    | `Bad_fd ->
      (Core.Log.error "got `Bad_fd adding async fd watcher in curl_async";
       logFdState "`Bad_fd" unixFd)
    | `Unsupported ->
      (Core.Log.error
         "got `Unsupported adding async fd watcher in curl_async";
       logFdState "`Unsupported" unixFd)
    | `Closed -> logFdState "`Closed" unixFd
    | `Interrupted -> logFdState "`Interrupted" unixFd
  in
  let watch asyncFd =
    logFdState "watching" unixFd;
    let onReady =
      match readWrite with
      | `Read -> on_readable unixFd
      | `Write -> on_writable unixFd
    in
    Unix.Fd.interruptible_every_ready_to asyncFd readWrite
      ~interrupt:(Ivar.read interrupt) onReady ()
    >>| reportOutcome
  in
  let start () =
    match Hashtbl.find fdState unixFd with
    | `Closed ->
      logFdState "skip watching for closed fd" unixFd;
      Deferred.return ()
    | `Watching ->
      getAsyncFd unixFd >>= (fun lookup ->
        logFdState "watch handler" unixFd;
        match lookup with
        | Error `Closed ->
          logFdState "skip watching for closed state (2)" unixFd;
          Deferred.return ()
        | Ok asyncFd -> watch asyncFd)
  in
  Deferred.don't_wait_for (start ());
  interrupt in
(* Socket callback handed to libcurl. It records the new state of [unixFd],
   interrupts the watchers previously installed for it, then either releases
   the Async wrapper (POLL_REMOVE) or installs fresh watchers for the
   requested directions and remembers their interrupts in all_events.
   The state and event tables are updated with Hashtbl.replace: Hashtbl.add
   would stack a new shadowed binding on every event, and those bindings are
   never removed. *)
let processSocketEvent unixFd what =
  let open Async in
  (match what with
   | M.POLL_REMOVE -> Hashtbl.replace fdState unixFd `Closed
   | M.POLL_NONE -> ()
   | M.POLL_IN | M.POLL_OUT | M.POLL_INOUT ->
     Hashtbl.replace fdState unixFd `Watching);
  logFdState
    (Printf.sprintf "processSocketEvent %s"
       (match what with
        | M.POLL_REMOVE -> "remove"
        | M.POLL_NONE -> "none"
        | M.POLL_IN -> "in"
        | M.POLL_OUT -> "out"
        | M.POLL_INOUT -> "inout"))
    unixFd;
  (* Stop the watchers belonging to the previous registration of this fd. *)
  (match Hashtbl.find_opt all_events unixFd with
   | Some events ->
     Hashtbl.remove all_events unixFd;
     List.iter (fun interrupt -> Ivar.fill interrupt ()) events
   | None -> ());
  let events =
    match what with
    | M.POLL_REMOVE -> (closeFd unixFd; [])
    | M.POLL_NONE -> []
    | M.POLL_IN ->
      [ createSocketWatchEventHandler ~readWrite:`Read ~unixFd ]
    | M.POLL_OUT ->
      [ createSocketWatchEventHandler ~readWrite:`Write ~unixFd ]
    | M.POLL_INOUT ->
      [ createSocketWatchEventHandler ~readWrite:`Read ~unixFd;
        createSocketWatchEventHandler ~readWrite:`Write ~unixFd ] in
  (match events with
   | [] -> ()
   | _ -> Hashtbl.replace all_events unixFd events) in
 M.set_socket_function mt processSocketEvent;
 { mt; all_events; wakeners })
(** The multi handle shared by every [perform] call; it is created when this
    module is initialised. *)
let global : multi = create ()
(** [perform h] registers a wakener for the easy handle [h], adds [h] to the
    global multi handle, and returns a deferred that becomes determined with
    the result code once the transfer is reported as finished. *)
let perform h =
  let { mt; wakeners; _ } = global in
  let completion = Async.Ivar.create () in
  Hashtbl.add wakeners h completion;
  M.add mt h;
  Async.Ivar.read completion
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment