Skip to content

Instantly share code, notes, and snippets.

@omnisis
Last active December 13, 2015 17:48
Show Gist options
  • Save omnisis/4950540 to your computer and use it in GitHub Desktop.
Save omnisis/4950540 to your computer and use it in GitHub Desktop.
Weekly Partitioner for M/R Text keys prefixed with long (epoch-millisecond) timestamps
package examples.mapred.partitioning;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.Calendar;
import java.util.TimeZone;
public class PartitionFunctionSnippet {
    /**
     * Length of one week in milliseconds (7 * 24h). Same value as
     * DateUtils.MILLIS_PER_DAY * 7, kept local so this class needs nothing
     * beyond the JDK.
     */
    private static final long MILLIS_PER_WEEK = 7L * 24L * 60L * 60L * 1000L;

    /**
     * Calculates the number of weekly partitions needed to cover the date range
     * bounded by start and end.
     * <p>
     * The result is the number of whole weeks in the span plus one. It is an
     * approximation: no reference calendar is used, so week boundaries are not
     * aligned to calendar weeks and the value may be off by one in either
     * direction.
     * <p>
     * The result is always at least 1, even when {@code end} precedes
     * {@code start}, and saturates at {@link Integer#MAX_VALUE} instead of
     * wrapping around when the span is too large for an int.
     *
     * @param start Earliest start date, in milliseconds
     * @param end Latest end date, in milliseconds
     * @return the number of partitions, in the range [1, Integer.MAX_VALUE]
     */
    public int calcNumPartitions(long start, long end) {
        long diff = end - start;
        // Keep the arithmetic in long until the value has been range-checked,
        // so the narrowing cast below can never truncate.
        long numWeeks = diff / MILLIS_PER_WEEK + 1;
        if (numWeeks < 1) {
            // Reversed range: a partition count of zero or less is unusable.
            return 1;
        }
        if (numWeeks > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return (int) numWeeks;
    }
}
package examples.mapred.partitioning;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.Calendar;
import java.util.TimeZone;
public class WeeklyPartitioner extends Partitioner<Text,StockQuote> {
    /** Number of leading key characters that hold the epoch-millisecond timestamp. */
    private static final int TS_PREFIX_LEN = 13;

    /** Length of one GMT day in milliseconds. */
    private static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L;

    /** Number of days in a week. */
    private static final long DAYS_PER_WEEK = 7L;

    /*
     * First day of the week (Calendar.SUNDAY .. Calendar.SATURDAY) as reported by a
     * GMT calendar for the default locale. Only this setting is read from the
     * calendar; no shared Calendar instance is mutated per record.
     */
    private final int firstDayOfWeek =
            Calendar.getInstance(TimeZone.getTimeZone("GMT")).getFirstDayOfWeek();

    /**
     * Calculates a partition based on the calendar week (GMT) of the timestamp
     * portion of the key. In other words:
     *
     * partition function = f(weeks between the current week and the key's week)
     *
     * Partition "0" is the partition corresponding to the current week. The
     * partition numbers increase from there going backwards in time, wrapping
     * around modulo numPartitions.
     *
     * The week distance is computed from an absolute week index rather than from
     * year*52 + week-of-year. The latter is wrong around year boundaries: years
     * are not exactly 52 weeks long, WEEK_OF_YEAR can be 53, and days in late
     * December can report week 1 while YEAR still reports the old year.
     *
     * The result is always in the range [0, numPartitions), including for keys
     * whose timestamp lies after the current week.
     *
     * NOTE(review): the result depends on the wall clock at call time, so the same
     * key can map to different partitions if calls straddle a week boundary.
     *
     * @param text key whose first 13 characters are a decimal epoch-millisecond timestamp
     * @param stockQuote the value; not used for partitioning
     * @param numPartitions total number of partitions
     * @return the partition number, in the range [0, numPartitions)
     * @throws StringIndexOutOfBoundsException if the key is shorter than 13 characters
     * @throws NumberFormatException if the 13-character prefix is not a decimal number
     */
    @Override
    public int getPartition(Text text, StockQuote stockQuote, int numPartitions) {
        long keyTs = Long.parseLong(text.toString().substring(0, TS_PREFIX_LEN));
        long weeksBack = weekIndex(System.currentTimeMillis()) - weekIndex(keyTs);
        // Java's % keeps the sign of the dividend, so a key after the current week
        // would otherwise produce a negative (invalid) partition number.
        int partNum = (int) (weeksBack % numPartitions);
        return partNum < 0 ? partNum + numPartitions : partNum;
    }

    /** Returns the index of the calendar week (GMT, starting on firstDayOfWeek) containing ts. */
    private long weekIndex(long ts) {
        long epochDay = floorDiv(ts, MILLIS_PER_DAY);
        // Epoch day 0 (1970-01-01) was a Thursday; shift so that every week
        // starts on firstDayOfWeek before dividing into whole weeks.
        return floorDiv(epochDay + (Calendar.THURSDAY - firstDayOfWeek), DAYS_PER_WEEK);
    }

    /** Integer division rounding toward negative infinity, so pre-epoch values bucket correctly. */
    private static long floorDiv(long x, long y) {
        long q = x / y;
        if ((x % y != 0) && ((x < 0) != (y < 0))) {
            q--;
        }
        return q;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment