Skip to content

Instantly share code, notes, and snippets.

@tf0054
Created March 22, 2012 14:52
Show Gist options
  • Save tf0054/2158772 to your computer and use it in GitHub Desktop.
Save tf0054/2158772 to your computer and use it in GitHub Desktop.
Hack#48
package org.hadoophacks.pig;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
/**
 * Pig load function that reads a Hadoop SequenceFile whose keys and values are
 * both {@link Text}.
 *
 * <p>Each record becomes one tuple: field 0 holds the record key, and the
 * remaining fields hold the pieces obtained by splitting the record value on
 * the configured delimiter. Every field is emitted as a {@link DataByteArray}.
 *
 * <p>The delimiter is interpreted as a <em>regular expression</em> (the same
 * semantics as {@link String#split(String)}), so trailing empty fields are
 * dropped and the number of fields may therefore vary between records.
 */
public class SequenceFileLoader extends LoadFunc {
    /** Delimiter used by the no-argument constructor: a single space. */
    private static final String DEFAULT_DELIMITER = " ";

    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    /** Delimiter regex, compiled once instead of once per record. */
    private final Pattern delimiter;

    /** Reader for the current split; assigned in {@link #prepareToRead}. */
    private RecordReader<Text, Text> reader;

    /** Creates a loader that splits record values on a single space. */
    public SequenceFileLoader() {
        this(DEFAULT_DELIMITER);
    }

    /**
     * Creates a loader that splits record values on the given delimiter.
     *
     * @param delim regular expression used to split each record value
     * @throws java.util.regex.PatternSyntaxException if {@code delim} is not a
     *         valid regular expression
     */
    public SequenceFileLoader(String delim) {
        this.delimiter = Pattern.compile(delim);
    }

    /**
     * Registers the input location on the job.
     *
     * @param location input path specification passed to
     *        {@code FileInputFormat.setInputPaths}
     * @param job the job being configured
     * @throws IOException if the input paths cannot be set
     */
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    /**
     * @return a new {@code SequenceFileInputFormat} for {@code Text} keys and
     *         {@code Text} values
     */
    @Override
    public InputFormat getInputFormat() {
        return new SequenceFileInputFormat<Text, Text>();
    }

    /**
     * Stores the record reader that {@link #getNext()} will pull records from.
     *
     * @param reader record reader for the split about to be read
     * @param split the split about to be read (unused)
     */
    // The parameter is a raw RecordReader; the cast assumes it was produced by
    // the input format returned from getInputFormat(), i.e. yields Text/Text.
    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = (RecordReader<Text, Text>) reader;
    }

    /**
     * Reads the next record and converts it to a tuple.
     *
     * @return a tuple of (key, value-field-1, value-field-2, ...), or
     *         {@code null} when the reader has no more records
     * @throws IOException if reading fails; an {@link ExecException} wrapping
     *         the {@code InterruptedException} if the read is interrupted
     */
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            Text key = reader.getCurrentKey();
            Text value = reader.getCurrentValue();
            String[] elements = delimiter.split(value.toString());

            // One extra slot at the front for the record key.
            Tuple tuple = tupleFactory.newTuple(elements.length + 1);
            tuple.set(0, new DataByteArray(key.toString()));
            for (int i = 0; i < elements.length; i++) {
                tuple.set(i + 1, new DataByteArray(elements[i]));
            }
            return tuple;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can
            // still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            throw new ExecException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment