Skip to content

Instantly share code, notes, and snippets.

@mackwic
Created March 16, 2013 17:47
Show Gist options
  • Save mackwic/5177470 to your computer and use it in GitHub Desktop.
Save mackwic/5177470 to your computer and use it in GitHub Desktop.
OUnit patch proposal for parallelism
--- old-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.806295575 +0100
+++ new-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.814295575 +0100
@@ -14,6 +14,11 @@
* Types and global states.
*)
+type runner_type =
+ | Sequential
+ | Threaded
+ | Processus
+
let global_verbose =
OUnitConf.make
"verbose"
@@ -51,19 +56,21 @@
let global_chooser = ref OUnitChooser.simple
+let thread_pool_threshold = 10
+(** Under this limit, create exactly one thread by test *)
+let thread_pool_size = 15
+(** Should be adjusted but depends of too many parameters, make you own tests *)
+
(* Events which can happen during testing *)
-(* Run all tests, report starts, errors, failures, and return the results *)
-let perform_test logger test =
+let report path e = OUnitLogger.report !global_logger (TestEvent (path, e))
- let report path e =
- OUnitLogger.report logger (TestEvent (path, e))
- in
-
- let run_test_case f path =
- let result =
- try
+(* Run all tests, sequential version *)
+let run_all_tests_seq test_cases =
+ let run_test_case f path =
+ let result =
+ try
f ();
RSuccess
with e ->
@@ -73,17 +80,169 @@
else
None
in
- match e with
+ match e with
| Failure s -> RFailure (s, backtrace)
| Skip s -> RSkip s
| Todo s -> RTodo s
| s -> RError (Printexc.to_string s, backtrace)
+ in
+ let position =
+ OUnitLogger.position !global_logger
+ in
+ result, position
+ in
+ let runner (path, f) =
+ let result, position =
+ report path EStart;
+ run_test_case f path
in
- let position =
- OUnitLogger.position logger
+ report path (EResult result);
+ report path EEnd;
+ path, result, position
+ in
+ let rec iter state =
+ match state.tests_planned with
+ | [] ->
+ state.results
+ | _ ->
+ let (path, f) = !global_chooser state in
+ let result = runner (path, f) in
+ iter
+ {
+ results = result :: state.results;
+ tests_planned =
+ List.filter
+ (fun (path', _) -> path <> path')
+ state.tests_planned
+ }
+ in
+ iter {results = []; tests_planned = test_cases}
+
+(* Run all test, threaded version *)
+let run_all_tests_threaded test_cases =
+ (* perform_test.run_test_case equivalent *)
+ let thread_run test_fun = try
+ test_fun (); RSuccess
+ with e -> (* No backtraces because I suspect them to not be thread-safe *)
+ match e with
+ | Failure s -> RFailure (s, None)
+ | Skip s -> RSkip s
+ | Todo s -> RTodo s
+ | s -> RError (Printexc.to_string s, None)
+ in
+
+ (* thread-wide synchronization *)
+ let thread_main (wait_chan, result_chan) =
+ while true do
+ let event = Event.receive wait_chan in
+ let (test_path, test_fun) = Event.sync event in
+ report test_path EStart; (* FIXME *)
+ (* seems a very bad idea as report is NOT thread safe, broking lines in *)
+ (* the log file, maybe more one day. Find an other solution of patch*)
+ (* report *)
+ let test_res = thread_run test_fun in
+ Event.sync (Event.send result_chan (test_path, test_res))
+ done
+ in
+
+ (* application-wide synchronization, end of perfom_test.runner equivalent *)
+ let synchronizer_main (test_number, result_chan, suite_result_chan) =
+ let i = ref test_number and l = ref [] in
+ while !i > 0 do
+ let (path, res) = Event.sync (Event.receive result_chan) in
+ report path (EResult res);
+ report path EEnd;
+ l := (path, res, None)::!l;
+ decr i
+ done;
+ Event.sync (Event.send suite_result_chan !l)
+ in
+
+ (* beginning of preform_test.runn equivalent, wait results from synchronizer *)
+ let rec schedule wait_chan suite_result_chan = function
+ | [] -> Event.sync (Event.receive suite_result_chan)
+ | test::tests_planned ->
+ Event.sync (Event.send wait_chan test);
+ schedule wait_chan suite_result_chan tests_planned
+ in
+ (* init channels to pass values with easy synchronization *)
+ let len = List.length test_cases
+ and wait_chan = Event.new_channel () (* threads will get tests by there *)
+ and result_chan = Event.new_channel () (* and send test result here *)
+ and suite_result_chan = Event.new_channel () (* theses result will be
+ * aggregated here
+ *)
+ in
+ (* then init our threads: a pool, a scheduler that dispatches tests,*)
+ (* and a synchronizer that aggregates result and call the logger *)
+ let pool_size = if len < thread_pool_threshold
+ then len else thread_pool_size
+ in
+ Thread.create synchronizer_main (len, result_chan, suite_result_chan);
+ for i = 0 to pool_size do
+ Thread.create thread_main (wait_chan, result_chan)
+ done;
+ schedule wait_chan suite_result_chan test_cases
+
+
+
+
+let base_port = 32757 (* last 5 md5 of "ocaml" *)
+
+(* Run all tests, processus version *)
+let run_all_tests_process test_cases =
+
+ let mk_sock () =
+ (* we must choose PF_INET as it's the only portable choice *)
+ let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
+ Unix.bind
+ sock
+ (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port));
+ sock
+
+ and mk_process nb =
+ let l = ref [] in
+ for i = 0 to nb do
+ l := (Unix.create_process
+ "oUnitWorker"
+ [| (string_of_int base_port) |]
+ Unix.stdin
+ Unix.stdout
+ Unix.stderr) :: !l
+ done;
+ !l
+
+ (* send a test in one socket *)
+ and send_test (path, test) sock =
+ report path EStart;
+ let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in
+ if 0 = Unix.write sock data 0 (String.length data)
+ then () (* error handling *)
+ else ()
+
+ (* read a result from one socket then report it *)
+ and recv_test sock =
+ let buff = Buffer.create 4096
+ and str = ref (String.make 4096 (Char.chr 0))
in
- result, position
+ while 0 < Unix.read sock !str 0 (String.length !str) do
+ Buffer.add_string buff !str;
+ str := String.make 4096 (Char.chr 0)
+ done;
+ let (path, res) = Marshal.from_string (Buffer.contents buff) 0 in
+ report path EEnd;
+ (path, res)
+
+ in
+
+ let sock = mk_sock ()
+ and process = mk_process 5
in
+ (* TODO accept on socket, then select on the resulting sockets *)
+ []
+
+(* Run all tests, report starts, errors, failures, and return the results *)
+let perform_test logger test =
let rec flatten_test path acc =
function
| TestCase(f) ->
@@ -96,39 +255,18 @@
((ListItem cnt)::path)
acc t)
acc tests
-
| TestLabel (label, t) ->
flatten_test ((Label label)::path) acc t
in
let test_cases =
List.rev (flatten_test [] [] test)
in
- let runner (path, f) =
- let result, position =
- report path EStart;
- run_test_case f path
- in
- report path (EResult result);
- report path EEnd;
- path, result, position
- in
- let rec iter state =
- match state.tests_planned with
- | [] ->
- state.results
- | _ ->
- let (path, f) = !global_chooser state in
- let result = runner (path, f) in
- iter
- {
- results = result :: state.results;
- tests_planned =
- List.filter
- (fun (path', _) -> path <> path')
- state.tests_planned
- }
- in
- iter {results = []; tests_planned = test_cases}
+ let run_type = Processus in
+ match run_type with (* TODO use a parameter in the function to switch *)
+ | Sequential -> run_all_tests_seq test_cases
+ | Threaded -> run_all_tests_threaded test_cases
+ | Processus -> run_all_tests_process test_cases
+
(* A simple (currently too simple) text based test runner *)
let run_test_tt ?verbose test =
@@ -164,7 +302,7 @@
(* Now start the test *)
let running_time, test_results =
time_fun
- perform_test
+ perform_test
logger
test
in
(***********************************************************************)
(* The OUnit library *)
(* *)
(* Copyright (C) 2002-2008 Maas-Maarten Zeeman. *)
(* Copyright (C) 2010 OCamlCore SARL *)
(* *)
(* See LICENSE for details. *)
(***********************************************************************)
open OUnitUtils
include OUnitTypes
(*
* Types and global states.
*)
(** How [perform_test] executes the flattened list of test cases. *)
type runner_type =
| Sequential (** one test after another, see [run_all_tests_seq] *)
| Threaded (** on a pool of threads, see [run_all_tests_threaded] *)
| Processus (** in worker processes, see [run_all_tests_process] *)
(** Configuration entry named ["verbose"], registered through
    [OUnitConf.make]. Its default value is [false] and its command-line
    action is [Arg.Set]. *)
let global_verbose =
  let set_flag flag = Arg.Set flag in
  OUnitConf.make
    "verbose"
    set_flag
    ~printer:string_of_bool
    false
    "Run test in verbose mode."
(** Configuration entry named ["output_file"], registered through
    [OUnitConf.make].

    The default is [Some path] where [path] is [oUnit.log] inside the
    [_build] sub-directory of the current working directory when that
    directory exists, and inside the current working directory otherwise.
    The default is computed once, when this module is initialised.
    The alternate entry ["no_output_file"] resets the value to [None]. *)
let global_output_file =
  let default_log =
    let cwd = Sys.getcwd () in
    let build_dir = Filename.concat cwd "_build" in
    let log_dir =
      if Sys.file_exists build_dir && Sys.is_directory build_dir
      then build_dir
      else cwd
    in
    Filename.concat log_dir "oUnit.log"
  in
  let print_value = function
    | None -> "<none>"
    | Some fn -> Printf.sprintf "%S" fn
  in
  let disable_output r = Arg.Unit (fun () -> r := None) in
  let select_output r = Arg.String (fun s -> r := Some s) in
  OUnitConf.make
    "output_file"
    ~arg_string:"fn"
    ~alternates:["no_output_file",
                 disable_output,
                 None,
                 "Prevent to write log in a file."]
    ~printer:print_value
    select_output
    (Some default_log)
    "Output verbose log in the given file."
(* TODO: remove *)
(* Logger used by [report]. It starts as the null logger; [run_test_tt]
   installs its own logger before running and resets this reference to the
   null logger afterwards. *)
let global_logger = ref OUnitLogger.null_logger
(* Function applied by the sequential runner to the current state in order to
   pick the next test to execute. *)
let global_chooser = ref OUnitChooser.simple
let thread_pool_threshold = 10
(** Below this number of tests, the threaded runner sizes its pool to the
    number of tests (one thread per test). *)
let thread_pool_size = 15
(** Pool size used by the threaded runner once the threshold is reached.
    The best value depends on too many parameters to be chosen here: run your
    own measurements and adjust it. *)
(* Wrap the event [e] of the test located at [path] into a [TestEvent] and
   hand it to the logger currently stored in [global_logger]. *)
let report path e =
  let event = TestEvent (path, e) in
  OUnitLogger.report !global_logger event
(* Run all tests, sequential version.

   [test_cases] is a list of [(path, f)] pairs. Tests are executed one at a
   time in the order decided by [!global_chooser]. For each test the events
   [EStart], [EResult] and [EEnd] are reported, in that order.

   Returns the list of [(path, result, position)] triples, most recently
   executed test first. *)
let run_all_tests_seq test_cases =
  (* Map an exception escaping from a test to the matching result. The
     backtrace is only captured when backtrace recording is enabled. *)
  let result_of_exn exn =
    let backtrace =
      if Printexc.backtrace_status () then
        Some (Printexc.get_backtrace ())
      else
        None
    in
    match exn with
    | Failure msg -> RFailure (msg, backtrace)
    | Skip msg -> RSkip msg
    | Todo msg -> RTodo msg
    | other -> RError (Printexc.to_string other, backtrace)
  in
  (* Execute one test and report its whole life cycle. *)
  let execute (path, f) =
    report path EStart;
    let result =
      (try f (); RSuccess with exn -> result_of_exn exn)
    in
    (* The logger position is sampled right after the test body ran and
       before its result is reported. *)
    let position = OUnitLogger.position !global_logger in
    report path (EResult result);
    report path EEnd;
    (path, result, position)
  in
  let rec loop state =
    match state.tests_planned with
    | [] ->
      state.results
    | _ :: _ ->
      let (chosen_path, _) as chosen = !global_chooser state in
      let outcome = execute chosen in
      (* Every planned entry sharing the chosen path is removed. *)
      let remaining =
        List.filter
          (fun (candidate, _) -> chosen_path <> candidate)
          state.tests_planned
      in
      loop {results = outcome :: state.results; tests_planned = remaining}
  in
  loop {results = []; tests_planned = test_cases}
(* Run all tests, threaded version.

   [test_cases] is a list of [(path, f)] pairs. The tests are dispatched over
   a pool of worker threads through synchronous [Event] channels:
   - the calling thread sends the tests, one by one, on [wait_chan];
   - each worker reports [EStart], runs the test and sends its result on
     [result_chan];
   - a synchronizer thread collects exactly one result per test, reports
     [EResult] and [EEnd], then hands the whole list over on
     [suite_result_chan].

   Returns the list of [(path, result, None)] triples (no logger position is
   recorded in this mode), last collected result first.

   Every call to [report] goes through a mutex because the logger is not
   thread safe (interleaved calls break lines in the log file). Once all
   results are in, every worker receives a stop message and all threads are
   joined, so no thread outlives the call. *)
let run_all_tests_threaded test_cases =
  let report_lock = Mutex.create () in
  let locked_report path e =
    Mutex.lock report_lock;
    (try report path e
     with exn -> Mutex.unlock report_lock; raise exn);
    Mutex.unlock report_lock
  in

  (* perform_test.run_test_case equivalent *)
  let thread_run test_fun =
    try
      test_fun (); RSuccess
    with e -> (* No backtraces because I suspect them to not be thread-safe *)
      match e with
      | Failure s -> RFailure (s, None)
      | Skip s -> RSkip s
      | Todo s -> RTodo s
      | s -> RError (Printexc.to_string s, None)
  in

  (* Worker: serve tests until the stop message [None] is received. *)
  let rec thread_main (wait_chan, result_chan) =
    match Event.sync (Event.receive wait_chan) with
    | None -> ()
    | Some (test_path, test_fun) ->
      locked_report test_path EStart;
      let test_res = thread_run test_fun in
      Event.sync (Event.send result_chan (test_path, test_res));
      thread_main (wait_chan, result_chan)
  in

  (* Synchronizer: end of the perform_test.runner equivalent. *)
  let synchronizer_main (test_number, result_chan, suite_result_chan) =
    let rec collect remaining acc =
      if remaining <= 0 then acc
      else begin
        let (path, res) = Event.sync (Event.receive result_chan) in
        locked_report path (EResult res);
        locked_report path EEnd;
        collect (remaining - 1) ((path, res, None) :: acc)
      end
    in
    Event.sync (Event.send suite_result_chan (collect test_number []))
  in

  (* Dispatch every test, then wait for the results of the synchronizer. *)
  let rec schedule wait_chan suite_result_chan = function
    | [] -> Event.sync (Event.receive suite_result_chan)
    | test :: tests_planned ->
      Event.sync (Event.send wait_chan (Some test));
      schedule wait_chan suite_result_chan tests_planned
  in

  let len = List.length test_cases in
  let wait_chan = Event.new_channel () in (* workers get tests from there *)
  let result_chan = Event.new_channel () in (* and send test results here *)
  let suite_result_chan = Event.new_channel () in (* aggregated results *)

  let pool_size =
    if len < thread_pool_threshold then len else thread_pool_size
  in
  let synchronizer =
    Thread.create synchronizer_main (len, result_chan, suite_result_chan)
  in
  (* Spawn exactly [pool_size] workers and keep their handles. *)
  let rec spawn remaining acc =
    if remaining <= 0 then acc
    else
      spawn
        (remaining - 1)
        (Thread.create thread_main (wait_chan, result_chan) :: acc)
  in
  let workers = spawn pool_size [] in
  let results = schedule wait_chan suite_result_chan test_cases in
  (* All results are in, hence every worker is back waiting on [wait_chan]:
     one stop message per worker terminates the pool. *)
  List.iter (fun _ -> Event.sync (Event.send wait_chan None)) workers;
  List.iter Thread.join (synchronizer :: workers);
  results
(* TCP port, on the loopback interface, shared by the process runner (which
   binds it) and by the workers (which receive it as their argument). *)
let base_port = 32757 (* last 5 md5 of "ocaml" *)
(* Run all tests, process version.

   UNFINISHED: the master side of the protocol (accepting the workers and
   multiplexing them) is still to be written, see the TODO below. For now the
   function binds the loopback port [base_port], spawns 5 [oUnitWorker]
   processes, releases the socket and returns [[]]: [test_cases] is not
   executed.

   The helpers [send_test] and [recv_test] implement the wire format: one
   marshalled value per message, delimited by its own marshal header.

   May raise [Unix.Unix_error] (socket creation, bind, process creation). *)
let run_all_tests_process test_cases =
  let mk_sock () =
    (* we must choose PF_INET as it's the only portable choice *)
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    (try
       Unix.bind
         sock
         (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port))
     with exn -> Unix.close sock; raise exn);
    sock
  in

  (* Spawn exactly [nb] workers and return their pids. *)
  let mk_process nb =
    let rec spawn remaining acc =
      if remaining <= 0 then acc
      else begin
        let pid =
          Unix.create_process
            "oUnitWorker"
            (* argv.(0) is the program name: the worker reads the port from
               Sys.argv.(1). *)
            [| "oUnitWorker"; string_of_int base_port |]
            Unix.stdin
            Unix.stdout
            Unix.stderr
        in
        spawn (remaining - 1) (pid :: acc)
      end
    in
    spawn nb []
  in

  (* Read exactly [len] bytes into [buf] starting at [ofs]. Raises
     [End_of_file] if the peer closes the connection first. *)
  let rec read_exact sock buf ofs len =
    if len > 0 then begin
      match Unix.read sock buf ofs len with
      | 0 -> raise End_of_file
      | n -> read_exact sock buf (ofs + n) (len - n)
    end
  in

  (* send a test in one socket *)
  let send_test (path, test) sock =
    report path EStart;
    let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in
    (* Unix.write loops until everything is written and raises
       Unix.Unix_error on failure. *)
    ignore (Unix.write sock data 0 (String.length data))
  in

  (* read a result from one socket then report it *)
  let recv_test sock =
    (* Read the fixed-size marshal header first: it gives the size of the
       payload, so exactly one message is consumed and no padding byte ever
       enters the buffer. *)
    let header = String.make Marshal.header_size (Char.chr 0) in
    read_exact sock header 0 Marshal.header_size;
    let data_len = Marshal.data_size header 0 in
    let msg = String.make (Marshal.header_size + data_len) (Char.chr 0) in
    String.blit header 0 msg 0 Marshal.header_size;
    read_exact sock msg Marshal.header_size data_len;
    let (path, res) = Marshal.from_string msg 0 in
    (* Same event sequence as the other runners. *)
    report path (EResult res);
    report path EEnd;
    (path, res)
  in

  let sock = mk_sock () in
  (try ignore (mk_process 5)
   with exn -> Unix.close sock; raise exn);
  (* TODO accept on socket, then select on the resulting sockets, using
     [send_test] and [recv_test] to exchange tests and results *)
  Unix.close sock;
  []
(* Run all tests, report starts, errors, failures, and return the results.

   [test] is first flattened into a list of [(path, f)] test cases, where
   [path] is built from [ListItem] and [Label] nodes, innermost node first.
   The test cases are then handed to the selected runner.

   [logger] is installed in [global_logger] for the duration of the run, so
   that the events emitted by the runners reach it; the previous logger is
   restored afterwards, including when the runner raises.

   Returns the list of results produced by the runner. *)
let perform_test logger test =
  let rec flatten_test path acc = function
    | TestCase f ->
      (path, f) :: acc
    | TestList tests ->
      fold_lefti
        (fun acc t cnt -> flatten_test ((ListItem cnt) :: path) acc t)
        acc tests
    | TestLabel (label, t) ->
      flatten_test ((Label label) :: path) acc t
  in
  let test_cases =
    List.rev (flatten_test [] [] test)
  in
  (* The process runner is still a stub that returns no result at all, so it
     must not be the default: run sequentially. *)
  let run_type = Sequential in
  let previous_logger = !global_logger in
  let restore () = global_logger := previous_logger in
  global_logger := logger;
  let results =
    try
      begin
        match run_type with (* TODO use a parameter in the function to switch *)
        | Sequential -> run_all_tests_seq test_cases
        | Threaded -> run_all_tests_threaded test_cases
        | Processus -> run_all_tests_process test_cases
      end
    with exn -> restore (); raise exn
  in
  restore ();
  results
(* A simple (currently too simple) text based test runner.

   Enables backtrace recording, builds a logger combining the base, HTML and
   JUnit loggers, installs it in [global_logger], dumps the configuration to
   it, runs [test] through [perform_test] and reports the global results.
   The logger is then closed and [global_logger] is reset to the null logger.

   The optional [verbose] argument is accepted but not used by the body.

   Returns the test results for further processing. *)
let run_test_tt ?verbose test =
  Printexc.record_backtrace true;
  let base_logger =
    OUnitLogger.create
      (global_output_file ())
      (global_verbose ())
      OUnitLogger.null_logger
  in
  let html_logger = OUnitLoggerHTML.create () in
  let junit_logger = OUnitLoggerJUnit.create () in
  let logger =
    OUnitLogger.combine [base_logger; html_logger; junit_logger]
  in
  (* TODO: is it really useful to override this ? *)
  global_logger := logger;
  OUnitConf.dump (OUnitLogger.report logger);

  (* Now start the test *)
  let running_time, test_results = time_fun perform_test logger test in

  (* Print test report *)
  let summary =
    GlobalEvent
      (GResults (running_time, test_results, test_case_count test))
  in
  OUnitLogger.report logger summary;

  (* Reset logger. *)
  OUnitLogger.close logger;
  global_logger := OUnitLogger.null_logger;

  (* Return the results possibly for further processing *)
  test_results
(* Call this one from your test suites.

   Registers the "only_test" and "list_test" configuration entries, loads the
   configuration with [arg_specs], then either lists the paths of the test
   cases of [suite] or runs the (possibly filtered) suite with [run_test_tt].

   [set_verbose] is called with the verbosity flag just before the run.
   [fexit] receives the test results; by default it calls [exit 1] when the
   run was not successful.

   Fails with [Failure] when the "only_test" filter selects no test. *)
let run_test_tt_main ?(arg_specs=[]) ?(set_verbose=ignore) ?fexit suite =
  let default_exit test_results =
    if not (was_successful test_results) then
      exit 1
  in
  let fexit =
    match fexit with
    | None -> default_exit
    | Some f -> f
  in
  let only_test =
    OUnitConf.make
      "only_test"
      ~arg_string:"path"
      ~printer:(fun lst -> String.concat "," (List.map (Printf.sprintf "%S") lst))
      (fun r -> Arg.String (fun str -> r := str :: !r))
      []
      "Run only the selected tests."
  in
  let list_test =
    OUnitConf.make
      "list_test"
      (fun r -> Arg.Set r)
      ~printer:string_of_bool
      false
      "List tests"
  in
  OUnitConf.load arg_specs;
  match list_test () with
  | true ->
    List.iter
      (fun pth -> print_endline (string_of_path pth))
      (OUnitTest.test_case_paths suite)
  | false ->
    let nsuite =
      match only_test () with
      | [] ->
        suite
      | _ :: _ ->
        begin
          match OUnitTest.test_filter ~skip:true (only_test ()) suite with
          | Some test ->
            test
          | None ->
            failwith
              (Printf.sprintf
                 "Filtering test %s lead to no tests."
                 (String.concat ", " (only_test ())))
        end
    in
    set_verbose (global_verbose ());
    fexit (run_test_tt ~verbose:(global_verbose ()) nsuite)
(** OUnitWorker
*
* An OUnitWorker is a single process that waits on a socket, reads a test,
* executes it, and returns the result on the same socket.
*
* This needs to be done in another process because OCaml threads are not truly
* concurrent. Moreover, we cannot use Unix.fork because it is not portable.
*)
open OUnitTypes
(* [x |> f] applies [f] to [x]: reverse application, used for pipelines. *)
let (|>) value fn = fn value
(* [(f |- g) x] is [g (f x)]: left-to-right function composition. *)
let (|-) first second = fun arg -> second (first arg)
(* [tap f x] runs [f x] for its side effect and returns [x] unchanged. *)
let tap effect value =
  effect value;
  value
(** [runner socket] serves test requests received on the connected [socket].

    Each request is one marshalled value: [Some (path, test)] asks for
    [test ()] to be run, [None] asks the worker to terminate (the process
    then exits with status 0). The reply to a request is the marshalled pair
    [(path, result)].

    Messages are delimited by their own marshal header, so several requests
    can be exchanged over the same connection.

    @raise End_of_file if the peer closes the connection.
    @raise Unix.Unix_error on socket errors. *)
let runner socket =
  (* Read exactly [len] bytes into [buf] starting at [ofs]. *)
  let rec read_exact buf ofs len =
    if len > 0 then begin
      match Unix.read socket buf ofs len with
      | 0 -> raise End_of_file
      | n -> read_exact buf (ofs + n) (len - n)
    end
  in
  (* Read one whole marshalled message. The fixed-size header is read first
     and gives the size of the payload; only bytes actually received end up
     in the message. The blocking read also waits for the request to
     arrive. *)
  let read_message () =
    let header = String.make Marshal.header_size (Char.chr 0) in
    read_exact header 0 Marshal.header_size;
    let data_len = Marshal.data_size header 0 in
    let msg = String.make (Marshal.header_size + data_len) (Char.chr 0) in
    String.blit header 0 msg 0 Marshal.header_size;
    read_exact msg Marshal.header_size data_len;
    msg
  in
  (* Run one test and map its outcome to a result. *)
  let run_test (path, test) =
    try
      test (); (path, RSuccess)
    with e ->
      let backtrace =
        if Printexc.backtrace_status ()
        then Some (Printexc.get_backtrace ())
        else None
      in
      match e with
      | Failure s -> (path, RFailure (s, backtrace))
      | Skip s -> (path, RSkip s)
      | Todo s -> (path, RTodo s)
      | s -> (path, RError (Printexc.to_string s, backtrace))
  in
  (* Unix.write, unlike Unix.single_write, loops until the whole reply has
     been written and raises Unix.Unix_error on failure. *)
  let write_message str =
    ignore (Unix.write socket str 0 (String.length str))
  in
  let rec serve () : unit =
    match Marshal.from_string (read_message ()) 0 with
    | None -> exit 0 (* if None is sent, exit here *)
    | Some job ->
      write_message (Marshal.to_string (run_test job) [Marshal.Closures]);
      serve ()
  in
  serve ()
(* Entry point: expects the TCP port of the OUnit master as first argument,
   connects to it on the loopback interface and starts serving tests.
   Without argument, prints a notice and exits with status 64. *)
let () =
  if Array.length Sys.argv < 2 then begin
    print_endline "oUnitWorker is only called by OUnit !";
    exit 64
  end else begin
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    let port = int_of_string Sys.argv.(1) in
    let master = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
    Unix.connect sock master;
    runner sock
  end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment