目录
- Kafka核心API 及 生产者API讲解
- ★ Kafka的核心API
- Kafka包含如下5类核心API:
- ★ 生产者API
- Kafka 的API 文档
- ★ 使用生产者API发送消息
Kafka核心API 及 生产者API讲解
官方文档
★ Kafka的核心API
Kafka包含如下5类核心API:
Producer API(生产者API):
应用程序通过该API向主题发布消息。
Consumer API(消费者API):
应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)
Streams API(流API):
应用程序可通过该API实现流处理器,可以将一个主题的消息“导流”到另一个主题,并能地对消息进行任意自定义的转换。
类似于 RabbitMQ 的 Exchange
Connector API(连接器API):
应用程序可通过这套API来实现连接器,这些连接器不断地从源系统或应用程序导入数据到Kafka,反过来也可将Kafka消息不断地导入某个接收系统或应用程序。
通过这个API,可以让应用程序和Kafka这个消息系统进行一个实时的交互,我们的系统可以不断的接收来自Kafka的消息,也可以让我们的程序不断的把数据导入到Kafka的消息系统中,就像是一个通道,所以叫连接API。
应用场景:我们的应用程序要和Kafka之间保持实时的数据流的时候,就可以用这个连接API。
AdminAPI(管理API):
应用程序可通过该API管理和检查主题、Broker和其他Kafka实体。
这5套API中,只有流API使用的是专门的JAR包。
其他都用的是org.apache.kafka:kafka-clients依赖库。
而流API用的是org.apache.kafka:kafka-streams依赖库。
★ 生产者API
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version>
</dependency>
生产者API 的核心类是 KafkaProducer,它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象。
ProducerRecord 代表了一条消息,Kafka 的消息是包含了key、value、timestamp。
ProducerRecord定义了如下6个构造器:
- ProducerRecord(String topic, Integer partition, K key, V value):创建一条发送到指定主题和指定分区的消息。- ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息,且包含多个消息头。- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳。- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers):创建一条发送到指定主题和指定分区的消息、使用给定的时间戳,且包含多个消息头。- ProducerRecord(String topic, K key, V value):创建一条发送到指定主题的消息。- ProducerRecord(String topic, V value):创建一条发送到指定主题的、只带value,不带key的消息。
通过查 API 文档可看这个 ProducerRecord 消息对象 的6个构造器:
Kafka 的API 文档
Kafka 的API 文档
★ 使用生产者API发送消息
使用生产者API发送消息很简单,基本只要两步:
1、创建KafkaProducer对象,创建该对象时要传入Properties对象,用于对该生产者进行配置。
2、调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。
3、发送完成后,关闭KafkaProducer对象。
为何Kafka的KafkaProducer需要一个Properties来来创建KafkaProducer?
因为Kafka的Producer API提供了海量的配置选项——如果你将这些配置选项每个都定义成方法,那将是一件让人无比痛苦的事情。
所以Kafka在设计该API时,就直接用了一个Properties来封装所有的配置属性。