Last active
August 29, 2015 13:55
-
-
Save mlehman/8772354 to your computer and use it in GitHub Desktop.
Secondary Sort in Avro using Pair
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.Pair;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.RawComparator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/** | |
* Example of Secondary Sort in Avro using Pair with org.apache.avro.mapreduce to order values in the reducer. | |
* | |
* Pair by default sorts by key only. By using our own version of the Pair Schema in a sort Comparator | |
* that uses both the key and value, we can create a comparator usable for doing a secondary sort. | |
* | |
* With a job with a Mapper producing output of keys (AvroKey<Pair<K,V>>) and values (AvroValue<T>), this will result | |
* in the Reducer receiving the keys grouped by the Pair key (K) and the values AvroValue<T> sorted by the Pair value (V). | |
* | |
* In your job | |
* <pre>{@code | |
* // Usage: Set the MapOutput for your pair | |
* // AvroJob by default will set the sort and group comparators to AvroKeyComparator which will group and sort by the Pair's key only. | |
* AvroJob.setMapOutputKeySchema(job,Pair.getPairSchema(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT))); | |
* | |
* // Then set the SortComparator (overriding AvroKeyComparator set by AvroJob for sort only) | |
* job.setSortComparatorClass(AvroSecondarySortComparator.class); | |
* | |
* }</pre> | |
* */ | |
public class AvroSecondarySortComparator<K,V> extends Configured implements RawComparator<AvroKey<Pair<K,V>>> { | |
public static final String KEY = "key"; | |
public static final String VALUE = "value"; | |
Schema schema; | |
@Override | |
public void setConf(Configuration conf) { | |
super.setConf(conf); | |
if (conf != null) { | |
Schema pairSchema = AvroJob.getMapOutputKeySchema(conf); | |
schema = getSortableValuePair(pairSchema.getField(KEY).schema(), | |
pairSchema.getField(VALUE).schema()); | |
} | |
} | |
private Schema getSortableValuePair(Schema key, Schema value) { | |
Schema pair = Schema.createRecord(Pair.class.getName(), null, null, false); | |
List<Schema.Field> fields = new ArrayList<Schema.Field>(); | |
fields.add(new Schema.Field(KEY, key, "", null)); | |
fields.add(new Schema.Field(VALUE, value, "", null)); //Change from Pair's Schema.Field.Order.IGNORE | |
pair.setFields(fields); | |
return pair; | |
} | |
@Override | |
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { | |
return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema); | |
} | |
public int compare(AvroKey<Pair<K,V>> x, AvroKey<Pair<K,V>> y) { | |
return ReflectData.get().compare(x.datum(), y.datum(), schema); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment