Created
September 6, 2025 14:02
-
-
Save serpent7776/f198cd3d53bb66c3dbe6a82803848cbd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(** Connection settings for an InfluxDB server. *)
type influx_config = {
  host : string;
      (** Base URL of the server; API paths such as [/api/v2/write] are
          appended to it verbatim. *)
  database : string;
      (** Database name, sent as [bucket] on writes and [db] on queries. *)
  token : string;
      (** Token sent in the [Authorization: Bearer] request header. *)
  org : string option;
      (** Optional for InfluxDB 3.0; added as the [org] query parameter
          when present. *)
}
(** Value carried by a single field of a data point. *)
type field_value =
  | Float of float  (** Floating-point number. *)
  | Int of int64  (** Signed integer; serialized with an [i] suffix. *)
  | Uint of int64
      (** Unsigned integer; serialized with a [u] suffix. The payload must
          be non-negative. *)
  | String of string  (** Text; serialized between double quotes. *)
  | Bool of bool  (** Serialized as [true] or [false]. *)
(** One point to be serialized as a single line of line protocol. *)
type data_point = {
  measurement : string;  (** Name of the measurement (table). *)
  tags : (string * string) list;
      (** Tag key/value pairs; may be empty. *)
  fields : (string * field_value) list;  (** Field key/value pairs. *)
  timestamp : int;  (** Unix timestamp in seconds *)
}
(** Client state: connection settings plus the batch of points that have
    been stored but not yet written to the server. *)
type t = {
  config : influx_config;  (** Where and how to send the data. *)
  mutable buf : Buffer.t;
      (** Pending line-protocol text, one line per stored point. *)
  mutable count : int;  (** Number of lines currently held in [buf]. *)
  mutable flush_handler : (unit -> unit) -> unit;
      (** Wrapper invoked on flush. It receives the thunk that performs the
          write and resets the batch, and decides how to run it. *)
}
(** [default_handler thunk] runs [thunk] immediately and returns its
    result. Used as the flush handler when none is supplied. *)
let default_handler thunk = thunk ()
(** [make ?flush_handler config] builds a client for [config] with an empty
    buffer (initial capacity 4096 bytes) and a point count of zero.

    @param flush_handler wrapper used to run each flush; defaults to
      [default_handler]. *)
let make ?flush_handler config =
  let flush_handler =
    match flush_handler with
    | Some handler -> handler
    | None -> default_handler
  in
  { config; buf = Buffer.create 4096; count = 0; flush_handler }
(* Convert field value to line protocol format.

   - [Float] is printed with 15 significant digits when that reads back to
     the same float, otherwise with 17, so no precision is lost. Non-finite
     values are printed as produced by [Printf] ("nan", "inf").
     NOTE(review): InfluxDB is expected to reject those; confirm whether
     callers should filter them out beforehand.
   - [Int] and [Uint] get the [i] and [u] suffixes respectively.
   - [String] is wrapped in double quotes with only the double quote and the
     backslash escaped. All other bytes, including UTF-8 sequences, are
     passed through unchanged.
   - [Bool] becomes [true] or [false].

   @raise Assert_failure if a [Uint] payload is negative. *)
let field_value_to_string = function
  | Float f -> (
      let short = Printf.sprintf "%.15g" f in
      match float_of_string_opt short with
      | Some parsed when Float.equal parsed f -> short
      | Some _ | None -> Printf.sprintf "%.17g" f)
  | Uint i ->
      assert (i >= 0L);
      Printf.sprintf "%Ldu" i
  | Int i -> Printf.sprintf "%Ldi" i
  | String s ->
      let buf = Buffer.create (String.length s + 2) in
      Buffer.add_char buf '"';
      String.iter
        (fun c ->
          (match c with
           | '"' | '\\' -> Buffer.add_char buf '\\'
           | _ -> ());
          Buffer.add_char buf c)
        s;
      Buffer.add_char buf '"';
      Buffer.contents buf
  | Bool b -> if b then "true" else "false"
(** [escape_tag_value s] returns [s] with a backslash inserted before every
    comma, space and equals sign. All other characters are copied
    unchanged. *)
let escape_tag_value s =
  let buf = Buffer.create (String.length s) in
  String.iter
    (fun c ->
      (match c with
       | ',' | ' ' | '=' -> Buffer.add_char buf '\\'
       | _ -> ());
      (* The character itself is always emitted; the backslash only
         prefixes it. *)
      Buffer.add_char buf c)
    s;
  Buffer.contents buf
(* [escape_chars ~special s] is [s] with a backslash inserted before every
   character that occurs in [special]. *)
let escape_chars ~special s =
  let buf = Buffer.create (String.length s) in
  String.iter
    (fun c ->
      if String.contains special c then Buffer.add_char buf '\\';
      Buffer.add_char buf c)
    s;
  Buffer.contents buf

(** [point_to_line_protocol point] serializes [point] as one line of line
    protocol, terminated by a newline:
    [measurement[,tag=value...] field=value[,field=value...] timestamp].

    Commas and spaces in the measurement name are escaped, as are commas,
    equals signs and spaces in field keys. Tag keys and values go through
    [escape_tag_value].

    NOTE(review): an empty [fields] list yields a line without any field,
    which the server is expected to reject; confirm whether callers can
    ever pass one.

    @raise Assert_failure if a [Uint] field holds a negative value. *)
let point_to_line_protocol point =
  let { measurement; tags; fields; timestamp } = point in
  let measurement = escape_chars ~special:", " measurement in
  let tags_str =
    match tags with
    | [] -> ""
    | _ :: _ ->
        tags
        |> List.map (fun (k, v) ->
               escape_tag_value k ^ "=" ^ escape_tag_value v)
        |> String.concat ","
        |> fun joined -> "," ^ joined
  in
  let fields_str =
    fields
    |> List.map (fun (k, v) ->
           escape_chars ~special:",= " k ^ "=" ^ field_value_to_string v)
    |> String.concat ","
  in
  String.concat ""
    [ measurement; tags_str; " "; fields_str; " "; Int.to_string timestamp; "\n" ]
(** X.509 authenticator obtained from [Ca_certs.authenticator], evaluated
    once when the module is initialised.

    @raise Failure at module initialisation if no authenticator can be
      built; the message includes the reason reported by [Ca_certs]. *)
let authenticator =
  match Ca_certs.authenticator () with
  | Ok auth -> auth
  | Error (`Msg reason) ->
      failwith ("tls certs authenticator not found: " ^ reason)
(** [connect_via_tls url socket] wraps [socket] in a TLS client flow whose
    peer is checked with [authenticator]. When [url] has a host component
    it is passed to the TLS layer as the expected host name.

    The host is converted with [Domain_name.of_string_exn] and
    [Domain_name.host_exn], so a host that is not a valid host name makes
    this function raise. *)
let connect_via_tls url socket =
  let tls_config = Tls.Config.client ~authenticator () in
  let host =
    match Uri.host url with
    | None -> None
    | Some name -> Some Domain_name.(host_exn (of_string_exn name))
  in
  Tls_eio.client_of_flow ?host tls_config socket
(** [https_post ~sw ~net ~headers ~body uri] sends [body] to [uri] with an
    HTTP POST and reads the whole response.

    The client is created with [connect_via_tls] as its HTTPS connector,
    matching [https_get], so [https://] URIs are supported.

    @param headers request headers as [(name, value)] pairs.
    @return [(status, response_headers, response_body)]. *)
let https_post ~sw ~net ~headers ~body uri =
  let client = Cohttp_eio.Client.make ~https:(Some connect_via_tls) net in
  let headers = Cohttp.Header.of_list headers in
  let body = Cohttp_eio.Body.of_string body in
  let resp, resp_body = Cohttp_eio.Client.post ~sw ~headers ~body client uri in
  let status = Http.Response.status resp in
  let resp_headers = Http.Response.headers resp in
  let payload = Eio.Flow.read_all resp_body in
  (status, resp_headers, payload)
(** [write ~sw ~net ~db] posts the current contents of [db.buf] to
    [<host>/api/v2/write] with the query parameters [bucket] (the configured
    database), [precision=s] and, when configured, [org].

    The buffer is not modified here; see [flush] for the reset. A successful
    write is logged at info level.

    @raise Failure if the server answers with a status other than
      [204 No Content] or [200 OK]; the message carries the status and the
      response body. *)
let write ~sw ~net ~db =
  let config = db.config in
  let add_param uri (key, value) = Uri.add_query_param uri (key, [ value ]) in
  let uri =
    List.fold_left add_param
      (Uri.of_string (Printf.sprintf "%s/api/v2/write" config.host))
      [ ("bucket", config.database); ("precision", "s") ]
  in
  let uri =
    match config.org with
    | Some org -> add_param uri ("org", org)
    | None -> uri
  in
  let headers =
    [
      ("Authorization", "Bearer " ^ config.token);
      ("Content-Type", "text/plain; charset=utf-8");
      ("Accept", "application/json");
    ]
  in
  let body = Buffer.contents db.buf in
  let status, _headers, response = https_post ~sw ~net ~headers ~body uri in
  match status with
  | `No_content | `OK ->
      (* Success is routine, not a warning. *)
      Logs.info (fun f -> f "Data written successfully")
  | _ ->
      failwith
        (Printf.sprintf "Failed to write data: %s - %s"
           (Cohttp.Code.string_of_status status)
           response)
(** [flush ~sw ~net ~db] sends the buffered lines, if any, through
    [db.flush_handler]. The thunk handed to the handler writes the buffer
    and then empties it and resets the counter. If the write raises, the
    buffer and counter are left as they were. Does nothing when the buffer
    is empty. *)
let flush ~sw ~net ~db =
  let send_and_reset () =
    write ~sw ~net ~db;
    Buffer.clear db.buf;
    db.count <- 0
  in
  if Buffer.length db.buf = 0 then () else db.flush_handler send_and_reset
(** [store ~sw ~net ~db table ~tags ~fields timestamp] serializes one point
    for measurement [table] and appends it to the pending batch. The batch
    is flushed as soon as it holds 10,000 lines or 10 MB of line protocol,
    whichever comes first.

    @param timestamp Unix timestamp in seconds. *)
let store ~sw ~net ~db table ~tags ~fields timestamp =
  (* The optimal batch size is 10,000 lines of line protocol or 10 MBs,
     whichever threshold is met first *)
  let max_lines = 10000 in
  let max_bytes = 10_000_000 in
  let line =
    point_to_line_protocol { measurement = table; tags; fields; timestamp }
  in
  Buffer.add_string db.buf line;
  db.count <- db.count + 1;
  if db.count >= max_lines || Buffer.length db.buf >= max_bytes then
    flush ~sw ~net ~db
(** [uri_append_path uri path] returns [uri] with [path] concatenated
    verbatim onto its existing path; no separator is inserted. *)
let uri_append_path uri path = Uri.path uri ^ path |> Uri.with_path uri
(** [https_get ~sw ~net ~headers uri path params] issues an HTTP GET and
    reads the whole response. The target is [uri] with [path] appended to
    its path and [params] added to its query. TLS connections are
    established with [connect_via_tls].

    @param headers request headers as [(name, value)] pairs.
    @return [(status, response_headers, response_body)]. *)
let https_get ~sw ~net ~headers uri path params =
  let request_headers = Cohttp.Header.of_list headers in
  let client = Cohttp_eio.Client.make ~https:(Some connect_via_tls) net in
  let target = Uri.add_query_params' (uri_append_path uri path) params in
  let resp, resp_body =
    Cohttp_eio.Client.get ~sw ~headers:request_headers client target
  in
  let status = Http.Response.status resp in
  let resp_headers = Http.Response.headers resp in
  let payload = Eio.Flow.read_all resp_body in
  (status, resp_headers, payload)
(** [query ~sw ~net ~db q] sends the query text [q] to
    [<host>/api/v3/query_sql] with the query parameters [db] (the configured
    database), [q] and, when configured, [org], then parses the response
    body as JSON.

    @return the parsed response as a [Yojson.Basic] value.
    @raise Failure if the server answers with a status other than [200 OK];
      the message carries the status and the response body. A body that is
      not valid JSON makes [Yojson.Basic.from_string] raise. *)
let query ~sw ~net ~db q =
  let config = db.config in
  let add_param uri (key, value) = Uri.add_query_param uri (key, [ value ]) in
  let base =
    List.fold_left add_param
      (Uri.of_string (Printf.sprintf "%s/api/v3/query_sql" config.host))
      [ ("db", config.database); ("q", q) ]
  in
  let uri =
    match config.org with
    | None -> base
    | Some org -> add_param base ("org", org)
  in
  let headers =
    [
      ("Authorization", "Bearer " ^ config.token);
      ("Content-Type", "text/plain; charset=utf-8");
      ("Accept", "application/json");
    ]
  in
  let status, _headers, payload = https_get ~sw ~net ~headers uri "" [] in
  match status with
  | `OK -> Yojson.Basic.from_string payload
  | _ ->
      let reason = Cohttp.Code.string_of_status status in
      failwith (Printf.sprintf "Failed to query: %s - %s" reason payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment