Skip to content

Instantly share code, notes, and snippets.

@serpent7776
Created September 6, 2025 14:02
Show Gist options
  • Select an option

  • Save serpent7776/f198cd3d53bb66c3dbe6a82803848cbd to your computer and use it in GitHub Desktop.

Select an option

Save serpent7776/f198cd3d53bb66c3dbe6a82803848cbd to your computer and use it in GitHub Desktop.
(** Connection settings for an InfluxDB server. *)
type influx_config = {
(* Base URL of the server; the request path is appended to it as-is. *)
host : string;
(* Database name; sent as the [bucket] parameter on writes and as [db] on queries. *)
database : string;
(* API token; sent in the [Authorization] header with the [Bearer] scheme. *)
token : string;
(* Sent as the [org] query parameter when present. *)
org : string option; (* Optional for InfluxDB 3.0 *)
}
(** A field value, tagged with the type it is serialised as by
    [field_value_to_string]. *)
type field_value =
| Float of float
(* Serialised with an [i] suffix. *)
| Int of int64
(* Serialised with a [u] suffix; the payload is asserted to be non-negative. *)
| Uint of int64
(* Serialised inside double quotes. *)
| String of string
| Bool of bool
(** One sample; [point_to_line_protocol] renders it as a single line. *)
type data_point = {
(* Name written at the start of the line. *)
measurement : string;
(* Tag key/value pairs; an empty list produces no tag section. *)
tags : (string * string) list;
(* Field key/value pairs. *)
fields : (string * field_value) list;
timestamp : int; (* Unix timestamp in seconds *)
}
(** Client state: connection settings plus the batch of points that have been
    stored but not yet written. *)
type t = {
config : influx_config;
(* Pending line-protocol text; [store] appends to it and [flush] clears it after a write. *)
mutable buf : Buffer.t;
(* Number of points appended by [store] since the buffer was last cleared. *)
mutable count : int;
(* [flush] passes it the action that performs the write and resets the buffer;
   the handler decides how that action is run. *)
mutable flush_handler : (unit -> unit) -> unit;
}
(** [default_handler action] runs [action] immediately and returns its
    result. It is the flush handler used when [make] is given none. *)
let default_handler action = action ()
(** [make ?flush_handler config] creates a client for [config] with an empty
    buffer (initial capacity 4096 bytes) and a point count of zero.
    [flush_handler] defaults to [default_handler]. *)
let make ?flush_handler config =
  let flush_handler =
    match flush_handler with
    | Some handler -> handler
    | None -> default_handler
  in
  { config; buf = Buffer.create 4096; count = 0; flush_handler }
(** [field_value_to_string v] renders [v] as a line-protocol field value.

    - [Float f]: decimal text with just enough significant digits (15, 16 or
      17) to read back as exactly [f].
    - [Int i] and [Uint i]: decimal digits followed by [i] and [u]
      respectively.
    - [String s]: [s] between double quotes, with a backslash inserted in
      front of every double quote and backslash. All other bytes, UTF-8
      sequences included, are copied unchanged.
    - [Bool b]: [true] or [false].

    NOTE(review): NaN and the infinities are printed as [nan], [inf] and
    [-inf]; confirm whether the server accepts them.

    @raise Assert_failure if a [Uint] payload is negative. *)
let field_value_to_string value =
  (* [%g] alone keeps only 6 significant digits, which silently truncates
     values such as 1234567.0; widen the precision until the text parses back
     to the same float. *)
  let float_repr f =
    let exact digits =
      let text = Printf.sprintf "%.*g" digits f in
      if Float.equal (float_of_string text) f then Some text else None
    in
    match exact 15 with
    | Some text -> text
    | None -> (
        match exact 16 with
        | Some text -> text
        | None -> Printf.sprintf "%.17g" f)
  in
  (* Only the double quote and the backslash need escaping inside a quoted
     string value. [String.escaped] would additionally rewrite every
     non-ASCII byte as a decimal escape and corrupt UTF-8 text. *)
  let quote s =
    let out = Buffer.create (String.length s + 2) in
    Buffer.add_char out '"';
    String.iter
      (fun c ->
        (match c with
         | '"' | '\\' -> Buffer.add_char out '\\'
         | _ -> ());
        Buffer.add_char out c)
      s;
    Buffer.add_char out '"';
    Buffer.contents out
  in
  match value with
  | Float f -> float_repr f
  | Int i -> Printf.sprintf "%Ldi" i
  | Uint i ->
      assert (i >= 0L);
      Printf.sprintf "%Ldu" i
  | String s -> quote s
  | Bool b -> string_of_bool b
(** [escape_tag_value s] returns [s] with a backslash inserted in front of
    every comma, space and equals sign. All other characters are copied
    unchanged. *)
let escape_tag_value s =
  let buf = Buffer.create (String.length s) in
  String.iter
    (fun c ->
      (match c with
       | ',' | ' ' | '=' -> Buffer.add_char buf '\\'
       | _ -> ());
      (* The character itself is always kept: the backslash is a prefix, not
         a replacement. *)
      Buffer.add_char buf c)
    s;
  Buffer.contents buf
(** [point_to_line_protocol point] renders [point] as one newline-terminated
    line: the measurement, then one [,key=value] group per tag, a space, the
    comma-separated [key=value] fields, a space and the timestamp in decimal.

    The measurement name (commas and spaces) and the field keys (commas,
    equals signs and spaces) are backslash-escaped here so that those
    characters cannot be mistaken for separators. Tag keys and values go
    through [escape_tag_value]; field values through
    [field_value_to_string].

    NOTE(review): an empty [fields] list produces a line with no field
    section; nothing here rejects it. *)
let point_to_line_protocol point =
  (* Prefix every character of [s] that occurs in [special] with a
     backslash. *)
  let escape ~special s =
    let out = Buffer.create (String.length s) in
    String.iter
      (fun c ->
        if String.contains special c then Buffer.add_char out '\\';
        Buffer.add_char out c)
      s;
    Buffer.contents out
  in
  let measurement = escape ~special:", " point.measurement in
  let tag_set =
    point.tags
    |> List.map (fun (k, v) ->
           "," ^ escape_tag_value k ^ "=" ^ escape_tag_value v)
    |> String.concat ""
  in
  let field_set =
    point.fields
    |> List.map (fun (k, v) ->
           escape ~special:",= " k ^ "=" ^ field_value_to_string v)
    |> String.concat ","
  in
  String.concat ""
    [
      measurement;
      tag_set;
      " ";
      field_set;
      " ";
      Int.to_string point.timestamp;
      "\n";
    ]
(** TLS certificate authenticator obtained from [Ca_certs.authenticator].
    Being a top-level value, it is computed once when this module is
    initialised.

    @raise Failure if no authenticator can be obtained; the message carries
    the reason reported by [Ca_certs]. *)
let authenticator =
  match Ca_certs.authenticator () with
  | Ok auth -> auth
  | Error (`Msg reason) ->
      (* Keep the underlying reason instead of discarding it. *)
      failwith ("tls certs authenticator not found: " ^ reason)
(** [connect_via_tls url socket] wraps [socket] in a TLS client flow that uses
    the module-level [authenticator]. When [url] has a host component it is
    converted to a [Domain_name] host and passed on as [?host].

    The conversion uses the [_exn] functions of [Domain_name], so a host that
    they reject raises. *)
let connect_via_tls url socket =
  let tls_config = Tls.Config.client ~authenticator () in
  let host =
    match Uri.host url with
    | None -> None
    | Some name -> Some (Domain_name.host_exn (Domain_name.of_string_exn name))
  in
  Tls_eio.client_of_flow ?host tls_config socket
(** [https_post ~sw ~net ~headers ~body uri] sends the string [body] to [uri]
    with an HTTP POST and returns [(status, response_headers, response_body)].
    The response body is read to the end into a string.

    [headers] is a list of [(name, value)] pairs. TLS connections are set up
    with [connect_via_tls], the same wrapper [https_get] uses. *)
let https_post ~sw ~net ~headers ~body uri =
  (* The client used to be created with [~https:None], which leaves it without
     any TLS wrapper for https URIs. *)
  let client = Cohttp_eio.Client.make ~https:(Some connect_via_tls) net in
  let request_headers = Cohttp.Header.of_list headers in
  let request_body = Cohttp_eio.Body.of_string body in
  let resp, response_body =
    Cohttp_eio.Client.post ~sw ~headers:request_headers ~body:request_body
      client uri
  in
  let status = Http.Response.status resp in
  let response_headers = Http.Response.headers resp in
  (status, response_headers, Eio.Flow.read_all response_body)
(** [write ~sw ~net ~db] posts the current contents of [db.buf] to the
    [/api/v2/write] endpoint under [db.config.host]. The query string carries
    [bucket] (the configured database), [precision] (seconds) and, when
    configured, [org].

    A [200 OK] or [204 No Content] response is logged through [Logs.warn].
    The buffer itself is not modified here.

    @raise Failure on any other status; the message contains the status and
    the response body. *)
let write ~sw ~net ~db =
  let config = db.config in
  let with_param key value uri = Uri.add_query_param uri (key, [ value ]) in
  let endpoint =
    let base =
      Uri.of_string (config.host ^ "/api/v2/write")
      |> with_param "bucket" config.database
      |> with_param "precision" "s"
    in
    match config.org with
    | None -> base
    | Some org -> with_param "org" org base
  in
  let payload = Buffer.contents db.buf in
  let headers =
    [
      ("Authorization", "Bearer " ^ config.token);
      ("Content-Type", "text/plain; charset=utf-8");
      ("Accept", "application/json");
    ]
  in
  let status, _, response =
    https_post ~sw ~net ~headers ~body:payload endpoint
  in
  match status with
  | `OK | `No_content -> Logs.warn (fun m -> m "Data written successfully")
  | _ ->
      Printf.ksprintf failwith "Failed to write data: %s - %s"
        (Cohttp.Code.string_of_status status)
        response
(** [flush ~sw ~net ~db] does nothing when the buffer is empty. Otherwise it
    hands [db.flush_handler] an action that writes the buffer with [write],
    then clears the buffer and resets the point count. The reset steps come
    after the call to [write], so they are only reached once it returns. *)
let flush ~sw ~net ~db =
  let send_and_reset () =
    write ~sw ~net ~db;
    Buffer.clear db.buf;
    db.count <- 0
  in
  match Buffer.length db.buf with
  | 0 -> ()
  | _ -> db.flush_handler send_and_reset
(** [store ~sw ~net ~db table ~tags ~fields timestamp] appends one point to
    the buffer of [db], using [table] as the measurement name, and increments
    the point count.

    The buffer is flushed with [flush] as soon as it holds 10,000 points or
    10 MB of text, whichever comes first. *)
let store ~sw ~net ~db table ~tags ~fields timestamp =
  (* The optimal batch size is 10,000 lines of line protocol or 10 MBs,
     whichever threshold is met first. *)
  let max_lines = 10_000 and max_bytes = 10_000_000 in
  { measurement = table; tags; fields; timestamp }
  |> point_to_line_protocol
  |> Buffer.add_string db.buf;
  db.count <- db.count + 1;
  (* Both thresholds are checked; previously only the line count was. *)
  if db.count >= max_lines || Buffer.length db.buf >= max_bytes then
    flush ~sw ~net ~db
(** [uri_append_path uri path] returns [uri] with [path] concatenated onto
    the end of its existing path. No separator is inserted. *)
let uri_append_path uri path = Uri.path uri ^ path |> Uri.with_path uri
(** [https_get ~sw ~net ~headers uri path params] appends [path] to the path
    of [uri], adds the [params] key/value pairs to its query string, sends an
    HTTP GET and returns [(status, response_headers, response_body)]. The
    response body is read to the end into a string.

    [headers] is a list of [(name, value)] pairs. TLS connections are set up
    with [connect_via_tls]. *)
let https_get ~sw ~net ~headers uri path params =
  let target = Uri.add_query_params' (uri_append_path uri path) params in
  let client = Cohttp_eio.Client.make ~https:(Some connect_via_tls) net in
  let resp, stream =
    Cohttp_eio.Client.get ~sw
      ~headers:(Cohttp.Header.of_list headers)
      client target
  in
  let status = Http.Response.status resp in
  let response_headers = Http.Response.headers resp in
  (status, response_headers, Eio.Flow.read_all stream)
(** [query ~sw ~net ~db q] sends the query text [q] to the
    [/api/v3/query_sql] endpoint under [db.config.host] with an HTTP GET. The
    query string carries [db] (the configured database), [q] and, when
    configured, [org]. On [200 OK] the response body is parsed with
    [Yojson.Basic.from_string] and returned.

    @raise Failure on any other status; the message contains the status and
    the response body. *)
let query ~sw ~net ~db q =
  let config = db.config in
  let with_param key value uri = Uri.add_query_param uri (key, [ value ]) in
  let endpoint =
    let base =
      Uri.of_string (config.host ^ "/api/v3/query_sql")
      |> with_param "db" config.database
      |> with_param "q" q
    in
    match config.org with
    | None -> base
    | Some org -> with_param "org" org base
  in
  let headers =
    [
      ("Authorization", "Bearer " ^ config.token);
      ("Content-Type", "text/plain; charset=utf-8");
      ("Accept", "application/json");
    ]
  in
  match https_get ~sw ~net ~headers endpoint "" [] with
  | `OK, _, body -> Yojson.Basic.from_string body
  | status, _, body ->
      Printf.ksprintf failwith "Failed to query: %s - %s"
        (Cohttp.Code.string_of_status status)
        body
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment