Skip to content

Instantly share code, notes, and snippets.

@jayunit100
Last active December 25, 2015 05:18
Show Gist options
  • Save jayunit100/6923234 to your computer and use it in GitHub Desktop.
Save jayunit100/6923234 to your computer and use it in GitHub Desktop.
An example of an input format that generates its own records for MapReduce jobs, so no hard-coded input files are needed.
/**
 * An input format that generates (firstname_lastname, product) records instead of
 * reading them from files. These simulate transactions at a pet store by clients.
 * First iteration of the BigPetStore blueprint project.
 *
 * <p>Configuration keys read by this format:
 * <ul>
 *   <li>{@code transactions}: number of records each record reader generates
 *       (default {@link #TRANSACTIONS}).</li>
 *   <li>{@code transaction_files}: number of splits to create (default 2).</li>
 * </ul>
 */
public static class TransactionGeneratorInputFormat extends
        FileInputFormat<Text, Text> {

    /** Default number of records generated per split when {@code transactions} is unset. */
    static final Integer TRANSACTIONS = 100;

    static final String[] FIRSTNAMES = new String[]{
            "jay",
            "john",
            "jim",
            "diana",
            "duane",
            "david",
            "peter",
            "paul",
            "matthias",
            "hyacinth",
            "jacob",
            "andrew",
            "andy",
            "mischa",
            "enno",
            "sanford",
            "shawn"
    };

    static final String[] LASTNAMES = new String[]{
            "vyas",
            "macleroy",
            "watt",
            "childress",
            "shaposhnick",
            "bukatov",
            "govind",
            "jones",
            "stevens",
            "yang",
            "fu",
            "ghandi",
            "watson",
            "walbright",
            "samuelson"
    };

    static final String[] PRODUCTS = new String[]{
            "dog food",
            "cat food",
            "fish food",
            "little chew toy",
            "big chew toy",
            "dog treats (hard)",
            "dog treats (soft)",
            "premium dog food",
            "premium cat food",
            "pet deterrent",
            "flea collar",
            "turtle food"
    };

    /**
     * Returns a uniformly random element of {@code values}.
     * {@code nextInt(bound)} is exclusive of {@code bound}, so passing the full
     * length makes every element, including the last, reachable.
     */
    private static String pick(Random random, String[] values) {
        return values[random.nextInt(values.length)];
    }

    /**
     * Creates a reader that emits randomly generated transactions.
     *
     * @param split   ignored; records are generated rather than read
     * @param context supplies the {@code transactions} setting (records to emit)
     * @return a reader producing {@code transactions} (firstname_lastname, product) pairs
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        // Honor the same per-split count that getSplits() reports; default keeps the
        // historical 100 records per split.
        final int limit = context.getConfiguration().getInt("transactions", TRANSACTIONS);
        return new RecordReader<Text, Text>() {
            private final Random random = new Random();
            private Text name;
            private Text transaction;
            private int soFar = 0;

            @Override
            public void initialize(InputSplit readerSplit, TaskAttemptContext readerContext)
                    throws IOException, InterruptedException {
                // Nothing to open: records are generated on demand in nextKeyValue().
            }

            /**
             * Generates the next mock transaction, or returns false once {@code limit}
             * records have been produced. No record is generated on the terminating call.
             */
            @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
                if (soFar >= limit) {
                    return false;
                }
                name = new Text(pick(random, FIRSTNAMES) + "_" + pick(random, LASTNAMES));
                transaction = new Text(pick(random, PRODUCTS));
                soFar++;
                // NOTE(review): per-record stdout output; consider routing through the job logger.
                System.out.println(name + " " + transaction);
                return true;
            }

            @Override
            public Text getCurrentKey() throws IOException,
                    InterruptedException {
                return name;
            }

            @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {
                return transaction;
            }

            /** Fraction of records emitted so far, clamped to [0, 1]. */
            @Override
            public float getProgress() throws IOException,
                    InterruptedException {
                if (limit <= 0) {
                    return 1.0f;
                }
                return Math.min(1.0f, (float) soFar / (float) limit);
            }

            @Override
            public void close() throws IOException {
                // No resources to release.
            }
        };
    }

    /**
     * Creates {@code transaction_files} (default 2) generator splits.
     *
     * @param job supplies the {@code transactions} and {@code transaction_files} settings
     * @return one {@code TransactionInputSplit} per requested file
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // Same key and default as the record reader, so the logged size matches what is emitted.
        int transactionsPerSplit = job.getConfiguration().getInt("transactions", TRANSACTIONS);
        int splits = job.getConfiguration().getInt("transaction_files", 2);
        List<InputSplit> result = Lists.newArrayList();
        for (int i = 0; i < splits; i++) {
            log.info(i + " Adding a new input split of size " + transactionsPerSplit);
            result.add(new PetStoreTransactionGeneratorJob.TransactionInputSplit());
        }
        return result;
    }
}
/**
 * A stateless input split used by the transaction generator.
 *
 * <p>It serializes nothing, reports no preferred host locations, and always
 * reports a fixed length of 100.
 */
public static class TransactionInputSplit extends InputSplit implements Writable {

    /** Returns an empty array: this split has no preferred hosts. */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[0];
    }

    /** Returns the fixed nominal length 100. */
    @Override
    public long getLength() throws IOException, InterruptedException {
        return 100L;
    }

    /** Writes nothing: the split carries no state. */
    @Override
    public void write(DataOutput out) throws IOException {
        // intentionally empty
    }

    /** Reads nothing: the split carries no state. */
    @Override
    public void readFields(DataInput in) throws IOException {
        // intentionally empty
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment