CSVLoader for Pig that includes path of file being processed
package com.simplegeo.elephantgeo.pig.load;

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Based on an earlier version for Pig 0.3, which was Copyright 2009 James Kebinger
 * http://github.com/jkebinger/pig-user-defined-functions
 * and on the built-in PigStorage.
 */
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
/**
 * A load function based on PigStorage that implements part of the CSV "standard".
 * This loader properly supports double-quoted fields that contain commas, and
 * literal double-quotes escaped by doubling them ("").
 *
 * Each of the following lines is parsed as a tuple containing a single field:
 *   "the man, he said ""hello"""
 *   "one,two,three"
 *
 * This version supports Pig 0.7+.
 */
public class CSVLoaderWithFilename extends FileInputLoadFunc implements LoadPushDown {
    @SuppressWarnings("rawtypes")
    protected RecordReader in = null;
    protected static final Log LOG = LogFactory.getLog(CSVLoaderWithFilename.class);

    private static final byte DOUBLE_QUOTE = '"';
    private static final byte FIELD_DEL = ',';
    private static final byte RECORD_DEL = '\n';

    long end = Long.MAX_VALUE;
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private String signature;
    private String loadLocation;
    private boolean[] mRequiredColumns = null;
    private boolean mRequiredColumnsInitialized = false;
    private PigSplit mSplit;

    public CSVLoaderWithFilename() {
    }
    @Override
    public Tuple getNext() throws IOException {
        mProtoTuple = new ArrayList<Object>();
        boolean inField = false;
        boolean inQuotedField = false;
        boolean evenQuotesSeen = true;

        if (!mRequiredColumnsInitialized) {
            if (signature != null) {
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }

        try {
            if (!in.nextKeyValue()) {
                return null;
            }

            Text value;
            if (mSplit != null) {
                // Append the path of the file being processed as an extra,
                // final comma-separated field on every record.
                FileSplit fs = (FileSplit) mSplit.getWrappedSplit();
                String fn = fs.getPath().toString();
                value = new Text(in.getCurrentValue() + "," + fn);
            } else {
                value = (Text) in.getCurrentValue();
            }

            byte[] buf = value.getBytes();
            int len = value.getLength();
            int fieldID = 0;
            ByteBuffer fieldBuffer = ByteBuffer.allocate(len);

            for (int i = 0; i < len; i++) {
                byte b = buf[i];
                inField = true;
                if (inQuotedField) {
                    if (b == DOUBLE_QUOTE) {
                        // A doubled quote ("") inside a quoted field is an
                        // escaped literal quote.
                        evenQuotesSeen = !evenQuotesSeen;
                        if (evenQuotesSeen) {
                            fieldBuffer.put(DOUBLE_QUOTE);
                        }
                    } else if (!evenQuotesSeen &&
                            (b == FIELD_DEL || b == RECORD_DEL)) {
                        inQuotedField = false;
                        inField = false;
                        readField(fieldBuffer, fieldID++);
                    } else {
                        fieldBuffer.put(b);
                    }
                } else if (b == DOUBLE_QUOTE) {
                    inQuotedField = true;
                    evenQuotesSeen = true;
                } else if (b == FIELD_DEL) {
                    inField = false;
                    readField(fieldBuffer, fieldID++); // end of the field
                } else {
                    evenQuotesSeen = true;
                    fieldBuffer.put(b);
                }
            }
            if (inField) readField(fieldBuffer, fieldID++);
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }

        Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
        return t;
    }
    private void readField(ByteBuffer buf, int fieldID) {
        if (mRequiredColumns == null || (mRequiredColumns.length > fieldID && mRequiredColumns[fieldID])) {
            byte[] bytes = new byte[buf.position()];
            buf.rewind();
            buf.get(bytes, 0, bytes.length);
            mProtoTuple.add(new DataByteArray(bytes));
        }
        buf.clear();
    }
    @Override
    public void setLocation(String location, Job job) throws IOException {
        loadLocation = location;
        FileInputFormat.setInputPaths(job, location);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public InputFormat getInputFormat() throws IOException {
        if (loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
            return new Bzip2TextInputFormat();
        } else {
            return new PigTextInputFormat();
        }
    }

    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
            throws IOException {
        mSplit = split;
        in = reader;
    }
    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
        if (requiredFieldList == null)
            return null;
        if (requiredFieldList.getFields() != null) {
            int lastColumn = -1;
            for (RequiredField rf : requiredFieldList.getFields()) {
                if (rf.getIndex() > lastColumn) {
                    lastColumn = rf.getIndex();
                }
            }
            mRequiredColumns = new boolean[lastColumn + 1];
            for (RequiredField rf : requiredFieldList.getFields()) {
                if (rf.getIndex() != -1)
                    mRequiredColumns[rf.getIndex()] = true;
            }
            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
            try {
                p.setProperty(signature, ObjectSerializer.serialize(mRequiredColumns));
            } catch (Exception e) {
                throw new RuntimeException("Cannot serialize mRequiredColumns");
            }
        }
        return new RequiredFieldResponse(true);
    }

    @Override
    public void setUDFContextSignature(String signature) {
        this.signature = signature;
    }

    @Override
    public List<OperatorSet> getFeatures() {
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
    }
}
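The gist doesn't show the loader in use, so here is a minimal usage sketch in Pig Latin. The jar name, input path, and schema below are hypothetical, not part of the gist; the loader emits each CSV column as a bytearray followed by one extra trailing field holding the source file's path, so the AS clause must list one more field than the CSV has columns.

-- Minimal usage sketch (hypothetical jar name, path, and schema).
REGISTER elephantgeo.jar;

records = LOAD '/data/csv/*.csv'
          USING com.simplegeo.elephantgeo.pig.load.CSVLoaderWithFilename()
          AS (name:chararray, city:chararray, filename:chararray);

DUMP records;

Because the path is appended as an ordinary comma-separated field before the line is parsed, it always arrives as the last field of every tuple.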