Created
January 19, 2011 08:11
-
-
Save alienrobotwizard/785858 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hadoop.hbase.mapreduce; | |
import java.io.ByteArrayOutputStream; | |
import java.io.DataOutputStream; | |
import java.io.IOException; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Iterator; | |
import java.util.TreeSet; | |
import org.apache.commons.cli.CommandLine; | |
import org.apache.commons.cli.CommandLineParser; | |
import org.apache.commons.cli.GnuParser; | |
import org.apache.commons.cli.HelpFormatter; | |
import org.apache.commons.cli.Options; | |
import org.apache.commons.cli.ParseException; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.hbase.HBaseConfiguration; | |
import org.apache.hadoop.hbase.client.HTable; | |
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.client.Result; | |
import org.apache.hadoop.hbase.client.Scan; | |
import org.apache.hadoop.hbase.filter.BinaryComparator; | |
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; | |
import org.apache.hadoop.hbase.filter.FilterList; | |
import org.apache.hadoop.hbase.filter.RowFilter; | |
import org.apache.hadoop.hbase.KeyValue; | |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; | |
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; | |
import org.apache.hadoop.hbase.util.Base64; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import org.apache.hadoop.mapreduce.InputFormat; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.OutputFormat; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.RecordWriter; | |
import org.apache.pig.LoadCaster; | |
import org.apache.pig.LoadFunc; | |
import org.apache.pig.LoadPushDown; | |
import org.apache.pig.ResourceSchema; | |
import org.apache.pig.ResourceSchema.ResourceFieldSchema; | |
import org.apache.pig.StoreFuncInterface; | |
import org.apache.pig.StoreFunc; | |
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; | |
import org.apache.pig.builtin.Utf8StorageConverter; | |
import org.apache.pig.data.DataBag; | |
import org.apache.pig.data.DataByteArray; | |
import org.apache.pig.data.DataType; | |
import org.apache.pig.data.Tuple; | |
import org.apache.pig.data.TupleFactory; | |
import org.apache.pig.impl.logicalLayer.FrontendException; | |
import org.apache.pig.impl.util.Utils; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import com.google.common.collect.Lists; | |
// | |
// This can work. Note: 1. Need to run a total sort over the whole | |
// data set. 2. Need to sort the keyvalues before writing. It may be | |
// that simply calling ORDER with no other arguments in the pig script | |
// itself will be fine. | |
// | |
public class HFileStorage extends StoreFunc { | |
// RecordWriter handed to us by Pig in prepareToWrite(); putNext() writes through it.
protected RecordWriter writer = null;
// Unused in the visible code — presumably intended for a table-based output path; TODO confirm.
private String tableURI;
// HBase table name (constructor arg tN) as UTF-8 bytes.
private byte[] tableName;
// Column family (constructor arg cF) every emitted KeyValue is written under.
private byte[] columnFamily;
// Column qualifiers, one per field, in the order tuples arrive in the bag.
private String[] columnNames;
/**
 * Constructor. Construct an HFile StoreFunc to write data out as HFiles. These
 * HFiles will then have to be imported with the hbase/bin/loadtable.rb tool.
 *
 * @param tN    The HBase table name the data will ultimately wind up in. It does not need to exist ahead of time.
 * @param cF    The HBase column family name for the table the data will wind up in. It does not need to exist ahead of time.
 * @param names A comma separated list of column names describing the fields in a tuple.
 */
public HFileStorage(String tN, String cF, String names) {
    this.tableName = Bytes.toBytes(tN);
    this.columnFamily = Bytes.toBytes(cF);
    this.columnNames = names.split(",");
}
// | |
// getOutputFormat() | |
// | |
// This method will be called by Pig to get the OutputFormat | |
// used by the storer. The methods in the OutputFormat (and | |
// underlying RecordWriter and OutputCommitter) will be | |
// called by pig in the same manner (and in the same context) | |
// as by Hadoop in a map-reduce java program. If the | |
// OutputFormat is a hadoop packaged one, the implementation | |
// should use the new API based one under | |
// org.apache.hadoop.mapreduce. If it is a custom OutputFormat, | |
// it should be implemented using the new API under | |
// org.apache.hadoop.mapreduce. The checkOutputSpecs() method | |
// of the OutputFormat will be called by pig to check the | |
// output location up-front. This method will also be called as | |
// part of the Hadoop call sequence when the job is launched. So | |
// implementations should ensure that this method can be called | |
// multiple times without inconsistent side effects. | |
public OutputFormat getOutputFormat() throws IOException { | |
HFileOutputFormat outputFormat = new HFileOutputFormat(); | |
return outputFormat; | |
} | |
// | |
// setStoreLocation() | |
// This method is called by Pig to communicate the store location | |
// to the storer. The storer should use this method to communicate | |
// the same information to the underlying OutputFormat. This | |
// method is called multiple times by pig - implementations should | |
// bear in mind that this method is called multiple times and should | |
// ensure there are no inconsistent side effects due to the multiple | |
// calls. | |
public void setStoreLocation(String location, Job job) throws IOException { | |
job.getConfiguration().set("mapred.textoutputformat.separator", ""); | |
FileOutputFormat.setOutputPath(job, new Path(location)); | |
} | |
/**
 * Called by Pig once per task, before any putNext() calls, with the
 * RecordWriter obtained from the OutputFormat returned by getOutputFormat().
 * putNext() writes every record through this writer.
 *
 * @param writer the RecordWriter from HFileOutputFormat (raw type per the
 *               StoreFunc API)
 */
public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
    this.writer = writer;
}
// | |
// Here we are going to get the following: | |
// | |
// (row_key, {(field1),(field2),...}) | |
// | |
// we must iterate through the tuples in the | |
// bag and insert them into a TreeSet for | |
// sorting. Then we need to iterate through | |
// the sorted set and serialize each column | |
// | |
// | |
@SuppressWarnings("unchecked") | |
public void putNext(Tuple t) throws IOException { | |
try { | |
byte[] rowKey = Bytes.toBytes(t.get(0).toString()); // use zeroth field as row key | |
//byte[] rowKey = ((DataByteArray)t.get(0)).get(); | |
DataBag columns = (DataBag)t.get(1); | |
ImmutableBytesWritable hbaseRowKey = new ImmutableBytesWritable(rowKey); | |
TreeSet<KeyValue> map = sortedKeyValues(rowKey, columns); | |
for (KeyValue kv: map) { | |
writer.write(hbaseRowKey, kv); | |
} | |
} catch (InterruptedException e) { | |
throw new IOException("Interrupted"); | |
} catch (NullPointerException e) { | |
System.out.println("@('_')@ Null pointer exception."); | |
} | |
} | |
private TreeSet<KeyValue> sortedKeyValues(byte[] rowKey, DataBag columns) throws IOException { | |
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); | |
long ts = System.currentTimeMillis(); | |
int idx = 0; | |
Iterator<Tuple> tupleIter = columns.iterator(); | |
while(tupleIter.hasNext()) { | |
byte[] columnName = Bytes.toBytes(columnNames[idx]); | |
byte[] value = Bytes.toBytes(tupleIter.next().get(0).toString()); | |
KeyValue kv = new KeyValue(rowKey, columnFamily, columnName, ts, value); | |
map.add(kv.clone()); | |
idx += 1; | |
} | |
return map; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Load streamed twitter user records (tab-delimited by default), all as chararray.
twuid = LOAD '/tmp/streamed/twitter_user_id/part-00000' AS (rsrc:chararray,uid:chararray,scrat:chararray,sn:chararray,prot:chararray,foll:chararray,friend:chararray,status:chararray,favo:chararray,crat:chararray,sid:chararray,is_full:chararray,health:chararray);
-- Drop the rsrc field; keep uid plus the per-user attribute fields.
cut_fields = FOREACH twuid GENERATE uid, scrat, sn, prot,foll,friend,status,favo,crat,sid,is_full,health;
-- Group by user id: yields (uid, {bag of full cut_fields tuples}). PARALLEL 1 forces a single reducer.
grpd = GROUP cut_fields BY uid PARALLEL 1;
DESCRIBE grpd;
-- Remove any previous output so the STORE below does not fail on an existing path.
rmf /tmp/pig_test/out
-- NOTE(review): HFileStorage reads only field 0 of each bag tuple and expects
-- (row_key, {(field1),(field2),...}), but grpd's bag holds 12-field tuples and
-- only 10 column names are listed below — confirm the intended tuple shape.
STORE grpd INTO '/tmp/pig_test/out' USING org.apache.hadoop.hbase.mapreduce.HFileStorage('Jacob', 'pig_test', 'screen_name,protected,followers_count,friends_count,statuses_count,favorites_count,created_at,search_id,is_full,health');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment