Last active
December 13, 2015 17:48
-
-
Save omnisis/4950540 to your computer and use it in GitHub Desktop.
Weekly partitioner for MapReduce Text keys prefixed with a 13-digit epoch-millisecond timestamp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples.mapred.partitioning; | |
import org.apache.commons.lang.time.DateUtils; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Partitioner; | |
import java.util.Calendar; | |
import java.util.TimeZone; | |
public class PartitionFunctionSnippet {

  /** Milliseconds in a fixed seven-day week (no DST or leap-second adjustment). */
  private static final long MILLIS_PER_WEEK = 7L * 24 * 60 * 60 * 1000;

  /**
   * Calculates the number of weekly partitions needed for a date range bounded by
   * {@code start} and {@code end}.
   *
   * <p>The result is the number of whole seven-day spans in {@code end - start}, plus one.
   * The calculation does not consult a reference calendar. Because of that, it may
   * under-count the calendar weeks touched by the range by at most one. This happens, for
   * example, when {@code start} falls late in one week and {@code end} early in a later week.
   *
   * @param start earliest timestamp of the range, in milliseconds
   * @param end latest timestamp of the range, in milliseconds
   * @return number of partitions, always at least 1
   * @throws IllegalArgumentException if {@code end} is before {@code start}, or if the span
   *     is too large to express as an {@code int} number of weeks
   */
  public int calcNumPartitions(long start, long end) {
    if (end < start) {
      throw new IllegalArgumentException(
          "end (" + end + ") must not be before start (" + start + ")");
    }
    long diff = end - start;
    // end >= start here, so a negative difference can only mean the subtraction overflowed.
    if (diff < 0) {
      throw new IllegalArgumentException(
          "range [" + start + ", " + end + "] is too large");
    }
    long numWeeks = diff / MILLIS_PER_WEEK + 1;
    if (numWeeks > Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
          "range [" + start + ", " + end + "] spans too many weeks: " + numWeeks);
    }
    return (int) numWeeks;
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples.mapred.partitioning; | |
import org.apache.commons.lang.time.DateUtils; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Partitioner; | |
import java.util.Calendar; | |
import java.util.TimeZone; | |
public class WeeklyPartitioner extends Partitioner<Text,StockQuote> { | |
/* Note that this is not thread-safe for calls such as roll(),add(),setTimeXX(), etc */ | |
private Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT")); | |
/** | |
* Calculates a partition based on year and week in year of the timestamp portion of | |
* the key. In other words: | |
* | |
* partition function = f(key_year,key_week_in_year) | |
* | |
* Partition "0" is the partition corresponding to the current year and week in year. The | |
* partition numbers increase from there going backwards in time. | |
* | |
* NOTE: This is *NOT* thread-safe, not that we are worrying about that in M/R land. | |
* | |
* @param text | |
* @param numPartitions | |
* @return | |
*/ | |
@Override | |
public int getPartition(Text text, StockQuote stockQuote, int numPartitions) { | |
Long ts = Long.valueOf(text.toString().substring(0, 13)); | |
c.setTimeInMillis(System.currentTimeMillis()); | |
int currYr = c.get(Calendar.YEAR) - 1900; | |
int currWk = c.get(Calendar.WEEK_OF_YEAR); | |
c.setTimeInMillis(ts); | |
int keyYr = c.get(Calendar.YEAR) - 1900; | |
int keyWk = c.get(Calendar.WEEK_OF_YEAR); | |
int partNum = (currYr*52 + currWk) - (keyYr*52 + keyWk); | |
return partNum % numPartitions; | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment