-
-
Save kimsterv/601331 to your computer and use it in GitHub Desktop.
import java.io.IOException; | |
import java.util.Map; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.InputFormat; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; | |
import org.apache.pig.LoadFunc; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; | |
import org.apache.pig.data.Tuple; | |
import org.apache.pig.data.TupleFactory; | |
import org.json.simple.JSONObject; | |
import org.json.simple.parser.JSONParser; | |
import org.json.simple.parser.ParseException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.google.common.collect.Maps; | |
public class PigJsonLoader extends LoadFunc { | |
private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class); | |
private static final TupleFactory tupleFactory_ = TupleFactory.getInstance(); | |
private final JSONParser jsonParser_ = new JSONParser(); | |
private LineRecordReader in = null; | |
public PigJsonLoader() { | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public InputFormat getInputFormat() throws IOException { | |
return new PigTextInputFormat(); | |
} | |
@Override | |
public Tuple getNext() throws IOException { | |
boolean notDone = in.nextKeyValue(); | |
if (!notDone) { | |
return null; | |
} | |
String line; | |
Text val = in.getCurrentValue(); | |
if (val == null) { | |
return null; | |
} | |
line = val.toString(); | |
if (line.length() > 0) { | |
Tuple t = parseStringToTuple(line); | |
if (t != null) { | |
return t; | |
} | |
} | |
return null; | |
} | |
protected Tuple parseStringToTuple(String line) { | |
try { | |
Map<String, String> values = Maps.newHashMap(); | |
JSONObject jsonObj = (JSONObject) jsonParser_.parse(line); | |
for (Object key : jsonObj.keySet()) { | |
Object value = jsonObj.get(key); | |
values.put(key.toString(), value != null ? value.toString() | |
: null); | |
} | |
return tupleFactory_.newTuple(values); | |
} catch (ParseException e) { | |
LOG.warn("Could not json-decode string: " + line, e); | |
return null; | |
} catch (NumberFormatException e) { | |
LOG.warn("Very big number exceeds the scale of long: " + line, e); | |
return null; | |
} | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void prepareToRead(RecordReader reader, PigSplit split) | |
throws IOException { | |
in = (LineRecordReader) reader; | |
} | |
@Override | |
public void setLocation(String location, Job job) throws IOException { | |
PigFileInputFormat.setInputPaths(job, location); | |
} | |
} |
Hi Folks,
I have a version of this which is nearly ready for Piggybank. I'd appreciate some hand-holding — or I can post it as a gist for someone else to submit.
I have gotten it to create a nested tree of maps AND to insert arrays as new Tuples. (In the code above, an array is inserted as one big bytearray.)
I have removed the reference to Google Collections/Guava, since that is one extra jar needed.
If I submit it to Piggybank, it will make sense to use the Jackson JSON parser instead of JSON.simple, because Jackson is already packaged with Piggybank/Pig.
alexmc6,
There are docs here on contributing to piggybank: http://wiki.apache.org/pig/PiggyBank
I've never done it so I can't give you any lessons learned. If you need additional help, I would email the pig forum.
[on a related topic]
I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird
com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.
com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map<String,Object> that Pig can use.
This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;
I am no expert, so the code is very rough: currently it only walks nested dictionaries and filters out Bags/Tuples.
JsonStringToMap in elephant-bird will handle nested JSON:
Here's elephant-bird's jsonLoader:
I modified this code to convert JSONArrays to Bags and nested structures to Maps:
/**
 * Decodes one line of JSON into a single-field tuple holding a map of the object's entries,
 * with each value converted by {@code flatten_value}.
 *
 * @param line the raw text of one input line
 * @return a one-field tuple wrapping a {@code Map<String, Object>}, or {@code null} if the line
 *     is not valid JSON, contains a number the parser cannot represent, or does not decode to a
 *     JSON object
 */
protected Tuple parseStringToTuple(String line) {
  Object parsed;
  try {
    parsed = jsonParser_.parse(line);
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return null;
  } catch (NumberFormatException e) {
    LOG.warn("Very big number exceeds the scale of long: " + line, e);
    return null;
  }
  // A line may legally hold a JSON array or scalar; casting those would throw
  // ClassCastException and fail the task instead of skipping the line.
  if (!(parsed instanceof JSONObject)) {
    LOG.warn("Expected a JSON object but got: " + line);
    return null;
  }
  Map<String, Object> values = Maps.newHashMap();
  flatten_value((JSONObject) parsed, values);
  return tupleFactory_.newTuple(values);
}
/**
 * Copies every entry of {@code jsonObj} into {@code values}, converting each JSON value into a
 * form Pig can hold in a map:
 * <ul>
 *   <li>JSON arrays become a {@link DataBag} whose elements are converted by
 *       {@code flatten_array};</li>
 *   <li>nested JSON objects become a nested {@code Map<String, Object>}, so scripts can chain
 *       lookups such as {@code json#'a'#'b'};</li>
 *   <li>any other non-null value is stored as its {@code toString()} form, and JSON
 *       {@code null} as {@code null}.</li>
 * </ul>
 *
 * @param jsonObj the decoded JSON object to read from
 * @param values destination map; entries are added and existing keys are overwritten
 */
private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
  for (Object key : jsonObj.keySet()) {
    String pref = key.toString();
    Object value = jsonObj.get(key);
    if (value instanceof JSONArray) {
      DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
      for (Object innervalue : (JSONArray) value) {
        flatten_array(innervalue, bag);
      }
      values.put(pref, bag);
    } else if (value instanceof JSONObject) {
      // Store the nested object as a map itself. Wrapping it in a tuple makes map
      // dereferences like json#'a'#'b' fail with "Cannot cast to map ... received: tuple".
      Map<String, Object> nested = Maps.newHashMap();
      flatten_value((JSONObject) value, nested);
      values.put(pref, nested);
    } else {
      values.put(pref, value != null ? value.toString() : null);
    }
  }
}
/**
 * Appends the Pig representation of one JSON array element to {@code bag}.
 * <ul>
 *   <li>A nested JSON array is flattened: its elements are appended (recursively) straight
 *       into {@code bag}.</li>
 *   <li>A JSON object is converted by {@code flatten_value} and added as a single-field tuple,
 *       because every bag element must be a tuple.</li>
 *   <li>Any other non-null value is added as a single-field tuple holding its
 *       {@code toString()} form. This matches how {@code flatten_value} stores scalars and
 *       avoids placing Java types such as {@code Boolean} into the bag, which Pig may not be
 *       able to serialize.</li>
 *   <li>JSON {@code null} elements are skipped.</li>
 * </ul>
 *
 * @param value one element of a decoded JSON array (may be {@code null})
 * @param bag destination bag
 */
private void flatten_array(Object value, DataBag bag) {
  if (value instanceof JSONArray) {
    for (Object innervalue : (JSONArray) value) {
      flatten_array(innervalue, bag);
    }
  } else if (value instanceof JSONObject) {
    Map<String, Object> values2 = Maps.newHashMap();
    flatten_value((JSONObject) value, values2);
    bag.add(tupleFactory_.newTuple(values2));
  } else if (value != null) {
    bag.add(tupleFactory_.newTuple(value.toString()));
  }
}
Hi - Thank you for this, it's great! Nice work.
Just curious: is there any reason you chose Google Collections' Maps.newHashMap() instead of the standard Java HashMap? When I used this, I replaced it with a plain java.util.HashMap to keep the jar small and depend on as few external libraries as possible.
Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(
Thanks!
Pig already has guava, and so does ElephantBird, so there is no real extra dependency.
Well, that's weird... When I try to run this script without registering the google-collections (Guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?
I see. I'm running 0.8.0
Cool.
If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straight-forward enough from the directions. The only thing I would need to add are some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914
Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this whom I left out) are fine with the Apache License, then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.
Just let me know,
Thanks
I'm running into an error when trying to access a field of nested JSON; maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser, which /should/ convert a nested structure to a map. Maybe I'm just missing something?
e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple
This error seems reasonable to me, since getNext is returning a Tuple. Has anyone else run into this, and if so, did you get around it somehow?
Thanks.
Yes, I have run into this issue. I got around it by extracting each value in one step and flattening it: each value is a Tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;
Definitely useful contribution if made :-)