-
-
Save kimsterv/601331 to your computer and use it in GitHub Desktop.
import java.io.IOException; | |
import java.util.Map; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.InputFormat; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; | |
import org.apache.pig.LoadFunc; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; | |
import org.apache.pig.data.Tuple; | |
import org.apache.pig.data.TupleFactory; | |
import org.json.simple.JSONObject; | |
import org.json.simple.parser.JSONParser; | |
import org.json.simple.parser.ParseException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.google.common.collect.Maps; | |
public class PigJsonLoader extends LoadFunc { | |
private static final Logger LOG = LoggerFactory.getLogger(PigJsonLoader.class); | |
private static final TupleFactory tupleFactory_ = TupleFactory.getInstance(); | |
private final JSONParser jsonParser_ = new JSONParser(); | |
private LineRecordReader in = null; | |
public PigJsonLoader() { | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public InputFormat getInputFormat() throws IOException { | |
return new PigTextInputFormat(); | |
} | |
@Override | |
public Tuple getNext() throws IOException { | |
boolean notDone = in.nextKeyValue(); | |
if (!notDone) { | |
return null; | |
} | |
String line; | |
Text val = in.getCurrentValue(); | |
if (val == null) { | |
return null; | |
} | |
line = val.toString(); | |
if (line.length() > 0) { | |
Tuple t = parseStringToTuple(line); | |
if (t != null) { | |
return t; | |
} | |
} | |
return null; | |
} | |
protected Tuple parseStringToTuple(String line) { | |
try { | |
Map<String, String> values = Maps.newHashMap(); | |
JSONObject jsonObj = (JSONObject) jsonParser_.parse(line); | |
for (Object key : jsonObj.keySet()) { | |
Object value = jsonObj.get(key); | |
values.put(key.toString(), value != null ? value.toString() | |
: null); | |
} | |
return tupleFactory_.newTuple(values); | |
} catch (ParseException e) { | |
LOG.warn("Could not json-decode string: " + line, e); | |
return null; | |
} catch (NumberFormatException e) { | |
LOG.warn("Very big number exceeds the scale of long: " + line, e); | |
return null; | |
} | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void prepareToRead(RecordReader reader, PigSplit split) | |
throws IOException { | |
in = (LineRecordReader) reader; | |
} | |
@Override | |
public void setLocation(String location, Job job) throws IOException { | |
PigFileInputFormat.setInputPaths(job, location); | |
} | |
} |
--example rolling up logs by day/user/method
--jar containing jsonpigloader
register shadoop-0.0.1.jar;
register guava-r06.jar;
register json-simple-1.1.jar;
register piggybank.jar;
register joda-time-1.6.jar;
DEFINE PigJsonLoader com.simplegeo.elephantgeo.pig.load.PigJsonLoader();
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE ISOToDay org.apache.pig.piggybank.evaluation.datetime.truncate.ISOToDay();
logs = LOAD '/logs' using PigJsonLoader as (json: map[]);
-- BUG FIX: the original said "FOREACH data", but no alias named 'data' exists;
-- the LOAD above binds 'logs'.
logs = FOREACH logs GENERATE UnixToISO((long)((double)json#'timestamp')) AS ISOTime:chararray, json#'method' as method, json#'user:id' as user, json#'path' as path;
logs_by_day_user = FOREACH logs GENERATE (chararray) user, (chararray) ISOToDay(ISOTime) as day, method;
grouped_by_day_user = GROUP logs_by_day_user by ((chararray) user, day, (chararray) method) PARALLEL 32;
counted_by_day_user = FOREACH grouped_by_day_user GENERATE FLATTEN(logs_by_day_user), COUNT(logs_by_day_user);
STORE counted_by_day_user INTO '/output' using PigStorage(',');
Thanks!
Can this be added to the Piggy Bank or core Pig? It looks really useful.
Thanks again Kim. I've compiled this into my own piggybank and got a basic json file loaded. Is it supposed to deal with nested json data structures? I see you have json#'user:id' in the example above - but I can't get anything like that to work. In my case I try something like
samplecols = FOREACH fewlogs GENERATE json#'user' , json#'text' ;
and json#'user' is itself something like
{"location":"Chicago","statuses_count":57,"profile_background_tile":true,"lang":"en","profile_link_color":"f51883","id":237881805,"following":null,"protected":false,......}
And I can't access any entries in there.
Do I need to take your code and turn it into another UDF which works on fields already loaded, rather than only parsing JSON at load time?
Kim, nice parser. One enhancement that makes accessing nested Json accessible would be to return back a Map if a value is also JSON. I have this working with:
/**
 * Parses a JSON string into a map of key -> value, recursing into nested
 * objects.
 *
 * @param line the JSON text to parse
 * @return map of top-level keys to values (nested objects become nested maps)
 * @throws ParseException if {@code line} is not a JSON object
 */
public Map<String, Object> parse(String line) throws ParseException {
    // Convenience overload: build a fresh parser and defer to the two-arg form.
    return parse(line, new JSONParser());
}
private Map<String, Object> parse(String line, JSONParser jsonParser) throws ParseException {
Map<String, Object> resultMap = new HashMap<String, Object>();
JSONObject jsonObj = null;
try {
jsonObj = (JSONObject) jsonParser.parse(line);
} catch (ClassCastException e) {
throw new ParseException(1, "unexpected object");
}
if (jsonObj.isEmpty()) {
return resultMap;
}
for (Object key : jsonObj.keySet()) {
String keyStr = String.valueOf(key);
String valueStr = String.valueOf(jsonObj.get(key));
try {
resultMap.put(keyStr, parse(valueStr));
} catch (ParseException e) {
// Nope not json
resultMap.put(keyStr, valueStr);
}
return resultMap;
}
So for accessing valB from X = {"keyA": {"keyB":"valB", "keyC":"valC"}}, one can use X#'keyA'#'keyB' where X is the record being read.
}
}
Thanks again.
We should probably put this stuff into piggybank. I just haven't had the time to figure out how yet :-/
Definitely useful contribution if made :-)
Hi Folks,
I have a version of this which is nearly ready for Piggybank. I'd appreciate some hand holding - or I can post it as a gist for someone else to submit.
I have gotten it creating a nested tree of maps AND also inserts arrays as new Tuples. (If arrays are found in the above code then it gets inserted as one big bytearray)
I have removed the reference to Google Collections /Guava as that is one extra jar needed.
If I submit it to piggybank it will make sense to use the Jackson JSON parser instead of JSon Simple because that is already packaged with piggybank/pig.
alexmc6,
There are docs here on contributing to piggybank: http://wiki.apache.org/pig/PiggyBank
I've never done it so I can't give you any lessons learned. If you need additional help, I would email the pig forum.
[on a related topic]
I have forked elephant-bird (gerritjvv's repository) to implement some very preliminary support for nested JSON structures (that does not rely on re-parsing values as in the code snippet above):
https://github.com/ccattuto/elephant-bird
com.twitter.elephantbird.mapreduce.input.LzoJsonRecordReader now returns a nested MapWritable (with Writable values) that mirrors the parsed JSONObject.
com.twitter.elephantbird.pig8.load.LzoJsonLoader now rewrites the MapWritable returned by the LzoJsonRecordReader into a Map<String,Object> that Pig can use.
This allows one to access deep fields in JSON records like this:
X = LOAD .... AS (json: map[]);
Y = FOREACH X GENERATE json#'xxx'#'yyy' as yyy;
I am no expert, so the code is very rough and currently it only walks nested dictionaries,
filtering out Bags/Tuples.
JsonStringToMap in elephant-bird will handle nested json:
Here's elephant-bird's jsonLoader:
I modified this code to convert JSONArray to Bags and nested Structures to Maps.
/**
 * Parses one JSON line into a single-field tuple whose map mirrors the JSON
 * structure: arrays become bags, nested objects become maps (via
 * flatten_value), scalars become strings.
 *
 * @return the tuple, or null when the line cannot be decoded (logged).
 */
protected Tuple parseStringToTuple(String line) {
    try {
        JSONObject parsed = (JSONObject) jsonParser_.parse(line);
        Map<String, Object> fields = Maps.newHashMap();
        flatten_value(parsed, fields);
        return tupleFactory_.newTuple(fields);
    } catch (ParseException e) {
        LOG.warn("Could not json-decode string: " + line, e);
        return null;
    } catch (NumberFormatException e) {
        LOG.warn("Very big number exceeds the scale of long: " + line, e);
        return null;
    }
}
/**
 * Recursively copies a JSONObject into a Pig-friendly map: JSON arrays become
 * DataBags, nested JSON objects become Tuples wrapping nested maps, and all
 * other values are stored as strings (null preserved as null).
 *
 * @param jsonObj source object to flatten
 * @param values  destination map, mutated in place
 */
private void flatten_value(JSONObject jsonObj, Map<String, Object> values) {
    for (Object key : jsonObj.keySet()) {
        String pref = key.toString();
        Object value = jsonObj.get(key);
        if (value instanceof JSONArray) {
            // Array -> bag of tuples; flatten_array handles nesting.
            JSONArray array = (JSONArray) value;
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            for (Object innervalue : array) {
                flatten_array(innervalue, bag);
            }
            values.put(pref, bag);
        } else if (value instanceof JSONObject) {
            // Nested object -> tuple wrapping a nested map.
            Map<String, Object> values2 = Maps.newHashMap();
            flatten_value((JSONObject) value, values2);
            values.put(pref, tupleFactory_.newTuple(values2));
        } else {
            // Scalar (or null) -> string form.
            values.put(pref, value != null ? value.toString() : null);
        }
    }
    // NOTE: removed the original's unused local counter "int i = 0;".
}
/**
 * Adds one JSON array element to a bag: nested arrays are flattened into the
 * same bag, nested objects become tuples wrapping maps (via flatten_value),
 * and non-null scalars become single-element tuples. Nulls are dropped.
 *
 * @param value element taken from a JSONArray
 * @param bag   destination bag, mutated in place
 */
private void flatten_array(Object value, DataBag bag) {
    if (value instanceof JSONArray) {
        // Nested array: collect into a scratch bag, then merge — the net
        // effect flattens nested arrays into the parent bag.
        JSONArray array = (JSONArray) value;
        DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
        for (Object innervalue : array) {
            flatten_array(innervalue, b);
        }
        bag.addAll(b);
    } else if (value instanceof JSONObject) {
        Map<String, Object> values2 = Maps.newHashMap();
        flatten_value((JSONObject) value, values2);
        bag.add(tupleFactory_.newTuple(values2));
    } else {
        if (value != null) {
            bag.add(tupleFactory_.newTuple(value));
        }
    }
    // NOTE: removed the original's counter "i", which was incremented but
    // never read.
}
Hi - Thank you for this, it's great! Nice work.
Just curious, any reason why you chose to use the Google collections version of HashMap instead of the standard java implementation? When I used this I modified the Google collections HashMap to be just a java.util.HashMap in the interest of making the jar small and depending on as little external libraries as possible.
Also, any word on if/when this will be included in PiggyBank? As far as I can see it's not in there. :(
Thanks!
Pig already has guava, and so does ElephantBird, so there is no real extra dependency.
Well, that's weird... When I try to run this script without registering the google-collections (guava) jar, I get a ClassNotFoundException: com.google.common.collect.Maps. Am I doing something wrong?
I see. I'm running 0.8.0
Cool.
If you guys are cool with it, I'd be more than happy to roll up this little class into a patch and get it submitted into PiggyBank for all of you. I've never done it before, but it seems straight-forward enough from the directions. The only thing I would need to add are some unit tests (shouldn't be a big deal). There is a jira issue pertaining to just this at: https://issues.apache.org/jira/browse/PIG-1914
Basically, if the authors (kimsterv and ayonsinha, or anyone else who helped write this that I left out) are cool with the apache license then I'd love to go ahead and get a patch going. I think this is definitely a must-have UDF and would be appreciated by the Pig user community.
Just let me know,
Thanks
I'm running into an error when trying to access a field of nested JSON, maybe one of you can give me some insight. I'm using ayonsinha's version of the JSON parser which /should/ convert a nested structure to a map? Maybe I'm just missing something?
e.g.
grunt> x = load '/my_json_data' using ...JsonLoader() as (json:map[]);
grunt> y = foreach x generate json#'field1'#'field2' as field2;
grunt> describe y
2011-06-29 18:24:16,980 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_MAP 1 time(s).
y: {user_id: bytearray}
grunt> dump y
...
ERROR 1081: Cannot cast to map. Expected bytearray but received: tuple
This error seems reasonable to me since getNext is returning a Tuple. Has anyone else run into this, and if so did you get around it somehow?
Thanks.
Yes, I have run into this issue, and the way I have gotten around it is by extracting each value in one step and flattening. Each value is a Tuple that you have to flatten out.
e.g.
exp = foreach all_data generate flatten($0#'request_details') as recs, (long)$0#'timestamp' as ts, $0#'user_id' as uid;
exp = filter exp by ts is not null and ts != 0;
req = foreach exp generate recs#'request_type' as req_type, ts, uid, flatten(recs#'_checkin_request_details') as loc_details, flatten(recs#'award_details') as award;
Can you show a sample Pig script that uses this JSONLoader?
Thanks for sharing!