Last active
August 29, 2015 14:27
-
-
Save mschubert/79aae1c2b4f9500c05d3 to your computer and use it in GitHub Desktop.
Minimal example of using rzmq to submit a worker job using LSF
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# LSF job script template. It is filled in by infuser and piped to bsub.
# Placeholders in double curly braces are replaced by the supplied values;
# where a placeholder has a bar, the text after the bar is its default.
#BSUB-J {{ job_name }}                        # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }}            # output is sent to logfile, stdout + stderr by default
#BSUB-q {{ queue }}                           # Job queue (-q selects the queue; -P would set the project name)
#BSUB-W {{ walltime }}                        # Walltime in minutes
#BSUB-M {{ memory | 4096 }}                   # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}]       # Memory requirements in Mbytes
#BSUB-R select[panfs_nobackup_research]

# Run the worker script in a fresh R session, passing the master address
# through as the first trailing command-line argument.
R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Master process: submits worker jobs through LSF, then hands out one work
# item at a time to the workers over a ZeroMQ REP socket and collects results.
library(rzmq)
library(infuser)

# Port the status socket listens on. The same value is put into the address
# the workers receive, so both ends are guaranteed to agree.
port = 6124L

# use the template & submit
values = list(
    queue = "research-rh6",
    walltime = 10080,
    memory = 1024,
    rscript = "worker.r",
    args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], port)
)

# bind status socket
zmq.context = init.context()
socket = init.socket(zmq.context, "ZMQ_REP")
bind.socket(socket, sprintf("tcp://*:%i", port))

# do the submissions; seq_len() makes this a no-op for njobs = 0
njobs = 10
for (j in seq_len(njobs)) {
    values$job_name = paste0("rzmq-", j)
    values$log_file = paste0(values$job_name, ".log")
    system("bsub", input=infuse("LSF.tmpl", values))
}

# define the function to run on the workers and input data
fun = function(x, y) x*x+y
job_data = 1:1e4
job_const = list(y=100)
job_status = factor(rep("queued", length(job_data)), levels=c("queued", "running", "done"))
# one slot per work item, allocated up front so results stay aligned with
# job_data regardless of the order in which they arrive
job_result = vector("list", length(job_data))

# send and receive messages
# Protocol: a worker announces itself with id 0 and otherwise reports the
# result of work item `id`. Every request is answered with either the next
# queued work item or id 0, which tells the worker to stop.
# NOTE(review): the loop ends as soon as all items are done; a worker that
# only starts afterwards will not get an answer - confirm this is acceptable.
while (any(job_status %in% c("queued", "running"))) {
    msg = receive.socket(socket)
    print(msg)
    if (msg$id == 0) {
        # new worker: send function and constant arguments first; send.more
        # keeps the reply open so the work item below goes out with it
        send.socket(socket, data=list(fun=fun, const=job_const), send.more=TRUE)
    } else {
        job_status[msg$id] = "done"
        # single-bracket assignment of a one-element list keeps the slot even
        # if the worker returned NULL ([[<- with NULL would delete it)
        job_result[msg$id] = list(msg$result)
    }

    id = which(job_status == "queued")[1]
    if (!is.na(id)) {
        send.socket(socket, data=list(id=id, iter=job_data[id]))
        job_status[id] = "running"
    } else {
        # nothing left to hand out: tell this worker to shut down
        send.socket(socket, data=list(id=0))
    }
    Sys.sleep(0.001)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Worker process.
# this should be submitted by bsub
# and get the server as argument
master = commandArgs(TRUE)[1]
#master = "tcp://ebi-002.ebi.ac.uk:6124"
# Without an address the subscript above yields NA; fail here with a clear
# message instead of inside connect.socket().
if (is.na(master)) {
    stop("no master address given; expected e.g. tcp://host:port as first argument")
}
print(master)

library(rzmq)
context = init.context()
socket = init.socket(context, "ZMQ_REQ")
connect.socket(socket, master)

# announce this worker with id 0; the first reply carries the function to
# run and its constant arguments
send.socket(socket, data=list(id=0))
msg = receive.socket(socket)
print(msg)
fun = msg$fun
const = msg$const

# work loop: receive a work item, evaluate it, report the result back;
# a message with id 0 means there is nothing left to do
while (TRUE) {
    msg = receive.socket(socket)
    print(msg)
    if (msg$id == 0) {
        break
    }

    # try() turns an error in fun into a returned condition object, so a
    # failing work item is reported to the master instead of killing the worker
    result = try(do.call(fun, c(const, msg$iter)))
    print(result)
    send.socket(socket, data=list(id = msg$id, result=result))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment