Doesn't this title look familiar? It's pretty much a guaranteed interview question.
KafkaProducer
- 1. send: the public method the client exposes for application developers to send messages (a minimal usage sketch follows this outline)
- 2. send actually delegates to the private method doSend, which fetches cluster metadata (and works out which partition this record is written to)
- 2.1 waitOnMetadata: the method that fetches a topic's metadata from the Kafka cluster
- 2.2 Computing the partition, either from the partition specified on the record or via an implementation of the Partitioner interface
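Before walking through the source, here is a minimal usage sketch of the path the outline describes. It is only an illustration: the broker address, topic name, key, and value are placeholders, not anything taken from the Kafka source.

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendFlowDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // This call walks the path outlined above:
            // interceptors -> doSend -> waitOnMetadata -> partition -> accumulator.append
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", "key-1", "hello kafka"));
            RecordMetadata metadata = future.get(); // blocks until the broker acknowledges the record
            System.out.printf("written to partition %d at offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    }
}
```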
1. send: the public method the client exposes for application developers to send messages
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }
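The interceptedRecord line shows that every record first passes through any configured ProducerInterceptor before it reaches doSend. As a hedged illustration only (LoggingProducerInterceptor is a made-up name, not part of Kafka), such an interceptor could look like this:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor, only to illustrate the hook that send() invokes before doSend().
public class LoggingProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Invoked by send() via interceptors.onSend(); the record returned here
        // is the one that actually gets partitioned and appended.
        System.out.println("about to send to topic " + record.topic());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Invoked once the broker acknowledges the record, or the send fails.
        if (exception != null) {
            System.err.println("send failed: " + exception.getMessage());
        }
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

It would be registered through the interceptor.classes setting, e.g. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, LoggingProducerInterceptor.class.getName()).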
2. send actually delegates to the private method doSend, which fetches cluster metadata (and works out which partition this record is written to)
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        // ... lines removed for clarity
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        } catch (KafkaException var20) {
            // ... lines removed for clarity
        }
        // ... lines removed for clarity
        Cluster cluster = clusterAndWaitTime.cluster;
        // ... lines removed for clarity
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // ... lines removed for clarity
        // The record is appended here and waits for the Sender thread to be woken up and send batches in bulk;
        // that part is not covered in this article.
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true);
        // ... lines removed for clarity
    }
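The callback handed to doSend ends up wrapped (interceptCallback) and passed to accumulator.append, so it only fires after the Sender thread has shipped the batch and the broker has responded. A hedged sketch of using that callback from application code (broker address and topic name are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AsyncSendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The lambda below is the Callback that doSend() wraps and hands to accumulator.append();
            // it runs only after the batch containing this record has been sent and acknowledged (or has failed).
            producer.send(new ProducerRecord<>("demo-topic", "key-1", "hello"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("acked: partition=%d offset=%d%n", metadata.partition(), metadata.offset());
                }
            });
            producer.flush(); // make sure the batch is actually sent before the producer closes
        }
    }
}
```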
2.1 waitOnMetadata: fetching a topic's metadata from the Kafka cluster
    private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        Cluster cluster = this.metadata.fetch();
        // ... lines removed for clarity
        this.metadata.add(topic);
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // If partitionsCount is not null, and the requested partition is either null or smaller than partitionsCount,
        // the cached cluster metadata is good enough and is returned directly;
        // otherwise the producer asks the server to refresh the cluster metadata.
        if (partitionsCount != null && (partition == null || partition < partitionsCount)) {
            return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);
        } else {
            // Below: request a cluster metadata update from the server
            long begin = this.time.milliseconds();
            long remainingWaitMs = maxWaitMs;
            long elapsed;
            do {
                do {
                    if (partition != null) {
                        this.log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
                    } else {
                        this.log.trace("Requesting metadata update for topic {}.", topic);
                    }
                    this.metadata.add(topic);
                    int version = this.metadata.requestUpdate();
                    // Wake up the Sender thread so that the metadata update request actually gets sent
                    this.sender.wakeup();
                    // Wait for the metadata response to come back
                    try {
                        this.metadata.awaitUpdate(version, remainingWaitMs);
                    } catch (TimeoutException var15) {
                        throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
                    }
                    // The latest topic metadata has arrived; fetch the cluster again
                    cluster = this.metadata.fetch();
                    elapsed = this.time.milliseconds() - begin;
                    if (elapsed >= maxWaitMs) {
                        throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
                    }
                    this.metadata.maybeThrowExceptionForTopic(topic);
                    remainingWaitMs = maxWaitMs - elapsed;
                    partitionsCount = cluster.partitionCountForTopic(topic);
                } while(partitionsCount == null);
            } while(partition != null && partition >= partitionsCount);
            // Return the refreshed cluster metadata
            return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);
        }
    }
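The maxWaitMs / maxBlockTimeMs used here comes from the producer's max.block.ms setting, and the TimeoutException above is what you see when the topic's metadata does not arrive in time. A hedged sketch of tuning that timeout and catching the exception (broker address and topic name are placeholders; whether the timeout actually fires, and whether it surfaces from send() or from the returned Future, depends on your environment and client version):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class MetadataTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // max.block.ms becomes the maxWaitMs / maxBlockTimeMs seen in waitOnMetadata above
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "hello")); // placeholder topic
        } catch (TimeoutException e) {
            // Raised when the topic's metadata does not show up within max.block.ms,
            // matching the "Topic %s not present in metadata after %d ms." message above
            System.err.println("metadata not available in time: " + e.getMessage());
        }
    }
}
```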
2.2 Computing the partition from the partition specified on the record or from a Partitioner implementation
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        // If a partition was specified when the ProducerRecord was created, use it;
        // otherwise delegate to an implementation of org.apache.kafka.clients.producer.Partitioner
        Integer partition = record.partition();
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
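When the record does carry a key, the default partitioner's keyed path boils down to hashing the serialized key with murmur2 and taking it modulo the topic's partition count (recent versions also use a sticky strategy for keyless records, which is not shown here). Below is a stripped-down sketch of that idea, not the actual DefaultPartitioner source:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KeyedPartitionSketch {
    // Simplified keyed branch of the default partitioning logic:
    // murmur2-hash the serialized key and map it onto the topic's partition count.
    static int partitionForKey(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8); // illustrative key
        // Records with the same key always land on the same partition,
        // as long as the topic's partition count does not change.
        System.out.println(partitionForKey(key, 6));
    }
}
```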
The org.apache.kafka.clients.producer.Partitioner interface ships with three ready-made implementations: DefaultPartitioner, RoundRobinPartitioner, and UniformStickyPartitioner; their source is worth reading for the details. The default is DefaultPartitioner, wired in through the partitioner.class config, as the constructor and ProducerConfig excerpts below show.
    KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));
        // ... lines removed for clarity
        this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
        // ... lines removed for clarity
    }
    public class ProducerConfig extends AbstractConfig {
        private static final ConfigDef CONFIG;
        // ... lines removed for clarity

        static {
            // The other .define(...) calls are removed here to keep the focus on partitioner.class
            CONFIG = (new ConfigDef()).define("partitioner.class", Type.CLASS, DefaultPartitioner.class, Importance.MEDIUM, "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.");
        }
    }
Of course you can configure one of the other implementations instead, or write your own by implementing the org.apache.kafka.clients.producer.Partitioner interface.
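As a hedged sketch of that, here is a hypothetical custom partitioner (KeyOrRandomPartitioner is a made-up name): keyed records get a stable, hash-based partition, keyless records get a random one.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// Hypothetical example: route keyed records by key hash, spread keyless records randomly.
public class KeyOrRandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // No key: pick any partition.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // With a key: stable assignment based on the key hash.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

It would be plugged in through the same partitioner.class config shown above, e.g. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyOrRandomPartitioner.class.getName()); the built-in RoundRobinPartitioner or UniformStickyPartitioner can be selected the same way.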