Skip to content

Instantly share code, notes, and snippets.

@dinosaure
Created January 31, 2019 19:01
Show Gist options
  • Save dinosaure/999ab99834d444dd464820385da42656 to your computer and use it in GitHub Desktop.
Save dinosaure/999ab99834d444dd464820385da42656 to your computer and use it in GitHub Desktop.
diff --git a/src/lib_stdlib/dune b/src/lib_stdlib/dune
index 9544949..d60e9ac 100644
--- a/src/lib_stdlib/dune
+++ b/src/lib_stdlib/dune
@@ -8,7 +8,7 @@
re
zarith
lwt
- lwt.log)
+ lwt_log)
(flags (:standard -safe-string)))
(alias
diff --git a/vendors/irmin-lmdb/dune b/vendors/irmin-lmdb/dune
index 8a51327..1b99707 100644
--- a/vendors/irmin-lmdb/dune
+++ b/vendors/irmin-lmdb/dune
@@ -1,5 +1,5 @@
(library
(name irmin_lmdb)
(public_name irmin-lmdb)
- (libraries irmin lmdb lwt.unix ocplib-endian)
+ (libraries irmin ke lmdb lwt.unix ocplib-endian)
(flags (:standard -safe-string)))
diff --git a/vendors/irmin-lmdb/irmin_lmdb.ml b/vendors/irmin-lmdb/irmin_lmdb.ml
index 21bf6c4..a5511b6 100644
--- a/vendors/irmin-lmdb/irmin_lmdb.ml
+++ b/vendors/irmin-lmdb/irmin_lmdb.ml
@@ -935,15 +935,19 @@ module Make
| true -> Lwt.return ()
| false -> aux [ks, k, `Gray, 0]
+ [@@@warning "-32"]
+
let copy_commit t k =
Lwt_switch.check t.switch;
P.XCommit.unsafe_find t.old_db k >|= Option.get >>= fun v ->
let k = P.XCommit.of_key k in
- (* we ignore the parents *)
+ (* we ignore the parents, XXX(dinosaure): because it's a root commit? *)
copy_node t (P.Commit.Val.node v) >>= fun () ->
incr_commits t.stats;
promote "commit" t k
+ [@@@warning "+32"]
+
let root repo =
let c = repo.P.Repo.config in
match Irmin.Private.Conf.get c Conf.root with
@@ -982,12 +986,130 @@ module Make
end
+ (* What still has to be done for a node waiting in the read queue. *)
+ type status =
+   | Do_scan (* `Gray *)
+   | Do_promotion (* `Black *)
+
+ (* Instruction consumed from the write queue: promote the object stored
+    under the given derived key, or stop. *)
+ type action =
+   | Promote_and_continue_with of string
+   | Stop
+
+ (* Entry of the read queue.  [derivation] is the string obtained from [key]
+    through [P.XNode.of_key]; [depth] is 0 for a root and the parent's depth
+    plus one for a child. *)
+ type value =
+   { key : H.t
+   ; derivation : string
+   ; status : status
+   ; depth : int }
+
+ type rd_queue = value Queue.t
+ type wr_queue = action Queue.t
+
+ (* A value paired with the Lwt mutex guarding accesses to it. *)
+ type 'a protected =
+   { (* mutable *) value : 'a
+   ; mutex : Lwt_mutex.t }
+
+ (* State shared by the scanning workers and the write thread. *)
+ type context =
+   { rd : rd_queue protected
+   ; wr : wr_queue protected
+   ; gc : Irmin_GC.t }
+
+ (* [scan t value] visits the node [value.key] read from the old database.
+
+    Nothing is done when [Irmin_GC.mem] already reports the derived key.
+    Otherwise the key is added to the GC table, the statistics are updated,
+    every [`Contents] child is queued on [t.wr] for promotion, every [`Node]
+    child is queued on [t.rd] with [Do_scan] at [value.depth + 1], and
+    finally [value] itself is queued again on [t.rd] with [Do_promotion].
+
+    Each queue access goes through [Lwt_mutex.with_lock], so the mutex is
+    released on every path.  The previous code locked [t.rd.mutex] for the
+    final push and never unlocked it, which blocked every later lock of the
+    read queue.
+
+    NOTE(review): [Option.get] presumably raises when the node is missing
+    from the old database -- confirm against the project's [Option]. *)
+ let scan t value =
+   let k' = P.XNode.of_key value.key in
+   Irmin_GC.mem t.gc k' >>= function
+   | true -> Lwt.return ()
+   | false ->
+     Irmin_GC.Tbl.add t.gc.Irmin_GC.tbl k' ;
+     P.XNode.unsafe_find t.gc.Irmin_GC.old_db value.key >|= Option.get >>= fun v ->
+     let children = P.Node.Val.list v in
+     Irmin_GC.incr_nodes t.gc.Irmin_GC.stats ;
+     Irmin_GC.update_width t.gc.Irmin_GC.stats children ;
+     Irmin_GC.update_depth t.gc.Irmin_GC.stats value.depth ;
+     Lwt_list.iter_p (fun (_, c) -> match c with
+       | `Contents (k, _) ->
+         let k' = P.XContents.of_key k in
+         Lwt_mutex.with_lock t.wr.mutex (fun () ->
+           Queue.push (Promote_and_continue_with k') t.wr.value ;
+           Lwt.return ())
+       | `Node k ->
+         let k' = P.XNode.of_key k in
+         Lwt_mutex.with_lock t.rd.mutex (fun () ->
+           Queue.push
+             { key= k; derivation= k'; status= Do_scan; depth= value.depth + 1 }
+             t.rd.value ;
+           Lwt.return ()))
+       children >>= fun () ->
+     (* Children are queued first; the node itself follows, flagged for
+        promotion. *)
+     Lwt_mutex.with_lock t.rd.mutex (fun () ->
+       Queue.push { value with status= Do_promotion } t.rd.value ;
+       Lwt.return ())
+
+ (* [dispatcher context] is one scanning worker.
+
+    With both queue mutexes held (read queue first, then write queue) it
+    moves every leading [Do_promotion] entry of the read queue to the write
+    queue as a [Promote_and_continue_with] action and pops the first
+    [Do_scan] entry.  Both mutexes are then released, the entry is handed to
+    [scan], and the worker starts over.  The worker returns once the read
+    queue is found empty.
+
+    [Queue.Empty] is handled only around the queue operations.  The previous
+    [try ... with] also covered [scan] and the recursive call, so a
+    synchronous [Queue.Empty] coming from there would have unlocked mutexes
+    this worker no longer held.
+
+    NOTE(review): a worker stops as soon as the read queue is empty, even if
+    another worker is still inside [scan] and about to queue children;
+    confirm this early exit is acceptable. *)
+ let rec dispatcher context =
+   Lwt_mutex.lock context.rd.mutex >>= fun () ->
+   Lwt_mutex.lock context.wr.mutex >>= fun () ->
+   (* Purely synchronous: nothing yields to Lwt between lock and unlock. *)
+   let rec next () =
+     match Queue.pop context.rd.value with
+     | { status = Do_promotion; derivation; _ } ->
+       Queue.push (Promote_and_continue_with derivation) context.wr.value ;
+       next ()
+     | { status = Do_scan; _ } as value -> Some value
+     | exception Queue.Empty -> None in
+   let job = next () in
+   Lwt_mutex.unlock context.wr.mutex ;
+   Lwt_mutex.unlock context.rd.mutex ;
+   match job with
+   | None -> Lwt.return ()
+   | Some value ->
+     scan context value >>= fun () ->
+     dispatcher context
+
+ (* [write_thread context] consumes the write queue until it pops [Stop].
+
+    For [Promote_and_continue_with k'] the object is promoted unless
+    [Irmin_GC.mem] already reports [k'], and the loop then continues with
+    the next action.  When the queue is empty the thread yields with
+    [Lwt.pause] before polling again.
+
+    The mutex is held only while popping.  The previous code never released
+    it, re-locked it while still holding it on the empty-queue path, and
+    returned after a single promotion instead of continuing.
+
+    NOTE(review): node keys queued by [dispatcher] were added to the GC
+    table by [scan]; if [Irmin_GC.mem] consults that table they will be
+    skipped here -- confirm.  [incr_contents] and the "whatever" label are
+    kept from the original and apply to every promoted key. *)
+ let rec write_thread context =
+   Lwt_mutex.lock context.wr.mutex >>= fun () ->
+   let action =
+     match Queue.pop context.wr.value with
+     | action -> Some action
+     | exception Queue.Empty -> None in
+   (* Release before promoting so scanners can keep queueing work. *)
+   Lwt_mutex.unlock context.wr.mutex ;
+   match action with
+   | Some Stop -> Lwt.return ()
+   | Some (Promote_and_continue_with k') ->
+     Irmin_GC.mem context.gc k' >>= fun exists ->
+     (match exists with
+      | true -> Lwt.return ()
+      | false ->
+        Irmin_GC.Tbl.add context.gc.Irmin_GC.tbl k' ;
+        Irmin_GC.incr_contents context.gc.Irmin_GC.stats ;
+        Irmin_GC.promote "whatever" context.gc k') >>= fun () ->
+     write_thread context
+   | None ->
+     (* Nothing queued yet: let the scanners run instead of spinning. *)
+     Lwt.pause () >>= fun () ->
+     write_thread context
+
+ (* [pass gc roots] runs one scan/promote pass starting from [roots], a list
+    of [(key, derived key)] pairs.
+
+    Each root enters the read queue with [Do_scan] at depth 0.  Four
+    [dispatcher] workers are started through an [Lwt_pool] of size 4 whose
+    creator returns the shared context; once all four have returned, [Stop]
+    is pushed on the write queue.  [write_thread] is joined with them. *)
+ let pass gc roots =
+   let rd_queue = Queue.create () in
+   let wr_queue = Queue.create () in
+   List.iter
+     (fun (k, k') ->
+       Queue.add { key= k; derivation= k'; status= Do_scan; depth= 0 } rd_queue)
+     roots ;
+   let context =
+     { rd= { value= rd_queue; mutex= Lwt_mutex.create () }
+     ; wr= { value= wr_queue; mutex= Lwt_mutex.create () }
+     ; gc } in
+   let scan_pool = Lwt_pool.create 4 (fun () -> Lwt.return context) in
+   let worker () = Lwt_pool.use scan_pool dispatcher in
+   let scan_threads () =
+     Lwt.join [ worker (); worker (); worker (); worker () ] >>= fun () ->
+     (* Every scanner is done: tell the write thread to stop. *)
+     Lwt_mutex.lock context.wr.mutex >>= fun () ->
+     Queue.push Stop context.wr.value ;
+     Lwt_mutex.unlock context.wr.mutex ;
+     Lwt.return () in
+   Lwt.join [ scan_threads (); write_thread context ]
+
+ (* [copy_root gc k] handles the root node [k]: unless [Irmin_GC.mem]
+    already reports its derived key, the key is added to the GC table, a
+    [pass] is run from it, and the root is promoted with the "root" label.
+
+    NOTE(review): the key is added to the table before [pass] runs, and
+    [scan] starts with the same [Irmin_GC.mem] check on that key; if [mem]
+    consults the table the root would be skipped by [scan] -- confirm. *)
+ let copy_root gc k =
+   let k' = P.XNode.of_key k in
+   Irmin_GC.mem gc k' >>= fun known ->
+   if known then Lwt.return ()
+   else begin
+     Irmin_GC.Tbl.add gc.Irmin_GC.tbl k' ;
+     pass gc [ k, k' ] >>= fun () ->
+     Irmin_GC.promote "root" gc k'
+   end
+
+ (* [copy_commit gc k] checks the GC switch, reads commit [k] from the old
+    database, copies its root node with [copy_root], bumps the commit
+    counter and promotes the commit under the "commit" label.
+
+    NOTE(review): [Option.get] presumably raises when the commit is absent
+    from the old database -- confirm. *)
+ let copy_commit gc k =
+   Lwt_switch.check gc.Irmin_GC.switch ;
+   P.XCommit.unsafe_find gc.Irmin_GC.old_db k >|= Option.get >>= fun commit ->
+   let commit_key = P.XCommit.of_key k in
+   let root = P.Commit.Val.node commit in
+   copy_root gc root >>= fun () ->
+   Irmin_GC.incr_commits gc.Irmin_GC.stats ;
+   Irmin_GC.promote "commit" gc commit_key
+
let promote_all ~(repo:repo) ?before_pivot ~branches t roots =
repo.db.gc_mode <- Promotion;
let init_time = Unix.gettimeofday () in
let last_time = ref init_time in
Lwt_list.iteri_s (fun i k ->
- Irmin_GC.copy_commit t k >>= fun () ->
+ copy_commit t k >>= fun () ->
let current_time = Unix.gettimeofday () in
if current_time -. !last_time > 5. (* print something every 5s *)
|| i = 0 || i = List.length roots - 1
diff --git a/vendors/ocaml-lmdb/src/lmdb_stubs.c b/vendors/ocaml-lmdb/src/lmdb_stubs.c
index 9461b2a..19b8ad4 100644
--- a/vendors/ocaml-lmdb/src/lmdb_stubs.c
+++ b/vendors/ocaml-lmdb/src/lmdb_stubs.c
@@ -10,6 +10,7 @@
#include <caml/memory.h>
#include <caml/custom.h>
#include <caml/bigarray.h>
+#include <caml/threads.h>
#include "lmdb.h"
@@ -342,7 +343,9 @@ CAMLprim value stub_mdb_get(value txn, value dbi, value key) {
k.mv_size = caml_string_length(key);
k.mv_data = String_val(key);
+ caml_release_runtime_system();
ret = mdb_get(Txn_val(txn), Nativeint_val(dbi), &k, &v);
+ caml_acquire_runtime_system();
if (ret) {
result = caml_alloc(1, 1);
Store_field(result, 0, Val_int(ret));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment