Created
May 10, 2016 18:01
-
-
Save LusciousPear/f39d9af869b1e9469a5086e9a53ce4c1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.acacia.responsyspipeline; | |
import com.acacia.sdk.AbstractTransform;
import com.acacia.sdk.GenericDataflowAppException;
import com.google.auto.service.AutoService;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.LongSerializationPolicy;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.python.core.*;
import org.python.util.PythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@AutoService(AbstractTransform.class) | |
public class PythonPipeline extends AbstractTransform { | |
private static final Logger LOG = LoggerFactory.getLogger(PythonPipeline.class); | |
private PythonInterpreter interp; | |
String function; | |
String module; | |
String filePath; | |
String output_table; | |
PySystemState systemState; | |
/* | |
* Transforms a JSON string using Python | |
*/ | |
@Override | |
public String transform(String s) throws GenericDataflowAppException { | |
String outstr = ""; | |
String outmap = ""; | |
if (s != null && !s.equals("")) { | |
//we have to setup the state here because CDF tries to serialize after constructor is called, and Python interpreter etc can't be serialized | |
setupState(); | |
LOG.info(function + " transforming: " + s); | |
try { | |
interp.set("out", new PyUnicode()); | |
interp.set("inval", new PyUnicode(s)); | |
String exc = "out = " + module + "." + function + "(inval)"; | |
interp.exec(exc); | |
outstr = interp.get("out").asStringOrNull(); | |
Gson gson = new GsonBuilder().setLongSerializationPolicy(LongSerializationPolicy.STRING).create(); | |
Map<String, Object> hm = gson.<Map<String, Object>>fromJson(s, (new HashMap<String, Object>()).getClass()); | |
if(outstr == null) { | |
throw new GenericDataflowAppException("error in:" +function + ", no output, likely bad HAPI request"); | |
} | |
hm.put("resource", outstr); | |
hm.put("output_table", output_table); | |
outmap = gson.toJson(hm); | |
LOG.info(function + " out: " + outmap); | |
} catch (PyException e) { | |
String ex = ExceptionUtils.getStackTrace(e); | |
throw new GenericDataflowAppException( | |
"errror:" + e.toString() + " " + | |
e.getMessage() + " trace:" + ex); | |
} | |
} | |
return outmap; | |
} | |
/* | |
Instantiates an object which reads a JSON string, does python to it, and outputs a JSON string. | |
*/ | |
public PythonPipeline(String module, String filePath, String function, String output_table) { | |
super(); | |
this.module = module; | |
this.function = function; | |
this.filePath = filePath; | |
this.output_table = output_table; | |
} | |
private void setupState() { | |
if (interp == null) { | |
interp = new PythonInterpreter(); | |
systemState = Py.getSystemState(); | |
systemState.ps1 = systemState.ps2 = Py.EmptyString; | |
systemState.__setattr__("_jy_interpreter", Py.java2py(interp)); | |
InputStream stream = PythonPipeline.class.getResourceAsStream(filePath); | |
if (stream == null) { | |
try { | |
throw new FileNotFoundException(" File " + filePath | |
+ " does not exist"); | |
} catch (FileNotFoundException e) { | |
e.printStackTrace(); | |
} | |
} else { | |
BufferedReader rdr = new BufferedReader(new InputStreamReader(stream)); | |
interp.compile(rdr); | |
interp.exec("import sys"); | |
interp.exec("import " + module); | |
} | |
} | |
} | |
@Override | |
public String transform(String s, Map<String, String> map) throws GenericDataflowAppException { | |
return transform(s); | |
} | |
@Override | |
public String toString() { | |
return "PythonPipeline{" + module + "." + function + "}"; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment