|
library("rredis") |
|
library("jsonlite") |
|
|
|
# TODO: find a way to parameterize the callback function name (which must match the channel key name)
|
|
|
# Evaluation context shared by every request. Keeping one long-lived
# environment means objects created by one piece of received code are still
# visible to the next one. Its parent is the base environment, so evaluated
# code does not see this script's own globals.
# NOTE(review): this only isolates name lookup; it is not a security sandbox
# (base functions remain reachable from evaluated code).
env <- new.env(hash = TRUE, parent = baseenv())

# Loop flag for the main polling loop; the subscription callback clears it
# when the "DONE" message arrives.
poll <- TRUE
|
|
|
# Subscription callback for the "channel" Redis channel.
#
# Args:
#   data: the message payload. The sentinel "DONE" stops the polling loop.
#         Any other value is used as a key prefix for one request.
#
# For a request, the R code stored under "<data>:request" is evaluated in the
# shared environment `env`, and the result is stored as a JSON string under
# "<data>:response". If anything fails, the error condition is stored there
# instead. The keys "lock:owner:<data>:lock" and "lock:expire:<data>:lock"
# are deleted afterwards, whether or not evaluation succeeded.
#
# Returns NULL invisibly; the function is called for its side effects.
channel <- function(data) {
  cat("Message received from channel:", data, "\n")

  if (identical(data, "DONE")) {
    # NOTE: "<<-" updates the global flag read by the polling loop
    poll <<- FALSE
    return(invisible(NULL))
  }

  request_key <- paste0(data, ":request")
  response_key <- paste0(data, ":response")

  # regular commands must go through the read connection, not the
  # connection that holds the subscription
  redisSetContext(read_context)

  tryCatch(
    {
      # retrieve the R code
      code <- redisGet(request_key)

      # A missing key yields NULL, and parse(text = NULL) would silently fall
      # back to reading from stdin, so fail explicitly instead.
      if (is.null(code)) {
        stop("no request found under key '", request_key, "'", call. = FALSE)
      }
      # TODO: validate the code string

      # SECURITY: this evaluates arbitrary code received from Redis. `env`
      # only scopes variable lookup; it does not restrict what the code can
      # do. Only run this against a trusted Redis instance.
      result <- eval(parse(text = code), env)

      # push the response as a JSON string
      redisSet(response_key, serializeJSON(result, digits = 30))
    },
    error = function(condition) {
      # push the error condition as a JSON string
      redisSet(response_key, serializeJSON(condition))
    },
    finally = {
      # notify the client by releasing its lock keys; done in `finally` so
      # the keys are removed even when writing the response fails
      redisDelete(paste0("lock:owner:", data, ":lock"))
      redisDelete(paste0("lock:expire:", data, ":lock"))
      # redisPublish(paste0(data, ":response"), "ready") # for async
    }
  )

  invisible(NULL)
}
|
|
|
# Main script: open two Redis connections, subscribe to "channel", and poll
# for messages until the callback clears `poll`. Both connections are closed
# on the way out, including when the loop or the callback raises an error.

# create connection for reads
read_context <- redisConnect(returnRef = TRUE)

# create connection for subscription
subscribe_context <- redisConnect(returnRef = TRUE)
redisSetContext(subscribe_context)

# this looks for a variable of the same name
# which is also the key of the subscription
# FIXME: would be nice to associate the channel key with the callback function
redisSubscribe("channel")

tryCatch(
  {
    while (poll) {
      # The subscription callback switches the active context to the read
      # connection, so switch back before listening again.
      # FIXME: the redis* methods should (optionally) take a connection
      # as an argument
      redisSetContext(subscribe_context)

      redisMonitorChannels()
      Sys.sleep(0.05)
    }
  },
  finally = {
    # Nested so that the read connection is still closed when tearing down
    # the subscription connection fails.
    tryCatch(
      {
        # clean up the subscription connection
        redisSetContext(subscribe_context)
        redisUnsubscribe("channel")
        redisClose()
      },
      finally = {
        # clean up the read connection
        redisSetContext(read_context)
        redisClose()
      }
    )
  }
)