Last active
April 9, 2018 10:04
-
-
Save t0mmyt/6558b18d5e6ffb656ddcf1aaf0731592 to your computer and use it in GitHub Desktop.
Custom Partitioner for specifying partition in key
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.clients.producer.Partitioner; | |
import org.apache.kafka.common.Cluster; | |
import org.apache.kafka.common.PartitionInfo; | |
import org.apache.kafka.common.utils.Utils; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
public class CustomPartitioner implements Partitioner { | |
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); | |
private final Pattern pattern = Pattern.compile("^partition-\\d+$"); | |
/** | |
* Compute the partition for the given record. | |
* | |
* @param topic The topic name | |
* @param key The key to partition on (or null if no key) | |
* @param keyBytes serialized key to partition on (or null if no key) | |
* @param value The value to partition on or null | |
* @param valueBytes serialized value to partition on or null | |
* @param cluster The current cluster metadata | |
*/ | |
@Override | |
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { | |
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); | |
int numPartitions = partitions.size(); | |
if (keyBytes == null) { | |
int nextValue = nextValue(topic); | |
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); | |
if (availablePartitions.size() > 0) { | |
int part = Utils.toPositive(nextValue) % availablePartitions.size(); | |
return availablePartitions.get(part).partition(); | |
} else { | |
// no partitions are available, give a non-available partition | |
return Utils.toPositive(nextValue) % numPartitions; | |
} | |
} else { | |
Matcher matcher = pattern.matcher(key.toString()); | |
if (matcher.matches()) { | |
return Integer.parseInt(matcher.group(1)) % numPartitions; | |
} | |
// hash the keyBytes to choose a partition | |
return getPartitionByHash(keyBytes, numPartitions); | |
} | |
} | |
private static int getPartitionByHash(byte[] keyBytes, int numPartitions) { | |
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; | |
} | |
@Override | |
public void close() { | |
} | |
@Override | |
public void configure(Map<String, ?> configs) { | |
} | |
private int nextValue(String topic) { | |
AtomicInteger counter = topicCounterMap.get(topic); | |
if (null == counter) { | |
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); | |
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); | |
if (currentCounter != null) { | |
counter = currentCounter; | |
} | |
} | |
return counter.getAndIncrement(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment