Last active
August 25, 2022 13:27
-
-
Save valmat/0e431059f0a02445e10d72d43ae23a05 to your computer and use it in GitHub Desktop.
Multithreading find and execute
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/rdmd --shebang= -I. -w -debug -g | |
// | |
// multithreading find and execute | |
// | |
/* | |
ldc2 -O3 -dw -release -w -boundscheck=off -fvisibility=hidden -c find_mt.d -of=find_mt.o && \ | |
ldc2 -O3 -dw -release -w -link-defaultlib-shared=false -L-l:libphobos2-ldc.a -L-l:libdruntime-ldc.a -L-l:libz.a find_mt.o -of=find_mt && \ | |
strip --strip-all find_mt && \ | |
rm find_mt.o | |
*/ | |
// Пример | |
// ./find_mt -f /tmp/0/2 -s '.28' -e 'bzip2 --best {}' | |
// ./find_mt -f /tmp/0/2 -s '.bz2' -e 'bzip2 -d {}' | |
// ./find_mt -f /tmp/0/2 -e 'bzip2 --best {}' | |
import std.stdio : writeln, stderr, stdout, write; | |
import std.getopt : getopt, defaultGetoptPrinter; | |
import std.algorithm : map, filter, endsWith, joiner; | |
import std.array : array; | |
import std.range : iota, replace; | |
import std.conv : to; | |
import std.file : dirEntries, SpanMode, exists, isFile, isDir; | |
import std.parallelism : parallel, totalCPUs; | |
import std.process : executeShell; | |
import core.sync.mutex : Mutex; | |
import core.thread : Thread; | |
import core.atomic : atomicOp; | |
struct PlaseOpt | |
{ | |
string value; | |
bool valid = false; | |
alias valid this; | |
} | |
shared class Places | |
{ | |
private: | |
Mutex _mtx; | |
shared string[] _places; | |
size_t _cur; | |
public: | |
this(string[] from_dirs, string suff, bool no_links, SpanMode dir_mode) shared | |
{ | |
_mtx = new shared Mutex(); | |
_cur = 0; | |
auto files_range = from_dirs | |
.map!dirFix | |
.filter!(dir => dir.exists && dir.isDir()) | |
.map!(dir => dir.dirEntries(dir_mode, !no_links)) | |
.joiner() | |
.filter!"a.isFile" | |
.map!"a.name"; | |
_places = cast(shared string[]) (suff.length ? | |
files_range.filter!(name => name.endsWith(suff)).array : | |
files_range.array); | |
} | |
ref shared(const(string[])) values() shared const pure nothrow | |
{ | |
return _places; | |
} | |
PlaseOpt retrive() shared | |
{ | |
_mtx.lock(); | |
PlaseOpt opt; | |
if(_cur < _places.length) { | |
opt = PlaseOpt(_places[_cur], true); | |
} | |
atomicOp!"+="(this._cur, 1); | |
_mtx.unlock(); | |
return opt; | |
} | |
} | |
struct Conf | |
{ | |
string exe_cmd; | |
bool verbose; | |
} | |
string dirFix(string dir) pure nothrow | |
{ | |
return (dir[$-1] != '/') ? dir ~ '/' : dir; | |
} | |
int main(string[] args) | |
{ | |
// Параметры командной строки | |
size_t cpus_num = cast(size_t) totalCPUs(); | |
string[] from_dirs; | |
string suff; | |
string exe_cmd; | |
bool verbose = false; | |
bool no_links = false; | |
string dir_md_s = "depth"; | |
{ | |
auto helpInformation = getopt(args, | |
"f|from", "dir", &from_dirs, | |
"e|exe", "Команда. {} -- шаблон имени файла", &exe_cmd, | |
"s|suff", "Суффикс окончания имени файла", &suff, | |
"t|threads", "Количество потоков (Default: "~ cpus_num.to!string ~")", &cpus_num, | |
"v|verbose", "Verbose out (Default: false)", &verbose, | |
"nl|no-links", "Don't follow directory symlink (Default: false)", &no_links, | |
"dm|dir-mode", "'shallow', 'depth', 'breadth'" ~ | |
"(See: dlang SpanMode. Default: depth)", &dir_md_s, | |
); | |
if (helpInformation.helpWanted || !from_dirs.length || !exe_cmd.length) { | |
defaultGetoptPrinter("Usage:\n" ~ args[0] ~ " -f <dir1> [... -f <dirN>] -e <exe_cmd> [...]\n", helpInformation.options); | |
return 1; | |
} | |
} | |
SpanMode dir_mode = SpanMode.depth; | |
final switch(dir_md_s) { | |
case "shallow" : dir_mode = SpanMode.shallow; break; | |
case "depth" : dir_mode = SpanMode.depth; break; | |
case "breadth" : dir_mode = SpanMode.breadth; break; | |
} | |
immutable(Conf) cfg = Conf(exe_cmd, verbose); | |
immutable(size_t[]) cores = iota(size_t(0), cpus_num).array; | |
if(cfg.verbose) { | |
stderr.writeln("cores: ", cores.length); | |
} | |
shared Places places = new shared Places(from_dirs, suff, no_links, dir_mode); | |
foreach(i; parallel(cores) ) { | |
PlaseOpt place_opt = places.retrive(); | |
while (place_opt.valid) { | |
string cmd = cfg.exe_cmd.replace("{}", `'` ~ place_opt.value ~ `'`); | |
if(cfg.verbose) { | |
stderr.writeln(cmd); | |
} | |
auto rez = executeShell(cmd); | |
if (rez.status != 0) { | |
stderr.writeln("status: cmd is not 0: ", rez.status); | |
} | |
if(cfg.verbose) { | |
stderr.writeln(rez.output); | |
} | |
place_opt = places.retrive(); | |
} | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment