Skip to content

Instantly share code, notes, and snippets.

@valmat
Last active August 25, 2022 13:27
Show Gist options
  • Save valmat/0e431059f0a02445e10d72d43ae23a05 to your computer and use it in GitHub Desktop.
Save valmat/0e431059f0a02445e10d72d43ae23a05 to your computer and use it in GitHub Desktop.
Multithreading find and execute
#!/usr/bin/rdmd --shebang= -I. -w -debug -g
//
// multithreading find and execute
//
/*
ldc2 -O3 -dw -release -w -boundscheck=off -fvisibility=hidden -c find_mt.d -of=find_mt.o && \
ldc2 -O3 -dw -release -w -link-defaultlib-shared=false -L-l:libphobos2-ldc.a -L-l:libdruntime-ldc.a -L-l:libz.a find_mt.o -of=find_mt && \
strip --strip-all find_mt && \
rm find_mt.o
*/
// Пример
// ./find_mt -f /tmp/0/2 -s '.28' -e 'bzip2 --best {}'
// ./find_mt -f /tmp/0/2 -s '.bz2' -e 'bzip2 -d {}'
// ./find_mt -f /tmp/0/2 -e 'bzip2 --best {}'
import std.stdio : writeln, stderr, stdout, write;
import std.getopt : getopt, defaultGetoptPrinter;
import std.algorithm : map, filter, endsWith, joiner;
import std.array : array;
import std.range : iota, replace;
import std.conv : to;
import std.file : dirEntries, SpanMode, exists, isFile, isDir;
import std.parallelism : parallel, totalCPUs;
import std.process : executeShell;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import core.atomic : atomicOp;
struct PlaseOpt
{
string value;
bool valid = false;
alias valid this;
}
shared class Places
{
private:
Mutex _mtx;
shared string[] _places;
size_t _cur;
public:
this(string[] from_dirs, string suff, bool no_links, SpanMode dir_mode) shared
{
_mtx = new shared Mutex();
_cur = 0;
auto files_range = from_dirs
.map!dirFix
.filter!(dir => dir.exists && dir.isDir())
.map!(dir => dir.dirEntries(dir_mode, !no_links))
.joiner()
.filter!"a.isFile"
.map!"a.name";
_places = cast(shared string[]) (suff.length ?
files_range.filter!(name => name.endsWith(suff)).array :
files_range.array);
}
ref shared(const(string[])) values() shared const pure nothrow
{
return _places;
}
PlaseOpt retrive() shared
{
_mtx.lock();
PlaseOpt opt;
if(_cur < _places.length) {
opt = PlaseOpt(_places[_cur], true);
}
atomicOp!"+="(this._cur, 1);
_mtx.unlock();
return opt;
}
}
struct Conf
{
string exe_cmd;
bool verbose;
}
string dirFix(string dir) pure nothrow
{
return (dir[$-1] != '/') ? dir ~ '/' : dir;
}
int main(string[] args)
{
// Параметры командной строки
size_t cpus_num = cast(size_t) totalCPUs();
string[] from_dirs;
string suff;
string exe_cmd;
bool verbose = false;
bool no_links = false;
string dir_md_s = "depth";
{
auto helpInformation = getopt(args,
"f|from", "dir", &from_dirs,
"e|exe", "Команда. {} -- шаблон имени файла", &exe_cmd,
"s|suff", "Суффикс окончания имени файла", &suff,
"t|threads", "Количество потоков (Default: "~ cpus_num.to!string ~")", &cpus_num,
"v|verbose", "Verbose out (Default: false)", &verbose,
"nl|no-links", "Don't follow directory symlink (Default: false)", &no_links,
"dm|dir-mode", "'shallow', 'depth', 'breadth'" ~
"(See: dlang SpanMode. Default: depth)", &dir_md_s,
);
if (helpInformation.helpWanted || !from_dirs.length || !exe_cmd.length) {
defaultGetoptPrinter("Usage:\n" ~ args[0] ~ " -f <dir1> [... -f <dirN>] -e <exe_cmd> [...]\n", helpInformation.options);
return 1;
}
}
SpanMode dir_mode = SpanMode.depth;
final switch(dir_md_s) {
case "shallow" : dir_mode = SpanMode.shallow; break;
case "depth" : dir_mode = SpanMode.depth; break;
case "breadth" : dir_mode = SpanMode.breadth; break;
}
immutable(Conf) cfg = Conf(exe_cmd, verbose);
immutable(size_t[]) cores = iota(size_t(0), cpus_num).array;
if(cfg.verbose) {
stderr.writeln("cores: ", cores.length);
}
shared Places places = new shared Places(from_dirs, suff, no_links, dir_mode);
foreach(i; parallel(cores) ) {
PlaseOpt place_opt = places.retrive();
while (place_opt.valid) {
string cmd = cfg.exe_cmd.replace("{}", `'` ~ place_opt.value ~ `'`);
if(cfg.verbose) {
stderr.writeln(cmd);
}
auto rez = executeShell(cmd);
if (rez.status != 0) {
stderr.writeln("status: cmd is not 0: ", rez.status);
}
if(cfg.verbose) {
stderr.writeln(rez.output);
}
place_opt = places.retrive();
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment