Skip to content

Instantly share code, notes, and snippets.

@LusciousPear
Created May 10, 2016 18:01
Show Gist options
  • Save LusciousPear/f39d9af869b1e9469a5086e9a53ce4c1 to your computer and use it in GitHub Desktop.
Save LusciousPear/f39d9af869b1e9469a5086e9a53ce4c1 to your computer and use it in GitHub Desktop.
package com.acacia.responsyspipeline;
import com.acacia.sdk.AbstractTransform;
import com.acacia.sdk.GenericDataflowAppException;
import com.google.auto.service.AutoService;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.LongSerializationPolicy;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.python.core.*;
import org.python.util.PythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
/**
 * Transform that hands a JSON string to a Python function (run through Jython) and
 * wraps the function's result back into a JSON envelope.
 *
 * <p>The Python interpreter is created lazily on the first {@link #transform(String)}
 * call rather than in the constructor, because (per the original author's note) the
 * framework serializes this object after construction and the interpreter cannot be
 * serialized.
 */
@AutoService(AbstractTransform.class)
public class PythonPipeline extends AbstractTransform {
    private static final Logger LOG = LoggerFactory.getLogger(PythonPipeline.class);

    /** Shared JSON codec that writes {@code long} values as strings; built once instead of per record. */
    private static final Gson GSON = new GsonBuilder()
            .setLongSerializationPolicy(LongSerializationPolicy.STRING)
            .create();

    // Lazily created in setupState(); transient so an initialized instance never drags the
    // interpreter into Java serialization.
    private transient PythonInterpreter interp;
    String function;
    String module;
    String filePath;
    String output_table;
    transient PySystemState systemState;

    /**
     * Instantiates an object which reads a JSON string, does python to it, and outputs a JSON string.
     *
     * @param module       name of the Python module that is imported and called
     * @param filePath     classpath resource path of the Python script
     * @param function     name of the function inside {@code module} to invoke
     * @param output_table value stored under the {@code "output_table"} key of every result
     */
    public PythonPipeline(String module, String filePath, String function, String output_table) {
        super();
        this.module = module;
        this.function = function;
        this.filePath = filePath;
        this.output_table = output_table;
    }

    /**
     * Transforms a JSON string using Python.
     *
     * <p>Calls {@code module.function(inval)} with {@code s} as a unicode argument, then
     * re-emits the input JSON object with two extra keys: {@code "resource"} (the Python
     * result) and {@code "output_table"}.
     *
     * @param s JSON object to transform
     * @return the augmented JSON string, or an empty string when {@code s} is null or empty
     * @throws GenericDataflowAppException if the script resource is missing, Python raises,
     *         the function produces no string output, or {@code s} parses to no JSON object
     */
    @Override
    public String transform(String s) throws GenericDataflowAppException {
        if (s == null || s.equals("")) {
            return "";
        }
        try {
            // Interpreter state is built here, not in the constructor (see class comment).
            // Kept inside the try so Python failures during setup are reported the same
            // way as failures during the call itself.
            setupState();
            LOG.info("{} transforming: {}", function, s);

            interp.set("out", new PyUnicode());
            interp.set("inval", new PyUnicode(s));
            interp.exec("out = " + module + "." + function + "(inval)");

            String outstr = interp.get("out").asStringOrNull();
            if (outstr == null) {
                throw new GenericDataflowAppException("error in:" +function + ", no output, likely bad HAPI request");
            }

            @SuppressWarnings("unchecked") // Gson fills the raw HashMap with String keys for a JSON object
            Map<String, Object> hm = GSON.fromJson(s, HashMap.class);
            if (hm == null) {
                // Gson returns null for input such as the literal "null" or whitespace only.
                throw new GenericDataflowAppException(
                        "error in:" + function + ", input is not a JSON object: " + s);
            }
            hm.put("resource", outstr);
            hm.put("output_table", output_table);

            String outmap = GSON.toJson(hm);
            LOG.info("{} out: {}", function, outmap);
            return outmap;
        } catch (PyException e) {
            String ex = ExceptionUtils.getStackTrace(e);
            throw new GenericDataflowAppException(
                    "errror:" + e.toString() + " " +
                            e.getMessage() + " trace:" + ex);
        }
    }

    /**
     * Lazily creates the Jython interpreter and imports {@code module}.
     *
     * <p>The {@code interp} field is assigned only after every step succeeded, so a failed
     * setup is retried (and reported again) on the next call instead of leaving a
     * half-initialized interpreter behind.
     *
     * @throws GenericDataflowAppException if the script resource {@code filePath} cannot be found
     */
    private void setupState() throws GenericDataflowAppException {
        if (interp != null) {
            return;
        }
        InputStream stream = PythonPipeline.class.getResourceAsStream(filePath);
        if (stream == null) {
            throw new GenericDataflowAppException("File " + filePath + " does not exist");
        }
        BufferedReader rdr = new BufferedReader(new InputStreamReader(stream));
        try {
            PythonInterpreter interpreter = new PythonInterpreter();
            PySystemState state = Py.getSystemState();
            state.ps1 = state.ps2 = Py.EmptyString;
            state.__setattr__("_jy_interpreter", Py.java2py(interpreter));

            // NOTE(review): the compiled code object is discarded, so this looks like a
            // syntax check only; the module itself is loaded by the import below — confirm
            // the script is also reachable on Jython's module path.
            interpreter.compile(rdr);
            interpreter.exec("import sys");
            interpreter.exec("import " + module);

            systemState = state;
            interp = interpreter;
        } finally {
            try {
                rdr.close();
            } catch (IOException e) {
                // Closing is best-effort; the script has already been consumed.
                LOG.warn("Failed to close script resource " + filePath, e);
            }
        }
    }

    /**
     * Map-accepting overload; the map is ignored and the call is forwarded to
     * {@link #transform(String)}.
     */
    @Override
    public String transform(String s, Map<String, String> map) throws GenericDataflowAppException {
        return transform(s);
    }

    @Override
    public String toString() {
        return "PythonPipeline{" + module + "." + function + "}";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment