首先介绍一下 Kafka 生产者发送消息的过程:
- Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
- 接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
- 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。
二、创建生产者
2.1 项目依赖
本项目采用 Maven 构建,想要调用 Kafka 生产者 API,需要导入 kafka-clients 依赖,如下:
org.apache.kafka kafka-clients 2.2.0复制代码
2.2 创建生产者
创建 Kafka 生产者时,以下三个属性是必须指定的:
- bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
- key.serializer :指定键的序列化器;
- value.serializer :指定值的序列化器。
创建的示例代码如下:
public class SimpleProducer { public static void main(String[] args) { String topicName = "Hello-Kafka"; Properties props = new Properties(); props.put("bootstrap.servers