Skip to content

Instantly share code, notes, and snippets.

@finalclass
Created December 11, 2024 11:45
Show Gist options
  • Save finalclass/dc1ebcd30708adc5635eb20f57c3ea81 to your computer and use it in GitHub Desktop.
Save finalclass/dc1ebcd30708adc5635eb20f57c3ea81 to your computer and use it in GitHub Desktop.
H2 server sending and responding with MessagePack
open Base
open Protocol_conv_msgpack
open Protocol_conv_json
module Msg = struct
type t = {message: string}
[@@deriving protocol ~driver:(module Msgpack), protocol ~driver:(module Json)]
end
module type Request = sig
type t
val of_msgpack : Msgpack.t -> (t, Msgpack.error) Result.t
end
module type Response = sig
type t
val to_msgpack : t -> Msgpack.t
end
type ('req, 'res) method_def =
{ name: string
; req_mod: (module Request with type t = 'req)
; res_mod: (module Response with type t = 'res)
; handle_request: 'req -> 'res }
let serve (type req res) ~env ~sw ~port ~(methods : (req, res) method_def list)
=
let net = Eio.Stdenv.net env in
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in
let socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in
let error_handler client_address ?request:_ _error start_response =
Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address ;
let response_body = start_response H2.Headers.empty in
H2.Body.Writer.write_string
response_body
"There was an error handling your request.\n" ;
H2.Body.Writer.close response_body
in
let bad_request reqd =
let body = "Invalid method" in
let headers =
H2.Headers.of_list
[ ("content-length", body |> String.length |> Int.to_string)
; ("content-type", "text/plain")
; ("server", "h2-eio") ]
in
Eio.Fiber.fork ~sw @@ fun () ->
H2.Reqd.respond_with_string
reqd
(H2.Response.create ~headers `Bad_request)
body
in
let parse_url reqd f =
let r = H2.Reqd.request reqd in
match r.target |> String.split ~on:'/' with
| [""; "rpc"; method_name] -> (
match
methods
|> List.find ~f:(fun method_def ->
String.equal method_def.name method_name )
with
| Some method_def -> f method_def
| None -> bad_request reqd )
| _ -> bad_request reqd
in
let read_response reqd f =
let chunks = ref [] in
let total_len = ref 0 in
let rec read_response () =
H2.Body.Reader.schedule_read
(H2.Reqd.request_body reqd)
~on_eof:(fun () ->
Stdio.print_endline "[DEBUG] === End of request body ===" ;
Stdio.print_endline
("[DEBUG] Total chunks: " ^ Int.to_string (List.length !chunks)) ;
Stdio.print_endline
("[DEBUG] Total length: " ^ Int.to_string !total_len) ;
let result = Bigstringaf.create !total_len in
let offset = ref 0 in
!chunks
|> List.rev
|> List.iter ~f:(fun chunk ->
let len = Bytes.length chunk in
Bigstringaf.blit_from_bytes
chunk
~src_off:0
result
~dst_off:!offset
~len ;
offset := !offset + len ) ;
chunks := [] ;
total_len := 0 ;
f result )
~on_read:(fun bigstring ~off ~len ->
Stdio.printf "[DEBUG] Reading chunk: offset=%d length=%d\n" off len ;
Stdio.print_endline "" ;
let chunk = Bytes.create len in
Bigstringaf.blit_to_bytes
bigstring
~src_off:off (* Use the provided offset *)
chunk
~dst_off:0
~len ;
let hex_str =
Bytes.sub chunk ~pos:0 ~len
|> Bytes.to_string
|> String.to_list
|> List.map ~f:(fun c -> Printf.sprintf "%02x" (Char.to_int c))
|> String.concat ~sep:" "
in
Stdio.print_endline ("[DEBUG] CHUNK bytes: " ^ hex_str) ;
total_len := !total_len + Bytes.length chunk ;
chunks := chunk :: !chunks ;
read_response () )
in
read_response ()
in
let body_bytes_to_msgpack (body : Bigstringaf.t) : Msgpack.t =
let len = Bigstringaf.length body in
let bytes = Bytes.create len in
Bigstringaf.blit_to_bytes body ~src_off:0 bytes ~dst_off:0 ~len ;
bytes |> Msgpck.Bytes.read |> snd
in
let execute_request
(type req res)
reqd
(method_def : (req, res) method_def)
body
f =
let (module Req) = method_def.req_mod in
let (module Res) = method_def.res_mod in
match body |> body_bytes_to_msgpack |> Req.of_msgpack with
| Ok input ->
Stdio.print_endline "HERE" ;
f (input |> method_def.handle_request |> Res.to_msgpack)
(* f (input |> method_def.handle_request |> Res.to_msgpack) *)
| Error err ->
Stdio.print_endline ("Error: " ^ (err |> Msgpack.error_to_string_hum)) ;
bad_request reqd
in
let request_handler _client_address reqd =
parse_url reqd @@ fun method_def ->
read_response reqd @@ fun body ->
execute_request reqd method_def body @@ fun msgpack_data ->
(* let msgpack_data = Msg.to_msgpack {message= "Hello World pack!"} in *)
let size = Msgpck.size msgpack_data in
let bytes = Bytes.create size in
let len = Msgpck.Bytes.write bytes ~pos:0 msgpack_data in
let bigstring = Bigstringaf.create len in
Bigstringaf.blit_from_bytes bytes ~src_off:0 bigstring ~dst_off:0 ~len ;
let headers =
H2.Headers.of_list
[ ("content-length", len |> Int.to_string)
; ("content-type", "application/x-msgpack")
; ("server", "h2-eio") ]
in
Eio.Fiber.fork ~sw (fun () ->
H2.Reqd.respond_with_bigstring
reqd
(H2.Response.create ~headers `OK)
bigstring )
in
let handler socket addr =
Eio.traceln "[INFO] New connection from:%a" Eio.Net.Sockaddr.pp addr ;
try
Eio.traceln "[DEBUG] Starting HTTP/2 connection handler" ;
H2_eio.Server.create_connection_handler
?config:None (* Use default config *)
~request_handler
~error_handler
~sw
addr
socket ;
Eio.traceln "[INFO] Connection handler completed normally"
with
| exn ->
Eio.traceln
"[ERROR] Connection handler failed: %s\n%s"
(Stdlib.Printexc.to_string exn)
(Stdlib.Printexc.get_backtrace ())
in
let rec loop () =
Eio.traceln "[INFO] Waiting for connection..." ;
Eio.Net.accept_fork
~sw
socket
~on_error:(fun exn ->
Eio.traceln "[ERROR] Accept error: %s" (Stdlib.Printexc.to_string exn) )
handler ;
loop ()
in
loop () |> ignore
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment