Created
May 11, 2012 20:35
-
-
Save mushkevych/2662252 to your computer and use it in GitHub Desktop.
Exemplary R Reducer illustrating the basic principles of running R from Hadoop MapReduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.log4j.Logger; | |
import org.rosuda.JRI.REXP; | |
import org.rosuda.JRI.RMainLoopCallbacks; | |
import org.rosuda.JRI.Rengine; | |
import java.io.IOException; | |
/** | |
* @author Bohdan Mushkevych | |
* date: May 2012 | |
*/ | |
public class ExemplaryRReducer extends AbstractTableReducer<ImmutableBytesWritable, Writable> { | |
private static Logger log = Logger.getLogger(ExemplaryRReducer.class); | |
protected Rengine re; | |
static class LoggingConsole implements RMainLoopCallbacks { | |
private Logger log; | |
LoggingConsole(Logger log) { | |
this.log = log; | |
} | |
public void rWriteConsole(Rengine re, String text, int oType) { | |
log.info("*** R: callback rWriteConsole(" + text + ")"); | |
} | |
public void rBusy(Rengine re, int which) { | |
log.info("*** R: callback rBusy(" + which + ")"); | |
} | |
public void rShowMessage(Rengine re, String message) { | |
log.info("*** R: callback rShowMessage \"" + message + "\""); | |
} | |
public String rReadConsole(Rengine re, String prompt, int addToHistory) { | |
return null; | |
} | |
public String rChooseFile(Rengine re, int newFile) { | |
return null; | |
} | |
public void rFlushConsole(Rengine re) { | |
} | |
public void rLoadHistory(Rengine re, String filename) { | |
} | |
public void rSaveHistory(Rengine re, String filename) { | |
} | |
} | |
/** | |
* R Engine initialization is in a different method than *setup* to enable Unit Testing | |
* @param runMainLoop if set to <code>true</code> the the event loop will be started as soon as possible, | |
* otherwise no event loop is started. Running loop requires <code>initialCallbacks</code> to be set correspondingly as well. | |
*/ | |
public void initR(boolean runMainLoop) { | |
// Call R and perform coefficient computing | |
// just making sure we have the right version of everything | |
if (!Rengine.versionCheck()) { | |
throw new IllegalStateException("*** R: version mismatch - Java files don't match R library version."); | |
} | |
// --vanilla Combine --no-save, --no-restore, --no-site-file, --no-init-file and --no-environ | |
// --slave Make R run as quietly as possible | |
// for more details run <code>R --help</code> from command line | |
re = new Rengine(new String[]{"--vanilla", "--slave"}, runMainLoop, new LoggingConsole(log)); | |
// the engine creates R is a new thread, so we should wait until it's ready | |
if (!re.waitForR()) { | |
throw new IllegalStateException("*** R: cannot start the engine."); | |
} | |
try { | |
// check if "reshape" package is installed | |
re.eval("is.installed <- function(mypkg) is.element(mypkg, installed.packages()[,1])"); | |
REXP isInstalled = re.eval("is.installed(\"reshape\")"); | |
if (isInstalled.asBool().isFALSE()) { | |
log.info("*** R: reshape package is missing. Installing locally."); | |
// install "reshape" package if this is needed | |
re.eval("install.packages(\"reshape\", repos=\"http://cran.stat.sfu.ca/\")"); | |
} else { | |
log.info("*** R: reshape package is installed. Proceeding."); | |
} | |
} catch (Exception e) { | |
log.error("*** Exception during R initialization: ", e); | |
} | |
try { | |
// load "reshape" package | |
re.eval("library(reshape)"); | |
} catch (Exception e) { | |
log.error("*** Exception while loading reshape package: ", e); | |
} | |
} | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
super.setup(context); | |
initR(false); | |
} | |
/** | |
* method performs R computations | |
*/ | |
public void performRComputations() { | |
try { | |
// clear workspace before new processing round | |
re.eval("rm(list=ls())"); | |
// perform some useful computations | |
re.eval("N <- SOMETHING_USEFUL"); | |
} catch (Exception e) { | |
log.error("*** Exception on R stage: ", e); | |
} | |
} | |
@Override | |
protected void reduce(ImmutableBytesWritable key, Iterable<Writable> values, Context context) throws IOException, InterruptedException { | |
// aggregate records | |
for (Writable value : values) { | |
Result singleResult = (Result) value; | |
// do something useful with singleResult | |
// perform R computations | |
performRComputations(); | |
// place computation results into HBase | |
// remember to configure Put object properly | |
Put put = new Put(); | |
context.write(key, put); | |
} | |
} | |
/** | |
* R Engine closure is in a separate method than *cleanup* to enable Unit Testing | |
*/ | |
public void cleanR() { | |
if (re == null) { | |
return; | |
} | |
re.end(); | |
if (!re.waitForR()) { | |
log.info("*** R: engine is stopped."); | |
} else { | |
log.info("*** R: engine turned to zombie."); | |
} | |
} | |
@Override | |
protected void cleanup(Context context) throws IOException, InterruptedException { | |
cleanR(); | |
super.cleanup(context); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment