Created
March 16, 2013 17:47
-
-
Save mackwic/5177470 to your computer and use it in GitHub Desktop.
OUnit patch proposal for parallelism
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- old-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.806295575 +0100 | |
+++ new-ounit/src/oUnitCore.ml 2013-03-16 18:45:34.814295575 +0100 | |
@@ -14,6 +14,11 @@ | |
* Types and global states. | |
*) | |
+type runner_type = | |
+ | Sequential | |
+ | Threaded | |
+ | Processus | |
+ | |
let global_verbose = | |
OUnitConf.make | |
"verbose" | |
@@ -51,19 +56,21 @@ | |
let global_chooser = ref OUnitChooser.simple | |
+let thread_pool_threshold = 10 | |
+(** Under this limit, create exactly one thread by test *) | |
+let thread_pool_size = 15 | |
+(** Should be adjusted but depends of too many parameters, make you own tests *) | |
+ | |
(* Events which can happen during testing *) | |
-(* Run all tests, report starts, errors, failures, and return the results *) | |
-let perform_test logger test = | |
+let report path e = OUnitLogger.report !global_logger (TestEvent (path, e)) | |
- let report path e = | |
- OUnitLogger.report logger (TestEvent (path, e)) | |
- in | |
- | |
- let run_test_case f path = | |
- let result = | |
- try | |
+(* Run all tests, sequential version *) | |
+let run_all_tests_seq test_cases = | |
+ let run_test_case f path = | |
+ let result = | |
+ try | |
f (); | |
RSuccess | |
with e -> | |
@@ -73,17 +80,169 @@ | |
else | |
None | |
in | |
- match e with | |
+ match e with | |
| Failure s -> RFailure (s, backtrace) | |
| Skip s -> RSkip s | |
| Todo s -> RTodo s | |
| s -> RError (Printexc.to_string s, backtrace) | |
+ in | |
+ let position = | |
+ OUnitLogger.position !global_logger | |
+ in | |
+ result, position | |
+ in | |
+ let runner (path, f) = | |
+ let result, position = | |
+ report path EStart; | |
+ run_test_case f path | |
in | |
- let position = | |
- OUnitLogger.position logger | |
+ report path (EResult result); | |
+ report path EEnd; | |
+ path, result, position | |
+ in | |
+ let rec iter state = | |
+ match state.tests_planned with | |
+ | [] -> | |
+ state.results | |
+ | _ -> | |
+ let (path, f) = !global_chooser state in | |
+ let result = runner (path, f) in | |
+ iter | |
+ { | |
+ results = result :: state.results; | |
+ tests_planned = | |
+ List.filter | |
+ (fun (path', _) -> path <> path') | |
+ state.tests_planned | |
+ } | |
+ in | |
+ iter {results = []; tests_planned = test_cases} | |
+ | |
+(* Run all test, threaded version *) | |
+let run_all_tests_threaded test_cases = | |
+ (* perform_test.run_test_case equivalent *) | |
+ let thread_run test_fun = try | |
+ test_fun (); RSuccess | |
+ with e -> (* No backtraces because I suspect them to not be thread-safe *) | |
+ match e with | |
+ | Failure s -> RFailure (s, None) | |
+ | Skip s -> RSkip s | |
+ | Todo s -> RTodo s | |
+ | s -> RError (Printexc.to_string s, None) | |
+ in | |
+ | |
+ (* thread-wide synchronization *) | |
+ let thread_main (wait_chan, result_chan) = | |
+ while true do | |
+ let event = Event.receive wait_chan in | |
+ let (test_path, test_fun) = Event.sync event in | |
+ report test_path EStart; (* FIXME *) | |
+ (* seems a very bad idea as report is NOT thread safe, broking lines in *) | |
+ (* the log file, maybe more one day. Find an other solution of patch*) | |
+ (* report *) | |
+ let test_res = thread_run test_fun in | |
+ Event.sync (Event.send result_chan (test_path, test_res)) | |
+ done | |
+ in | |
+ | |
+ (* application-wide synchronization, end of perfom_test.runner equivalent *) | |
+ let synchronizer_main (test_number, result_chan, suite_result_chan) = | |
+ let i = ref test_number and l = ref [] in | |
+ while !i > 0 do | |
+ let (path, res) = Event.sync (Event.receive result_chan) in | |
+ report path (EResult res); | |
+ report path EEnd; | |
+ l := (path, res, None)::!l; | |
+ decr i | |
+ done; | |
+ Event.sync (Event.send suite_result_chan !l) | |
+ in | |
+ | |
+ (* beginning of preform_test.runn equivalent, wait results from synchronizer *) | |
+ let rec schedule wait_chan suite_result_chan = function | |
+ | [] -> Event.sync (Event.receive suite_result_chan) | |
+ | test::tests_planned -> | |
+ Event.sync (Event.send wait_chan test); | |
+ schedule wait_chan suite_result_chan tests_planned | |
+ in | |
+ (* init channels to pass values with easy synchronization *) | |
+ let len = List.length test_cases | |
+ and wait_chan = Event.new_channel () (* threads will get tests by there *) | |
+ and result_chan = Event.new_channel () (* and send test result here *) | |
+ and suite_result_chan = Event.new_channel () (* theses result will be | |
+ * aggregated here | |
+ *) | |
+ in | |
+ (* then init our threads: a pool, a scheduler that dispatches tests,*) | |
+ (* and a synchronizer that aggregates result and call the logger *) | |
+ let pool_size = if len < thread_pool_threshold | |
+ then len else thread_pool_size | |
+ in | |
+ Thread.create synchronizer_main (len, result_chan, suite_result_chan); | |
+ for i = 0 to pool_size do | |
+ Thread.create thread_main (wait_chan, result_chan) | |
+ done; | |
+ schedule wait_chan suite_result_chan test_cases | |
+ | |
+ | |
+ | |
+ | |
+let base_port = 32757 (* last 5 md5 of "ocaml" *) | |
+ | |
+(* Run all tests, processus version *) | |
+let run_all_tests_process test_cases = | |
+ | |
+ let mk_sock () = | |
+ (* we must choose PF_INET as it's the only portable choice *) | |
+ let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in | |
+ Unix.bind | |
+ sock | |
+ (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port)); | |
+ sock | |
+ | |
+ and mk_process nb = | |
+ let l = ref [] in | |
+ for i = 0 to nb do | |
+ l := (Unix.create_process | |
+ "oUnitWorker" | |
+ [| (string_of_int base_port) |] | |
+ Unix.stdin | |
+ Unix.stdout | |
+ Unix.stderr) :: !l | |
+ done; | |
+ !l | |
+ | |
+ (* send a test in one socket *) | |
+ and send_test (path, test) sock = | |
+ report path EStart; | |
+ let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in | |
+ if 0 = Unix.write sock data 0 (String.length data) | |
+ then () (* error handling *) | |
+ else () | |
+ | |
+ (* read a result from one socket then report it *) | |
+ and recv_test sock = | |
+ let buff = Buffer.create 4096 | |
+ and str = ref (String.make 4096 (Char.chr 0)) | |
in | |
- result, position | |
+ while 0 < Unix.read sock !str 0 (String.length !str) do | |
+ Buffer.add_string buff !str; | |
+ str := String.make 4096 (Char.chr 0) | |
+ done; | |
+ let (path, res) = Marshal.from_string (Buffer.contents buff) 0 in | |
+ report path EEnd; | |
+ (path, res) | |
+ | |
+ in | |
+ | |
+ let sock = mk_sock () | |
+ and process = mk_process 5 | |
in | |
+ (* TODO accept on socket, then select on the resulting sockets *) | |
+ [] | |
+ | |
+(* Run all tests, report starts, errors, failures, and return the results *) | |
+let perform_test logger test = | |
let rec flatten_test path acc = | |
function | |
| TestCase(f) -> | |
@@ -96,39 +255,18 @@ | |
((ListItem cnt)::path) | |
acc t) | |
acc tests | |
- | |
| TestLabel (label, t) -> | |
flatten_test ((Label label)::path) acc t | |
in | |
let test_cases = | |
List.rev (flatten_test [] [] test) | |
in | |
- let runner (path, f) = | |
- let result, position = | |
- report path EStart; | |
- run_test_case f path | |
- in | |
- report path (EResult result); | |
- report path EEnd; | |
- path, result, position | |
- in | |
- let rec iter state = | |
- match state.tests_planned with | |
- | [] -> | |
- state.results | |
- | _ -> | |
- let (path, f) = !global_chooser state in | |
- let result = runner (path, f) in | |
- iter | |
- { | |
- results = result :: state.results; | |
- tests_planned = | |
- List.filter | |
- (fun (path', _) -> path <> path') | |
- state.tests_planned | |
- } | |
- in | |
- iter {results = []; tests_planned = test_cases} | |
+ let run_type = Processus in | |
+ match run_type with (* TODO use a parameter in the function to switch *) | |
+ | Sequential -> run_all_tests_seq test_cases | |
+ | Threaded -> run_all_tests_threaded test_cases | |
+ | Processus -> run_all_tests_process test_cases | |
+ | |
(* A simple (currently too simple) text based test runner *) | |
let run_test_tt ?verbose test = | |
@@ -164,7 +302,7 @@ | |
(* Now start the test *) | |
let running_time, test_results = | |
time_fun | |
- perform_test | |
+ perform_test | |
logger | |
test | |
in |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(***********************************************************************) | |
(* The OUnit library *) | |
(* *) | |
(* Copyright (C) 2002-2008 Maas-Maarten Zeeman. *) | |
(* Copyright (C) 2010 OCamlCore SARL *) | |
(* *) | |
(* See LICENSE for details. *) | |
(***********************************************************************) | |
open OUnitUtils | |
include OUnitTypes | |
(* | |
* Types and global states. | |
*) | |
(** Strategy used by [perform_test] to execute the flattened test cases. *)
type runner_type =
    Sequential  (** dispatches to [run_all_tests_seq] *)
  | Threaded    (** dispatches to [run_all_tests_threaded] *)
  | Processus   (** dispatches to [run_all_tests_process] *)
(** Configuration entry ["verbose"]: a boolean, [false] by default, whose
    command-line spec wraps the backing reference in [Arg.Set]. *)
let global_verbose =
  let as_flag cell = Arg.Set cell in
  OUnitConf.make
    "verbose"
    as_flag
    ~printer:string_of_bool
    false
    "Run test in verbose mode."
(** Configuration entry ["output_file"]: optional path of the log file.

    The default is ["oUnit.log"] inside the ["_build"] directory of the
    current working directory when that directory exists, and inside the
    current working directory otherwise.  The ["no_output_file"] alternate
    resets the value to [None]. *)
let global_output_file =
  let default_log =
    let cwd = Sys.getcwd () in
    let build_dir = Filename.concat cwd "_build" in
    let has_build_dir =
      Sys.file_exists build_dir && Sys.is_directory build_dir
    in
    Filename.concat (if has_build_dir then build_dir else cwd) "oUnit.log"
  in
  let show = function
    | Some fn -> Printf.sprintf "%S" fn
    | None -> "<none>"
  in
  let disable cell = Arg.Unit (fun () -> cell := None) in
  let set_from_string cell = Arg.String (fun s -> cell := Some s) in
  OUnitConf.make
    "output_file"
    ~arg_string:"fn"
    ~alternates:["no_output_file",
                 disable,
                 None,
                 "Prevent to write log in a file."]
    ~printer:show
    set_from_string
    (Some default_log)
    "Output verbose log in the given file."
(* TODO: remove *)
(** Logger that [report] writes to; initially the null logger. *)
let global_logger = ref OUnitLogger.null_logger

(** Function the sequential runner calls to pick the next test to run. *)
let global_chooser = ref OUnitChooser.simple

(** Below this number of tests, the threaded runner creates exactly one
    thread per test. *)
let thread_pool_threshold = 10

(** Size of the thread pool once the threshold is reached.  Should be tuned,
    but the right value depends on too many parameters: run your own
    measurements. *)
let thread_pool_size = 15

(** [report path e] sends the test event [e], tagged with the test [path],
    to the logger currently stored in [global_logger]. *)
let report path e =
  let logger = !global_logger in
  OUnitLogger.report logger (TestEvent (path, e))
(* Run all tests one after another in the calling thread.

   [test_cases] is a list of [(path, test_function)] pairs.  The next test is
   always picked by [!global_chooser]; for each test the events [EStart],
   [EResult] and [EEnd] are reported, in that order.  Returns the list of
   [(path, result, position)] triples, most recently run test first. *)
let run_all_tests_seq test_cases =
  (* Map the exception raised by a test function to a test result. *)
  let result_of_exn exn backtrace =
    match exn with
    | Failure msg -> RFailure (msg, backtrace)
    | Skip msg -> RSkip msg
    | Todo msg -> RTodo msg
    | other -> RError (Printexc.to_string other, backtrace)
  in
  (* Run one test case and report its whole life cycle. *)
  let execute (path, f) =
    report path EStart;
    let outcome =
      try
        f ();
        RSuccess
      with exn ->
        (* Read the backtrace first, before anything else can clobber it. *)
        let backtrace =
          if Printexc.backtrace_status () then
            Some (Printexc.get_backtrace ())
          else
            None
        in
        result_of_exn exn backtrace
    in
    let position = OUnitLogger.position !global_logger in
    report path (EResult outcome);
    report path EEnd;
    (path, outcome, position)
  in
  let rec loop state =
    match state.tests_planned with
    | [] ->
      state.results
    | _ :: _ ->
      let ((chosen, _) as test_case) = !global_chooser state in
      let outcome = execute test_case in
      (* Drop every planned test whose path equals the one just run. *)
      let remaining =
        List.filter (fun (p, _) -> p <> chosen) state.tests_planned
      in
      loop {results = outcome :: state.results; tests_planned = remaining}
  in
  loop {results = []; tests_planned = test_cases}
(* Run all tests, threaded version.

   [test_cases] is a list of [(path, test_function)] pairs.  Tests are handed
   to a pool of worker threads through [wait_chan]; workers send their events
   to a single synchronizer thread through [result_chan], and the
   synchronizer hands the aggregated results back through
   [suite_result_chan].

   Every call to [report] is made by the synchronizer thread, because
   [report] is not thread-safe (concurrent calls broke lines in the log
   file).

   Returns the list of [(path, result, None)] triples, in reverse order of
   completion.  No backtrace and no log position are recorded in this mode. *)
let run_all_tests_threaded test_cases =
  (* perform_test.run_test_case equivalent *)
  let thread_run test_fun =
    try
      test_fun ();
      RSuccess
    with
    (* No backtraces because they are suspected not to be thread-safe. *)
    | Failure s -> RFailure (s, None)
    | Skip s -> RSkip s
    | Todo s -> RTodo s
    | e -> RError (Printexc.to_string e, None)
  in

  (* Worker thread: take tests from [wait_chan] until [None] is received.
     For each test, [(path, None)] announces the start and
     [(path, Some result)] carries the outcome. *)
  let rec thread_main (wait_chan, result_chan) =
    match Event.sync (Event.receive wait_chan) with
    | None -> ()
    | Some (test_path, test_fun) ->
      Event.sync (Event.send result_chan (test_path, None));
      let test_res = thread_run test_fun in
      Event.sync (Event.send result_chan (test_path, Some test_res));
      thread_main (wait_chan, result_chan)
  in

  (* Synchronizer thread: log every event and collect results until
     [test_number] results have been received, then publish the list. *)
  let synchronizer_main (test_number, result_chan, suite_result_chan) =
    let rec collect remaining acc =
      if remaining <= 0 then
        acc
      else
        match Event.sync (Event.receive result_chan) with
        | (path, None) ->
          report path EStart;
          collect remaining acc
        | (path, Some res) ->
          report path (EResult res);
          report path EEnd;
          collect (remaining - 1) ((path, res, None) :: acc)
    in
    Event.sync (Event.send suite_result_chan (collect test_number []))
  in

  (* Channels used to pass values with easy synchronization. *)
  let len = List.length test_cases in
  let wait_chan = Event.new_channel () in          (* tests to run *)
  let result_chan = Event.new_channel () in        (* per-test events *)
  let suite_result_chan = Event.new_channel () in  (* aggregated results *)

  (* One thread per test under the threshold, a fixed-size pool otherwise. *)
  let pool_size =
    if len < thread_pool_threshold then len else thread_pool_size
  in
  let _synchronizer =
    Thread.create synchronizer_main (len, result_chan, suite_result_chan)
  in
  (* Start exactly [pool_size] workers and keep their handles. *)
  let workers =
    let rec spawn remaining acc =
      if remaining <= 0 then
        acc
      else
        spawn
          (remaining - 1)
          (Thread.create thread_main (wait_chan, result_chan) :: acc)
    in
    spawn pool_size []
  in
  (* Dispatch every test, then wait for the synchronizer's result list. *)
  List.iter
    (fun test -> Event.sync (Event.send wait_chan (Some test)))
    test_cases;
  let results = Event.sync (Event.receive suite_result_chan) in
  (* All workers are idle now: tell each one to stop and wait for it, so no
     thread is left blocked on [wait_chan] after the run. *)
  List.iter (fun _ -> Event.sync (Event.send wait_chan None)) workers;
  List.iter Thread.join workers;
  results
let base_port = 32757 (* last 5 md5 of "ocaml" *)

(* Run all tests, processus version.

   UNFINISHED: this function binds a socket and spawns worker processes, but
   never accepts a connection nor dispatches a test.  It ignores [test_cases]
   and always returns [[]].  [send_test] and [recv_test] are the building
   blocks for the missing part and are not called yet.

   NOTE(review): tests are marshalled with [Marshal.Closures].  According to
   the Marshal documentation, closures can only be read back by the very same
   program that wrote them, so a separate "oUnitWorker" executable will not
   be able to unmarshal them -- confirm before building on this design. *)
let run_all_tests_process test_cases =
  let worker_program = "oUnitWorker" in

  let mk_sock () =
    (* we must choose PF_INET as it's the only portable choice *)
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    (try
       Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_loopback, base_port))
     with e ->
       (* Do not leak the descriptor when the port is unavailable. *)
       Unix.close sock;
       raise e);
    sock

  (* Spawn exactly [nb] workers and return their pids. *)
  and mk_process nb =
    let rec spawn remaining acc =
      if remaining <= 0 then
        acc
      else
        let pid =
          Unix.create_process
            worker_program
            (* argv.(0) must be the program name: the worker reads the port
               from Sys.argv.(1). *)
            [| worker_program; string_of_int base_port |]
            Unix.stdin
            Unix.stdout
            Unix.stderr
        in
        spawn (remaining - 1) (pid :: acc)
    in
    spawn nb []

  (* send a test in one socket *)
  and send_test (path, test) sock =
    report path EStart;
    let data = Marshal.to_string (Some (path, test)) [Marshal.Closures] in
    let len = String.length data in
    (* Unix.write loops until everything is written or an error is raised,
       so a short count means the test was not fully sent. *)
    if Unix.write sock data 0 len <> len then
      failwith "run_all_tests_process: short write while sending a test"

  (* read a result from one socket then report it *)
  and recv_test sock =
    let buff = Buffer.create 4096 in
    let chunk = String.make 4096 (Char.chr 0) in
    (* Read until Unix.read returns 0, keeping only the bytes actually
       read on each call. *)
    let rec fill () =
      let n = Unix.read sock chunk 0 (String.length chunk) in
      if n > 0 then begin
        Buffer.add_substring buff chunk 0 n;
        fill ()
      end
    in
    fill ();
    let (path, res) = Marshal.from_string (Buffer.contents buff) 0 in
    report path EEnd;
    (path, res)

  in

  let sock = mk_sock () in
  let _workers =
    try mk_process 5
    with e ->
      Unix.close sock;
      raise e
  in
  (* TODO accept on socket, then select on the resulting sockets *)
  (* Release the port so that a later run can bind it again. *)
  Unix.close sock;
  []
(* Run all tests, report starts, errors, failures, and return the results.

   [logger] receives every test event of this run: it is installed in
   [global_logger] (which [report] reads) for the duration of the call, and
   the previous logger is restored afterwards, even when a runner raises.
   [test] is flattened into a list of [(path, test_function)] pairs, in
   declaration order, before being handed to the selected runner. *)
let perform_test logger test =
  let rec flatten_test path acc = function
    | TestCase f ->
      (path, f) :: acc
    | TestList tests ->
      fold_lefti
        (fun acc t cnt -> flatten_test ((ListItem cnt) :: path) acc t)
        acc tests
    | TestLabel (label, t) ->
      flatten_test ((Label label) :: path) acc t
  in
  let test_cases =
    List.rev (flatten_test [] [] test)
  in
  (* The process-based runner is an unfinished stub that returns no result
     without running any test, so the sequential runner is the default. *)
  let run_type = Sequential in
  let run () =
    match run_type with (* TODO use a parameter in the function to switch *)
    | Sequential -> run_all_tests_seq test_cases
    | Threaded -> run_all_tests_threaded test_cases
    | Processus -> run_all_tests_process test_cases
  in
  let previous_logger = !global_logger in
  global_logger := logger;
  let results =
    try
      run ()
    with e ->
      global_logger := previous_logger;
      raise e
  in
  global_logger := previous_logger;
  results
(* A simple (currently too simple) text based test runner.

   Enables backtrace recording, builds a logger combining the base, HTML and
   JUnit loggers, installs it in [global_logger], dumps the configuration,
   runs [test] through [perform_test] and reports the global results.  The
   logger is then closed and [global_logger] reset to the null logger.
   Returns the test results.  The [?verbose] argument is accepted but not
   read by this function. *)
let run_test_tt ?verbose test =
  Printexc.record_backtrace true;
  let logger =
    let base_logger =
      OUnitLogger.create
        (global_output_file ())
        (global_verbose ())
        OUnitLogger.null_logger
    in
    let html_logger = OUnitLoggerHTML.create () in
    let junit_logger = OUnitLoggerJUnit.create () in
    OUnitLogger.combine [base_logger; html_logger; junit_logger]
  in
  (* TODO: is it really useful to override this ? *)
  global_logger := logger;
  OUnitConf.dump (OUnitLogger.report logger);

  (* Now start the test *)
  let running_time, test_results = time_fun perform_test logger test in

  (* Print test report *)
  let summary = GResults (running_time, test_results, test_case_count test) in
  OUnitLogger.report logger (GlobalEvent summary);

  (* Reset logger. *)
  OUnitLogger.close logger;
  global_logger := OUnitLogger.null_logger;

  (* Return the results possibly for further processing *)
  test_results
(* Call this one from your test suites.

   Registers the "only_test" and "list_test" configuration entries, loads
   the configuration with [arg_specs], then either prints the path of every
   test case of [suite] (when "list_test" is set) or runs the suite,
   restricted to the "only_test" selection when one was given, and passes
   the results to [fexit].  Without [?fexit], the process exits with status 1
   when the run was not successful.

   Raises [Failure] when the "only_test" selection matches no test. *)
let run_test_tt_main ?(arg_specs=[]) ?(set_verbose=ignore) ?fexit suite =
  let default_exit test_results =
    if not (was_successful test_results) then
      exit 1
  in
  let fexit =
    match fexit with
    | None -> default_exit
    | Some f -> f
  in
  let quote_all lst =
    String.concat "," (List.map (Printf.sprintf "%S") lst)
  in
  let only_test =
    OUnitConf.make
      "only_test"
      ~arg_string:"path"
      ~printer:quote_all
      (fun r -> Arg.String (fun str -> r := str :: !r))
      []
      "Run only the selected tests."
  in
  let list_test =
    OUnitConf.make
      "list_test"
      (fun r -> Arg.Set r)
      ~printer:string_of_bool
      false
      "List tests"
  in
  OUnitConf.load arg_specs;
  if list_test () then
    List.iter
      (fun pth -> print_endline (string_of_path pth))
      (OUnitTest.test_case_paths suite)
  else begin
    let nsuite =
      match only_test () with
      | [] ->
        suite
      | _ :: _ ->
        (match OUnitTest.test_filter ~skip:true (only_test ()) suite with
         | Some test ->
           test
         | None ->
           failwith
             (Printf.sprintf
                "Filtering test %s lead to no tests."
                (String.concat ", " (only_test ()))))
    in
    set_verbose (global_verbose ());
    let test_results = run_test_tt ~verbose:(global_verbose ()) nsuite in
    fexit test_results
  end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(** OUnitWorker
 *
 * An OUnitWorker is a single process that waits on a socket, reads a test,
 * executes it, and returns the result on the same socket.
 *
 * This needs to be done in another process because OCaml threads are not
 * truly concurrent. Moreover, we cannot use Unix.fork because it is not
 * portable.
 *)
open OUnitTypes | |
(* Reverse application: [x |> f] is [f x]. *)
let (|>) x f = f x
(* Left-to-right composition: [(f |- g) x] is [g (f x)]. *)
let (|-) f g = fun x -> g (f x)
(* [tap f x] calls [f x], discards its result and returns [x]. *)
let tap f x = ignore (f x); x
(* Main loop of the worker: wait until [socket] is readable, read one
   marshalled request from it, run it, and write the marshalled
   [(path, result)] pair back on [socket].

   A request is an option: [Some (path, test)] asks to run [test], [None]
   asks the worker to terminate with exit code 0.

   NOTE(review): [read_socket] keeps reading until [Unix.read] returns 0, so
   as written a request is only seen once the peer stops sending.  The
   master side of the protocol is not implemented yet -- confirm the framing
   when it is. *)
let runner socket =
  (* Block until [sock] is reported readable by select. *)
  let rec wait_socket sock =
    match Unix.select [sock] [] [] (-1.) with
    | ([], _, _) -> wait_socket sock
    | (ready :: _, _, _) -> ready

  (* Read everything available on [sock], keeping only the bytes actually
     returned by each [Unix.read] call. *)
  and read_socket sock =
    let buff = Buffer.create 4096 in
    let chunk = String.make 4096 (Char.chr 0) in
    let rec fill () =
      let n = Unix.read sock chunk 0 (String.length chunk) in
      if n > 0 then begin
        Buffer.add_substring buff chunk 0 n;
        fill ()
      end
    in
    fill ();
    Buffer.contents buff

  (* Run one request and pair its path with the outcome. *)
  and run_test = function
    | None -> ("", RSuccess)
    | Some (path, test) ->
      try
        test ();
        (path, RSuccess)
      with e ->
        (* Read the backtrace first, before anything else can clobber it. *)
        let backtrace =
          if Printexc.backtrace_status ()
          then Some (Printexc.get_backtrace ())
          else None
        in
        match e with
        | Failure s -> (path, RFailure (s, backtrace))
        | Skip s -> (path, RSkip s)
        | Todo s -> (path, RTodo s)
        | s -> (path, RError (Printexc.to_string s, backtrace))

  (* Unix.write repeats the operation until the whole string is written and
     raises on error, so the returned count carries no extra information. *)
  and write_socket sock str =
    ignore (Unix.write sock str 0 (String.length str))

  and quit_if_end = function
    | None -> exit 0
    | Some _ -> ()
  in
  while true do
    wait_socket socket
    |> read_socket
    |> (fun s -> Marshal.from_string s 0)
    |> tap quit_if_end (* if None is sent, exit here *)
    |> run_test
    |> (fun res -> Marshal.to_string res [Marshal.Closures])
    |> (write_socket socket)
  done
(* Entry point: expects the port to connect to as first command-line
   argument, connects to it on the loopback address and starts [runner].
   Without that argument, prints a message and exits with code 64. *)
let () =
  if Array.length Sys.argv < 2 then begin
    print_endline "oUnitWorker is only called by OUnit !";
    exit 64
  end else begin
    let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    let port = int_of_string Sys.argv.(1) in
    Unix.connect sock (Unix.ADDR_INET (Unix.inet_addr_loopback, port));
    runner sock
  end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment