Created
January 31, 2019 19:01
-
-
Save dinosaure/999ab99834d444dd464820385da42656 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/lib_stdlib/dune b/src/lib_stdlib/dune | |
index 9544949..d60e9ac 100644 | |
--- a/src/lib_stdlib/dune | |
+++ b/src/lib_stdlib/dune | |
@@ -8,7 +8,7 @@ | |
re | |
zarith | |
lwt | |
- lwt.log) | |
+ lwt_log) | |
(flags (:standard -safe-string))) | |
(alias | |
diff --git a/vendors/irmin-lmdb/dune b/vendors/irmin-lmdb/dune | |
index 8a51327..1b99707 100644 | |
--- a/vendors/irmin-lmdb/dune | |
+++ b/vendors/irmin-lmdb/dune | |
@@ -1,5 +1,5 @@ | |
(library | |
(name irmin_lmdb) | |
(public_name irmin-lmdb) | |
- (libraries irmin lmdb lwt.unix ocplib-endian) | |
+ (libraries irmin ke lmdb lwt.unix ocplib-endian) | |
(flags (:standard -safe-string))) | |
diff --git a/vendors/irmin-lmdb/irmin_lmdb.ml b/vendors/irmin-lmdb/irmin_lmdb.ml | |
index 21bf6c4..a5511b6 100644 | |
--- a/vendors/irmin-lmdb/irmin_lmdb.ml | |
+++ b/vendors/irmin-lmdb/irmin_lmdb.ml | |
@@ -935,15 +935,19 @@ module Make | |
| true -> Lwt.return () | |
| false -> aux [ks, k, `Gray, 0] | |
+ [@@@warning "-32"] | |
+ | |
let copy_commit t k = | |
Lwt_switch.check t.switch; | |
P.XCommit.unsafe_find t.old_db k >|= Option.get >>= fun v -> | |
let k = P.XCommit.of_key k in | |
- (* we ignore the parents *) | |
+ (* we ignore the parents, XXX(dinosaure): because it's a root commit? *) | |
copy_node t (P.Commit.Val.node v) >>= fun () -> | |
incr_commits t.stats; | |
promote "commit" t k | |
+ [@@@warning "+32"] | |
+ | |
let root repo = | |
let c = repo.P.Repo.config in | |
match Irmin.Private.Conf.get c Conf.root with | |
@@ -982,12 +986,130 @@ module Make | |
end | |
+ type status = | |
+ | Do_scan (* `Gray *) | Do_promotion (* `Black *) | |
+ type action = | |
+ | Promote_and_continue_with of string | Stop | |
+ and value = { key : H.t; derivation : string; status : status ; depth : int } | |
+ and rd_queue = value Queue.t | |
+ and wr_queue = action Queue.t | |
+ and 'a protected = { (* mutable *) value : 'a; mutex : Lwt_mutex.t } | |
+ and context = | |
+ { rd : rd_queue protected | |
+ ; wr : wr_queue protected | |
+ ; gc : Irmin_GC.t } | |
+ | |
+ let scan t value = | |
+ let k' = P.XNode.of_key value.key in | |
+ Irmin_GC.mem t.gc k' >>= function | |
+ | true -> Lwt.return () | |
+ | false -> | |
+ Irmin_GC.Tbl.add t.gc.Irmin_GC.tbl k' ; | |
+ P.XNode.unsafe_find t.gc.Irmin_GC.old_db value.key >|= Option.get >>= fun v -> | |
+ let children = P.Node.Val.list v in | |
+ Irmin_GC.incr_nodes t.gc.Irmin_GC.stats ; | |
+ Irmin_GC.update_width t.gc.Irmin_GC.stats children ; | |
+ Irmin_GC.update_depth t.gc.Irmin_GC.stats value.depth ; | |
+ Lwt_list.iter_p (fun (_, c) -> match c with | |
+ | `Contents (k, _) -> | |
+ let k' = P.XContents.of_key k in | |
+ Lwt_mutex.lock t.wr.mutex >>= fun () -> | |
+ Queue.push (Promote_and_continue_with k') t.wr.value ; | |
+ Lwt_mutex.unlock t.wr.mutex ; | |
+ Lwt.return () | |
+ | `Node k -> | |
+ let k' = P.XNode.of_key k in | |
+ Lwt_mutex.lock t.rd.mutex >>= fun () -> | |
+ Queue.push { key=k; derivation= k'; status= Do_scan; depth= value.depth + 1 } t.rd.value ; | |
+ Lwt_mutex.unlock t.rd.mutex ; | |
+ Lwt.return ()) | |
+ children >>= fun () -> | |
+ Lwt_mutex.lock t.rd.mutex >>= fun () -> | |
+ Queue.push { value with status= Do_promotion } t.rd.value ; | |
+ Lwt.return () | |
+ | |
+ let rec dispatcher context = | |
+ Lwt_mutex.lock context.rd.mutex >>= fun () -> | |
+ Lwt_mutex.lock context.wr.mutex >>= fun () -> | |
+ try | |
+ while (Queue.top context.rd.value).status = Do_promotion | |
+ do let { derivation; _ } = Queue.pop context.rd.value in | |
+ Queue.push (Promote_and_continue_with derivation) context.wr.value done ; | |
+ let value = Queue.pop context.rd.value in | |
+ Lwt_mutex.unlock context.wr.mutex ; | |
+ Lwt_mutex.unlock context.rd.mutex ; | |
+ scan context value >>= fun () -> | |
+ dispatcher context | |
+ with Queue.Empty -> | |
+ Lwt_mutex.unlock context.wr.mutex ; | |
+ Lwt_mutex.unlock context.rd.mutex ; | |
+ Lwt.return () | |
+ | |
+ let rec write_thread context = | |
+ Lwt_mutex.lock context.wr.mutex >>= fun () -> | |
+ match Queue.pop context.wr.value with | |
+ | Stop -> Lwt.return () | |
+ | Promote_and_continue_with k' -> | |
+ Irmin_GC.mem context.gc k' >>= fun exists -> | |
+ (match exists with | |
+ | true -> Lwt.return () | |
+ | false -> | |
+ Irmin_GC.Tbl.add context.gc.Irmin_GC.tbl k'; | |
+ Irmin_GC.incr_contents context.gc.Irmin_GC.stats; | |
+ Irmin_GC.promote "whatever" context.gc k') | |
+ | exception Queue.Empty -> (* sleep? *) write_thread context | |
+ | |
+ let pass gc roots = | |
+ let make_context_from roots = | |
+ let rd_queue = Queue.create () in | |
+ let wr_queue = Queue.create () in | |
+ | |
+ let rec go = function | |
+ | [] -> () | |
+ | (k, k') :: roots -> | |
+ Queue.add { key= k; derivation= k'; status= Do_scan; depth= 0 } rd_queue ; | |
+ go roots in | |
+ go roots; { rd= { value= rd_queue; mutex= Lwt_mutex.create () } | |
+ ; wr= { value= wr_queue; mutex= Lwt_mutex.create () } | |
+ ; gc } in | |
+ let context = make_context_from roots in | |
+ let scan_pool = Lwt_pool.create 4 (fun () -> Lwt.return context) in | |
+ let scan_threads () = | |
+ Lwt.join | |
+ [ Lwt_pool.use scan_pool dispatcher | |
+ ; Lwt_pool.use scan_pool dispatcher | |
+ ; Lwt_pool.use scan_pool dispatcher | |
+ ; Lwt_pool.use scan_pool dispatcher ] | |
+ >>= fun () -> | |
+ Lwt_mutex.lock context.wr.mutex >>= fun () -> | |
+ Queue.push Stop context.wr.value ; | |
+ Lwt_mutex.unlock context.wr.mutex ; | |
+ Lwt.return () in | |
+ Lwt.join [ scan_threads (); write_thread context ] | |
+ | |
+ let copy_root gc k = | |
+ let k' = P.XNode.of_key k in | |
+ Irmin_GC.mem gc k' >>= function | |
+ | true -> Lwt.return () | |
+ | false -> | |
+ Irmin_GC.Tbl.add gc.Irmin_GC.tbl k'; | |
+ pass gc [ k, k' ] >>= fun () -> | |
+ Irmin_GC.promote "root" gc k' | |
+ | |
+ let copy_commit gc k = | |
+ Lwt_switch.check gc.Irmin_GC.switch; | |
+ P.XCommit.unsafe_find gc.Irmin_GC.old_db k >|= Option.get >>= fun v -> | |
+ let k' = P.XCommit.of_key k in | |
+ copy_root gc (P.Commit.Val.node v) >>= fun () -> | |
+ Irmin_GC.incr_commits gc.Irmin_GC.stats; | |
+ Irmin_GC.promote "commit" gc k' | |
+ | |
let promote_all ~(repo:repo) ?before_pivot ~branches t roots = | |
repo.db.gc_mode <- Promotion; | |
let init_time = Unix.gettimeofday () in | |
let last_time = ref init_time in | |
Lwt_list.iteri_s (fun i k -> | |
- Irmin_GC.copy_commit t k >>= fun () -> | |
+ copy_commit t k >>= fun () -> | |
let current_time = Unix.gettimeofday () in | |
if current_time -. !last_time > 5. (* print something every 5s *) | |
|| i = 0 || i = List.length roots - 1 | |
diff --git a/vendors/ocaml-lmdb/src/lmdb_stubs.c b/vendors/ocaml-lmdb/src/lmdb_stubs.c | |
index 9461b2a..19b8ad4 100644 | |
--- a/vendors/ocaml-lmdb/src/lmdb_stubs.c | |
+++ b/vendors/ocaml-lmdb/src/lmdb_stubs.c | |
@@ -10,6 +10,7 @@ | |
#include <caml/memory.h> | |
#include <caml/custom.h> | |
#include <caml/bigarray.h> | |
+#include <caml/threads.h> | |
#include "lmdb.h" | |
@@ -342,7 +343,9 @@ CAMLprim value stub_mdb_get(value txn, value dbi, value key) { | |
k.mv_size = caml_string_length(key); | |
k.mv_data = String_val(key); | |
+ caml_release_runtime_system(); | |
ret = mdb_get(Txn_val(txn), Nativeint_val(dbi), &k, &v); | |
+ caml_acquire_runtime_system(); | |
if (ret) { | |
result = caml_alloc(1, 1); | |
Store_field(result, 0, Val_int(ret)); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment