Created
March 22, 2012 14:52
-
-
Save tf0054/2158772 to your computer and use it in GitHub Desktop.
Hack#48
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.hadoophacks.pig;

import java.io.IOException;
import java.util.Objects;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class SequenceFileLoader extends LoadFunc{ | |
private RecordReader<Text,Text> reader; | |
private TupleFactory tupleFactory = TupleFactory.getInstance(); | |
private String delim = " "; | |
public SequenceFileLoader(){} | |
public SequenceFileLoader(String delim){ | |
this.delim = delim; | |
} | |
@Override | |
public void setLocation(String location , Job job)throws IOException{ | |
// FileInputFormat.addInputPath(job , new Path(location)); | |
FileInputFormat.setInputPaths(job , location); | |
} | |
@Override | |
public InputFormat getInputFormat(){ | |
return new SequenceFileInputFormat<Text , Text>(); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void prepareToRead(RecordReader reader, PigSplit split){ | |
this.reader = (RecordReader<Text , Text>)reader; | |
} | |
@Override | |
public Tuple getNext()throws IOException{ | |
try{ | |
if(!reader.nextKeyValue()){ | |
return null; | |
} | |
Text key = (Text)reader.getCurrentKey(); | |
Text value = (Text)reader.getCurrentValue(); | |
String line = value.toString(); | |
String[] elements = line.split(delim); | |
Tuple tuple = tupleFactory.newTuple(elements.length + 1); | |
tuple.set(0 , new DataByteArray(key.toString())); | |
for(int i = 0;i < elements.length; i++){ | |
tuple.set(i+1 , new DataByteArray(elements[i])); | |
} | |
return tuple; | |
}catch(InterruptedException e){ | |
throw new ExecException(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment