文章目录
- 前言
- 创建 kafka 生产者
- 同步与异步发送消息
- 同步发送
- 异步发送
- 生产者参数配置
- client.id
- acks
- 消息传递时间
- 序列化器
- 在Kafka中使用Avro记录
- 分区
- 标头
- 拦截器
- 配额和节流
前言
kafka 对外提供的 API 主要有两类:生产者 API 和 消费者 API,本文将从Kafka生产者的设计和组件讲起,学习如何使用Kafka生产者。将首先演示如何创建KafkaProducer对象和ProducerRecords对象、如何将记录发送给Kafka,以及如何处理Kafka返回的错误响应。然后介绍用于控制生产者行为的重要配置参数。最后深入探讨如何使用不同的分区方法和序列化器,以及如何自定义序列化器和分区器。
kafka 生产者概述
下图展示了 kafka 生产者往 broker 发送消息的流程交互图:
- 先从创建一个 ProducerRecord 对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。
- 接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于 ProducerRecord 对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标 broker。
- broker 在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个 RecordMetaData 对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。
创建 kafka 生产者
创建 Producer 必填参数说明:
bootstrap.servers
- broker的地址。可以由多个 host:port 组成,生产者用它们来建立初始的 Kafka 集群连接。它不需要包含所有的 broker 地址,因为生产者在建立初始连接之后可以从给定的 broker 那里找到其他 broker的信息。不过还是建议至少提供两个 broker 地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。
key.serializer
- 一个类名,用来将发送消息的键序列化为字节。往生产者中发送消息的时候,如果指定了键,那么必须设置该值。
- key.serializer必须被设置为一个实现了
org.apache.kafka.common.serialization.Serializer
接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer
等。
value.serializer
- 一个类名,用来将发送消息的值序列化为字节。
下面的代码片段演示了如何创建一个生产者。这里只指定了必需的属性,其他属性使用默认值。
Properties kafkaProps = new Properties(); ➊
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); ➋
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");producer = new KafkaProducer<String, String>(kafkaProps); ➌
因为 Kafka 只能处理字节数组,我们在往 kafka 发送数据的时候必须将数据序列化为字节,我们可以通过对应语言自己的序列化方式直接将键或值序列化,也可以通过实现序列化器,来自定义将语言的内置数据类型对象转换为字节发送给生产者,这个时候我们就需要将序列化器的类名作为 **key.serializer 和 value.serializer **参数在创建生产者的时候指定。
kafka 往生产者发送消息的三种方式
发送并忘记
- 不关心消息是否发送成功,发送了就不管了,生产环境不建议使用。
同步发送
- 一般来说,生产者是异步的——我们调用send()方法发送消息,它会返回一个Future对象。可以调用get()方法等待Future完成,这样就可以在发送下一条消息之前知道当前消息是否发送成功。
异步发送
- 调用send()方法,并指定一个回调函数,当服务器返回响应时,这个函数会被触发。
同步与异步发送消息
最简单的消息发送方式如下所示:
ProducerRecord<String, String> record =new ProducerRecord<>("CustomerCountry", "Precision Products","France"); ➊
try {producer.send(record); ➋
} catch (Exception e) {e.printStackTrace(); ➌
}
- 创建
ProducerRecord
,指定 Topic, 键,值对。 - 调用
send()
方法来发送 ProducerRecord 对象, 消息会先被放进缓冲区,然后通过单独的线程发送给服务器端。这里我们忽略了消息是否发送成功。 - 在发送至 broker 前,发送 API 也可能会产生异常,可能是SerializationException(序列化消息失败)、BufferExhaustedException或TimeoutException(缓冲区已满),或者InterruptException(发送线程被中断)。
同步发送
如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能,通常不会被用在生产环境中,代码如下:
ProducerRecord<String, String> record =new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {producer.send(record).get(); ➊
} catch (Exception e) {e.printStackTrace(); ➋
}
- 调用Future.get()方法等待Kafka响应。如果消息没有发送成功,那么这个方法将抛出一个异常。如果没有发生错误,那么我们将得到一个RecordMetadata对象,并能从中获取消息的偏移量和其他元数据。
- 如果在消息发送之前或发送过程中发生了错误,那么我们将捕捉到一个异常。这里只是简单地把异常信息打印了出来。
异步发送
异步发送只需要指定回调方法即可,代码如下:
private class DemoProducerCallback implements Callback { ➊@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {e.printStackTrace(); ➋}}
}ProducerRecord<String, String> record =new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); ➌
producer.send(record, new DemoProducerCallback()); ➍
- 为了使用回调,需要一个实现了org.apache.kafka.clients.producer.Callback接口的类,这个接口只有一个onCompletion方法。
- 如果Kafka返回错误,那么onCompletion方法会收到一个非空(nonnull)异常。这里只是简单地把它打印了出来,但在生产环境中应该使用更好的处理方式。
- 指定发送消息。
- 在发送消息时将回调对象传进去。
回调的执行将在生产者主线程中进行,如果有两条消息被发送给同一个分区,则这可以保证它们的回调是按照发送的顺序执行的。这就要求回调的执行要快,避免生产者出现延迟或影响其他消息的发送。不建议在回调中执行阻塞操作,阻塞操作应该被放在其他线程中执行。
生产者参数配置
除了上边提到的必填参数外,还有一些选填参数,在内存使用、性能和可靠性方面对生产者影响比较大,接下来将详细介绍它们。
client.id
- 客户端标识符,它的值可以是任意字符串,broker用它来识别从客户端发送过来的消息。
- client.id 可以被用在日志、指标和配额中。选择一个好的客户端标识符可以让故障诊断变得更容易些。
acks
acks 指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。
acks 有三种设置:0,1,all.
acks=0
- 不等待接收 broker 的回复,不知道消息发送是否成功。
- 能够以网络可支持的最大速度发送消息,从而达到很高的吞吐量。
acks=1
- broker 集群的 leader 收到消息,生产者就会收到消息写入成功的响应。如果 leader 服务奔溃,选举过程中,可能会导致消息丢失。
acks=all
- 只有当所有副本全部收到消息时,生产者才会收到消息成功写入的响应。
- 这种模式是最安全的,它可以保证不止一个broker收到消息,就算有个别broker发生崩溃,整个集群仍然可以运行。
- 不过,它的延迟比acks=1高,因为生产者需要等待不止一个broker确认收到消息。
:::success
为acks设置的值越小,生产者发送消息的速度就越快。也就是说,我们通过牺牲可靠性来换取较低的生产者延迟。不过,端到端延迟是指从消息生成到可供消费者读取的时间,这对3种配置来说都是一样的。这是因为为了保持一致性,在消息被写入所有同步副本之前,Kafka不允许消费者读取它们。因此,如果关心的是端到端延迟,而不是生产者延迟,那么就不需要在可靠性和低延迟之间做权衡了:你可以选择最可靠的配置,但仍然可以获得相同的端到端延迟。
:::
消息传递时间
kafka 生产者往 broke 发送消息是如何判断超时失败的,这个时间是可配置的。我们将ProduceRecord的发送时间分成如下两个时间间隔,它们是被分开处理的。
- 异步调用
send()
所花费的时间。在此期间,调用send()
的线程将被阻塞。 - 从异步调用send()返回到触发回调(不管是成功还是失败)的时间,也就是从ProduceRecord被放到批次中直到Kafka成功响应、出现不可恢复异常或发送超时的时间。
生产环境一般使用异步发送,下边讨论的时间都是异步发送过程中的时间,下图展示了生产者的内部数据流以及不同的配置参数如何相互影响:
max.block.ms
- 生产者在调用 send() 或者显示地请求集群元数据时阻塞等待的时间。
delivery.timeout.ms
- 该参数表示从生产者准备发送到获得 broker 响应的完整时间。
- 如果我们的服务配置了超时重试,那么在到达该时间后,还未获得响应,将进行重试。
- broker 集群选举通常需要一些时间,因此该时间最好要大于选举时间,比如说集群选举需要30s, 该参数配置为 120 可能更加合理。
request.timeout.ms
- 生产者在发送消息时等待服务器响应的时间。
- 如果设置的值已触及,但服务器没有响应,那么生产者将重试发送,或者执行回调,并传给它一个TimeoutException。
retries 和retry.backoff.ms
- 发送失败重试次数和重试间隔,如果不想重试,直接设置为 0。
linger.ms
- 生产者在发送消息批次之前等待更多消息加入批次的时间,生产者会在批次被填满或等待时间达到linger.ms时把消息批次发送出去。
- 设置该参数,会有一点延迟,但是将会极大提升吞吐量,减少生产者和 broker 交互次数,降低网络延迟带来的开销。
buffer.memory
- 表示生产者在调用 send() 后可以往缓冲区写入的大小,生产者在调用 send() 后会先往缓冲区写一批数据,然后达到阈值后,批量发送数据。
- 如果生产者 send() 速度大于缓冲区内存释放速度,很容易导致内存不足,抛出异常。
compression.type
- 数据压缩算法,可设置的有snappy、gzip、lz4或zstd。
- snappy 占用较少的CPU时间,但能提供较好的性能和相当可观的压缩比。如果同时有性能和网络带宽方面的考虑,那么可以使用这种算法。
- gzip 占用较多的CPU时间,但提供了更高的压缩比。如果网络带宽比较有限,则可以使用这种算法。
batch.size
- 一个批次可以使用的内存大小,一批数据达到该内存大小就会发送,即使只有一条数据,只要达到该内存大小,会发发送。
- 把批次大小设置得很大,也不会导致延迟,只是会占用更多的内存而已。但如果把批次大小设置得太小,则会增加一些额外的开销,因为生产者需要更频繁地发送消息。
max.in.flight.requests.per.connection
- 这个参数指定了生产者在收到服务器响应之前可以发送多少个消息批次。它的值越大,占用的内存就越多,不过吞吐量也会得到提升。
max.request.size
- 限制可发送的单条最大消息的大小和单个请求的消息总量的大小。
- broker对可接收的最大消息也有限制(message.max.bytes),其两边的配置最好是匹配的,以免生产者发送的消息被broker拒绝。
receive.buffer.bytes和send.buffer.bytes
- 这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。
- 如果它们被设为–1,就使用操作系统默认值。
- 如果生产者或消费者与broker位于不同的数据中心,则可以适当加大它们的值,因为跨数据中心网络的延迟一般都比较高,而带宽又比较低。
enable.idempotence
-
是否开启**精确一次性(exactly once), **如果设置为开启,需要同时配置 acks=all, 并将delivery.timeout.ms设置为一个比较大的数,允许进行尽可能多的重试。
-
当幂等生产者被启用时,生产者将给发送的每一条消息都加上一个序列号。如果broker收到具有相同序列号的消息,那么它就会拒绝第二个副本,而生产者则会收到DuplicateSequenceException,这个异常对生产者来说是不需要关心和无影响的。
如果要启用幂等性,那么max.in.flight.requests.per.connection应小于或等于5、retries应大于0,并且acks被设置为all。如果设置了不恰当的值,则会抛出ConfigException异常。
序列化器
kafka 生产者发送的消息必须是字节类型的,kafka 生产者提供了一些简单的内置序列化器,比如说将字符串序列化为字节,但是并不能满足大部分场景,目前已有几种比较成熟的序列化方案:比如JSON、Avro、Thrift或Protobuf。我们也可以自定义一些序列化方式,将消息对象序列化为字节,但是除了一些定制化场景外,不建议自定义序列化器。
下边说明下如何在 kafka 生产者中使用 Avro 序列化消息。
在Kafka中使用Avro记录
在使用 Avro 对消息进行序列化的时候,每条消息都会包含对应的序列化 schema 信息,当发送消息条数不多的时候,可以忽略消息大小的增加。
但是如果消息量大,就会导致由于 schema 信息导致消息数据量剧增,因此我们需要使用到 schema 注册表的架构设计模式,生产者和消费者在序列化和反序列化的时候,都从 schema 注册表中查询 schema 信息,无需将 schema 信息保存在发送消息中。
交互流程如下图所示:
下面的例子演示了如何把生成的Avro对象发送给Kafka:
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊
props.put("schema.registry.url", schemaUrl); ➋String topic = "customerContacts";Producer<String, Customer> producer = new KafkaProducer<>(props); ➌// 不断生成新事件,直到有人按下Ctrl-C组合键
while (true) {Customer customer = CustomerGenerator.getNext(); ➍System.out.println("Generated customer " +customer.toString());ProducerRecord<String, Customer> record =new ProducerRecord<>(topic, customer.getName(), customer); ➎producer.send(record); ➏
}
- 使用KafkaAvroSerializer来序列化对象。需要注意的是,KafkaAvroSerializer也可以处理原始类型,这也是为什么稍后可以用String作为记录的键并用Customer对象作为记录的值。
- schema.registry.url是生产者要传给序列化器的参数,其指向模式的存储位置。
- Customer是生成的对象,也就是记录的值。
- Customer类不是一个普通的Java类(POJO),而是基于模式生成的Avro对象。Avro序列化器只能序列化Avro对象,不能序列化POJO。
- 实例化一个ProducerRecord对象,指定值的类型为Customer,并传给它一个Customer对象。
- 把Customer对象作为记录发送出去,KafkaAvroSerializer会处理剩下的事情。
也可以使用通用的Avro对象,就像使用map那样,这与基于模式生成的带有getter方法和setter方法的Avro对象不同。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊
props.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url); ➋String schemaString ="{\"namespace\": \"customerManagement.avro\","\"type\": \"record\", " + ➌"\"name\": \"Customer\"," +"\"fields\": [" +"{\"name\": \"id\", \"type\": \"int\"}," +"{\"name\": \"name\", \"type\": \"string\"}," +"{\"name\": \"email\", \"type\": " + "[\"null\",\"string\"], " +"\"default\":\"null\" }" +"]}";
Producer<String, GenericRecord> producer =new KafkaProducer<String, GenericRecord>(props); ➍Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);for (int nCustomers = 0; nCustomers < customers; nCustomers++) {String name = "exampleCustomer" + nCustomers;String email = "example " + nCustomers + "@example.com"GenericRecord customer = new GenericData.Record(schema); ➎customer.put("id", nCustomers);customer.put("name", name);customer.put("email", email);ProducerRecord<String, GenericRecord> data =new ProducerRecord<>("customerContacts", name, customer);producer.send(data);
}
- 仍然使用KafkaAvroSerializer。
- 提供同样的模式注册表URI。
- 也需要提供Avro模式,因为没有使用Avro生成的对象也就没有模式。
- 对象类型是GenericRecord,我们用模式和要写入的数据来初始化它。
- ProducerRecord的值就是包含了模式和要发送的数据的一个GenericRecord对象。序列化器知道如何从记录里获取模式、把它保存到注册表中,以及用它序列化对象。
分区
Kafka消息就是一个个的键–值对,ProducerRecord对象可以只包含主题名称和值,键默认情况下是null。不过,大多数应用程序还是会用键来发送消息。键有两种用途:
- 作为消息的附加信息与消息保存在一起
- 用来确定消息应该被写入主题的哪个分区,具有相同键的消息将被写入同一个分区。
不指定键
- 如果键为null,并且使用了默认的分区器,那么记录将被随机发送给主题的分区。分区器使用轮询调度(round-robin)算法将消息均衡地分布到各个分区中。
键不为空且使用了默认的分区器
- Kafka会对键进行哈希,然后根据哈希值把消息映射到特定的分区。
- 使用该方式时同一个键总是被映射到同一个分区,如果写入分区时该键对应的分区不可用,可能会导致使用该键的消息一直发送失败。
如果使用了默认的分区器,那么只有在不改变主题分区数量的情况下键与分区之间的映射才能保持一致。
例如,只要分区数量保持不变,就可以保证用户045189的记录总是被写到分区34。这样就可以在从分区读取数据时做各种优化。但是,一旦主题增加了新分区,这个就无法保证了——旧数据仍然留在分区34,但新记录可能被写到了其他分区。
如果要使用键来映射分区,那么最好在创建主题时就把分区规划好,而且永远不要增加新分区。
标头
除了键和值,记录还可以包含标头。可以在不改变记录键–值对的情况下向标头中添加一些有关记录的元数据。标头指明了记录数据的来源,可以在不解析消息体的情况下根据标头信息来路由或跟踪消息(消息有可能被加密,而路由器没有访问加密数据的权限)。
标头由一系列有序的键–值对组成。键是字符串,值可以是任意被序列化的对象,就像消息里的值一样。
下面这个简单的示例演示了如何给ProduceRecord添加标头。
ProducerRecord<String, String> record =new ProducerRecord<>("CustomerCountry", "Precision Products", "France");record.headers().add("privacy-level","YOLO".getBytes(StandardCharsets.UTF_8));
拦截器
Kafka的ProducerInterceptor拦截器包含两个关键方法。ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
这个方法会在记录被发送给Kafka之前,甚至是在记录被序列化之前调用。如果覆盖了这个方法,那么就可以捕获到有关记录的信息,甚至可以修改它。只需确保这个方法返回一个有效的ProducerRecord对象。这个方法返回的记录将被序列化并发送给Kafka。void onAcknowledgement(RecordMetadata metadata, Exception exception)
这个方法会在收到Kafka的确认响应时调用。如果覆盖了这个方法,则不可以修改Kafka返回的响应,但可以捕获到有关响应的信息。
常见的生产者拦截器应用场景包括:捕获监控和跟踪信息、为消息添加标头,以及敏感信息脱敏。
下面是一个非常简单的生产者拦截器示例,它只是简单地统计在特定时间窗口内发送和接收的消息数量:
public class CountingProducerInterceptor implements ProducerInterceptor {ScheduledExecutorService executorService =Executors.newSingleThreadScheduledExecutor();static AtomicLong numSent = new AtomicLong(0);static AtomicLong numAcked = new AtomicLong(0);public void configure(Map<String, ?> map) {Long windowSize = Long.valueOf((String) map.get("counting.interceptor.window.size.ms")); ➊executorService.scheduleAtFixedRate(CountingProducerInterceptor::run,windowSize, windowSize, TimeUnit.MILLISECONDS);}public ProducerRecord onSend(ProducerRecord producerRecord) {numSent.incrementAndGet();return producerRecord; ➋}public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {numAcked.incrementAndGet(); ➌}public void close() {executorService.shutdownNow(); ➍}public static void run() {System.out.println(numSent.getAndSet(0));System.out.println(numAcked.getAndSet(0));}
}
- ProducerInterceptor继承了Configurable接口。
- 每当发送了一条记录,就增加记录的计数,并原封不动地将记录返回。
- 每当收到Kafka的确认响应,就增加确认的计数,不返回任何东西。
- 这个方法会在生产者被关闭时调用,我们可以借助这个机会清理拦截器的状态。在这个示例中,我们关闭了之前创建的线程。如果你打开了文件句柄、与远程数据库建立了连接,或者做了其他类似的操作,那么可以在这里关闭所有的资源,以免发生资源泄漏。
配额和节流
Kafka可以限制生产消息和消费消息的速率,这是通过配额机制来实现的。Kafka提供了3种配额类型:生产、消费和请求。
- 生产配额和消费配额限制了客户端发送和接收数据的速率(以字节 / 秒为单位)。
- 请求配额限制了broker用于处理客户端请求的时间百分比。
- 默认的生产配额和消费配额是broker配置文件的一部分。如果要限制每个生产者平均发送的消息不超过2 MBps,那么可以在broker配置文件中加入quota.producer.default=2M。
- 也可以覆盖broker配置文件中的默认配额来为某些客户端配置特定的配额,尽管不建议这么做。如果允许clientA的配额达到4 MBps、clientB的配额达到10 MBps,则可以这样配置:quota.producer.override=“clientA:4M,clientB:10M”。
在配置文件中指定的配额都是静态的,如果要修改它们,则需要重启所有的broker。因为随时都可能有新客户端加入,所以这种配置方式不是很方便。因此,特定客户端的配额通常采用动态配置。可以用kafka-config.sh或AdminClient API来动态设置配额:
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024' --entity-name clientC --entity-type clients ➊bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024,consumer_byte_rate=2048' --entity-name user1 --entity-type users ➋bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'consumer_
byte_rate=2048' --entity-type users ➌
如果异步调用Producer.send(),并且发送速率超过了broker能够接受的速率(无论是由于配额的限制还是由于处理能力不足),那么消息将会被放入客户端的内存队列。如果发送速率一直快于接收速率,那么客户端最终将耗尽内存缓冲区,并阻塞后续的Producer.send()调用。如果超时延迟不足以让broker赶上生产者,使其清理掉一些缓冲区空间,那么Producer.send()最终将抛出TimeoutException异常。或者,批次里的记录因为等待时间超过了delivery.timeout.ms而过期,导致执行send()的回调,并抛出TimeoutException异常。因此,要做好计划和监控,确保broker的处理能力总是与生产者发送数据的速率相匹配。