分区器接口分析
package org.apache.kafka.clients.producer; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Cluster; import java.io.Closeable; public interface Partitioner extends Configurable, Closeable { /** * 计算分区的核心方法 * 参数:1、主题 2、key 3、序列化后的key 4、消息 5、序列化后的消息 6、集群元数据 */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); /** * This is called when partitioner is closed. */ public void close(); }默认分区器源码分析
package org.apache.kafka.clients.producer.internals; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; /** * *默认分区策略: * 如果在记录中指定了一个分区,那么就使用它 * 如果没有指定分区,但是存在一个key,那么根据该键的散列选择一个分区 * 如果没有分区或密钥,则以轮询方式选择一个分区 */ public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<String, ?> configs) {} /** * 如果key为null: * if{可用分区数大于零,计数器对可用分区数取余}else{计数器对总分区数取余} * 如果key不为null:使用murmur2算法hash后对分区总数取余 */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } //如果第一次写入分区则生成随机数(即计数器为null),否则获取值自增1 private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }自定义分区器
public class PartitionerDemo implements Partitioner { private final AtomicInteger counter = new AtomicInteger(); @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Integer partitions = cluster.partitionCountForTopic(topic); int paritionNum; if(keyBytes == null){ paritionNum = counter.getAndIncrement() % partitions; }else { paritionNum = Utils.toPositive(Utils.murmur2(keyBytes)) % partitions; } return paritionNum; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }指定分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName())