分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

文章目录

    • 1. Kafka 生产者
    • 2. kafaka 命令行操作
    • 3. Kafka 生产者发送消息流程
    • 4. Kafka 生产者发送消息的3种方式
      • 1. 发送即忘记
      • 2. 同步发送
      • 3. 异步发送
    • 5. Kafka 消息对象 ProducerRecord

1. Kafka 生产者

Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多个主题(topic)。生产者可以将消息发送到指定的主题,也可以根据分区策略将消息发送到多个分区中。生产者可以以异步或同步方式发送消息,并且可以配置消息的可靠性和持久性等属性。在 Kafka 中,生产者是消息的源头,它们将消息发送到 Kafka 集群中,供消费者消费。

2. kafaka 命令行操作

① 启动 Zookeeper 集群:

[root@master01 bin]# pwd
/root/ch/soft/zk/zk-01/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-02/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-03/bin
[root@master01 bin]# ./zkServer.sh start

② 启动 kafka 集群:

[root@master01 kafka01]# pwd
/root/ch/soft/kafka/kafka01
[root@master01 kafka01]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka02]# pwd
/root/ch/soft/kafka/kafka02
[root@master01 kafka02]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka03]# pwd
/root/ch/soft/kafka/kafka03
[root@master01 kafka03]# bin/kafka-server-start.sh config/server.properties

③ 创建主题 test:

[root@master01 kafka01]#  bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
Topic: test     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: test     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
Topic: test     Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0

④ 生产者发送消息到主题test:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test
>hello
>你好,kafka!
>

⑤ 消费者消费主题test的消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
hello
你好,kafka!

3. Kafka 生产者发送消息流程

在这里插入图片描述

① 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。

② 调用send() 方法进行消息发送。

③ 因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和value对象序列化成字节数组。

④ 接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之后,生产者就知道该往哪个主题和分区发送记录了。

⑤ 接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

⑥ Broker 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka ,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 就会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败, 就返回错误信息。

4. Kafka 生产者发送消息的3种方式

发送消息主要有三种模式:发后即忘记、同步及异步。在同步模式下,程序会一直等待某个操作完成后才会继续执行下一个操作,在异步模式下,程序可以同时执行多个操作,不会阻塞其他操作。

KafkaProducer 的 send() 方法用于向 Kafka 集群发送消息。该方法的语法如下:

public interface Producer<K, V> extends Closeable {Future<RecordMetadata> send(ProducerRecord<K, V> record);Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

其中,ProducerRecord<K, V> 表示要发送的消息记录,K 和 V 分别表示键和值的类型。send() 方法返回一个 Future 对象,表示异步发送消息的结果。

1. 发送即忘记

发送即忘记,生产者发送消息后不会等待服务器的响应,直接发送下一条消息。它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

public class CustomProducer01 {private static final String brokerList "10.65.132.2:9093";private static final String topic = "test";public static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties = initConfig();// kafka生产者发送消息,默认是异步发送方式KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka!");try{// 发送消息kafkaProducer.send(producerRecord);}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();}
}

cmd命令行窗口开启 kafka 消息者,观察消费者是否接收到消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!

2. 同步发送

send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后可以调用 get()方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

Future 接口源码:

public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口是Java中用于表示异步计算结果的接口。它定义了一些方法,用于查询异步计算是否完成、获取计算结果等操作。

  • cancel方法用于取消异步计算;
  • isCancelled方法用于判断异步计算是否已经被取消;
  • isDone方法用于判断异步计算是否已经完成。
  • get方法用于获取异步计算的结果,如果计算还没有完成,则该方法会阻塞直到计算完成。如果计算被取消,则该方法会抛出CancellationException异常。如果计算抛出异常,则该方法会抛出ExecutionException异常。
  • get(long timeout, TimeUnit unit)方法与get方法类似,但是它会在指定的时间内等待计算完成,如果超时则会抛出TimeoutException异常。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然KafkaProducer.send()方法的返回值是一个Future类型的对象,那么完全可以用Java语言层面的技巧来丰富应用的实现,比如使用Future中的 get(long timeout,TimeUnit unit)方法实现可超时的阻塞。

public class CustomProducer01 {private static final String brokerList = "10.65.132.2:9093";private static final String topic = "test";public static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties = initConfig();// kafka生产者发送消息,默认是异步发送方式KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,同步发送!");try{// 发送消息Future<RecordMetadata> future = kafkaProducer.send(producerRecord);// 获取异步计算的结果,如果计算还没有完成,则该方法会阻塞直到计算完成RecordMetadata recordMetadata = future.get();System.out.println("metadata.topic() = " + recordMetadata.topic());}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();}
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步发送!

在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

3. 异步发送

生产者发送消息后不会等待服务器的响应,而是通过回调函数来处理服务器的响应。回调函数会在 producer 收到 ack 时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

public class CustomProducer01 {private static final String brokerList = "10.65.132.2:9093";private static final String topic = "test";public static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties = initConfig();// kafka生产者发送消息,默认是异步发送方式KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,异步发送带返回值!");try{// 发送消息kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {// 说明消息发送成功if(e==null){System.out.println("metadata.topic() = " + recordMetadata.topic());System.out.println("metadata.partition() = " + recordMetadata.partition());}}});}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();}
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步发送!
你好,kafka,异步发送带回调函数!

Kafka生产者异步发送消息时,可以通过指定回调函数来处理发送结果。当消息发送完成后,回调函数会被调用,以通知应用程序消息发送的结果。具体来说,当生产者成功发送消息时,回调函数会被传递一个RecordMetadata对象,该对象包含了发送消息的相关信息,如消息所在的分区、消息在分区中的偏移量等。如果发送消息失败,则回调函数会被传递一个非空的Exception对象,以指示发送失败的原因。

需要注意的是,回调函数是在生产者的I/O线程中被调用的,因此应该尽量避免在回调函数中执行耗时的操作,以免影响生产者的性能。

5. Kafka 消息对象 ProducerRecord

① ProducerRecord 成员变量:

public class ProducerRecord<K, V> {// 消息要发送到的主题private final String topic;// 消息要发送到的分区号,如果为null,则由Kafka自动选择分区private final Integer partition;// 消息的键private final K key;// 消息的值private final V value;// 消息的时间戳,如果为null,则使用当前时间戳private final Long timestamp;// 消息的头部信息private final Headers headers;// .....
}
  • topic和partition字段分别代表消息要发往的主题和分区号。
  • key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中。
  • value是指消息体,一般不为空,如果为空则表示特定的消息。
  • timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

② ProducerRecord 构造函数:

public class ProducerRecord<K, V> {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null.");if (timestamp != null && timestamp < 0)throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));if (partition != null && partition < 0)throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));this.topic = topic;this.partition = partition;this.key = key;this.value = value;this.timestamp = timestamp;this.headers = new RecordHeaders(headers);}public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, null);}public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {this(topic, partition, null, key, value, headers);}public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);}
}

生产者发送消息的分区选择逻辑:

  • 若指定Partition ID,则消息发送至指定的Partition
  • 若未指定Partition ID,但指定了Key,则消息会按照 hasy(key) 发送至对应Partition
  • 若既未指定Partition ID也没指定Key,则消息会按照round-robin模式发送到每个Partition
  • 若同时指定了Partition ID和Key,则消息只会发送到指定的Partition (Key不起作用,代码逻辑决定)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29980.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pytorch深度学习-----神经网络模型的保存与加载(VGG16模型)

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…

Git介绍及常用命令详解

一、Git的概述 Git是一个分布式版本控制工具&#xff0c;通常用来对软件开发过程中的源代码文件进行管理。 Git 会跟踪我们对文件所做的更改&#xff0c;因此我们可以记录已完成的工作&#xff0c;并且可以在需要时恢复到特定或以前的版本。Git 还使多人协作变得更加容易&…

Linux系统中的自旋锁(两幅图清晰说明)

总结&#xff1a; 多CPU下的自旋锁采取的是忙等待&#xff08;原地打转&#xff09;机制&#xff0c;虽然忙等待的线程占用了它所在的cpu&#xff0c;但其他线程仍可放到其他CPU上执行。所以自旋锁上锁和解锁之间的临界区代码要尽量的短&#xff0c;最好不要超过5行&#xff0c…

jenkins流水线

1.拉取代码 https://gitee.com/Wjc_project/yygh-parent.git2、项目编译 mvn clean package -Dmaven.test.skiptrue ls hospital-manage/target3、构建镜像 ls hospital-manage/target docker build -t hospital-manage:latest -f hospital-manage/Dockerfile ./hospital-ma…

AWD攻防学习总结(草稿状态,待陆续补充)

AWD攻防学习总结 防守端1、修改密码2、备份网站3、备份数据库4、部署WAF5、部署文件监控脚本6、部署流量监控脚本/工具7、D盾扫描&#xff0c;删除预留webshell8、代码审计&#xff0c;seay/fortify扫描&#xff0c;漏洞修复及利用9、时刻关注流量和积分信息&#xff0c;掉分时…

业绩难言乐观,皓泽电子撤回上市申请,小米等为其关联方

撰稿|行星 来源|贝多财经 8月8日&#xff0c;深圳证券交易所披露的信息显示&#xff0c;由于河南皓泽电子股份有限公司&#xff08;下称“皓泽电子”&#xff09;及其保荐人主动要求撤回申请文件&#xff0c;深交所终止了皓泽电子的发行注册程序。 据此前招股书披露&#xff…

python爬虫实战(1)--爬取新闻数据

想要每天看到新闻数据又不想占用太多时间去整理&#xff0c;萌生自己抓取新闻网站的想法。 1. 准备工作 使用python语言可以快速实现&#xff0c;调用BeautifulSoup包里面的方法 安装BeautifulSoup pip install BeautifulSoup完成以后引入项目 2. 开发 定义请求头&#xf…

Fast Tone Mapping for High Dynamic Range Images

Abstract 我们提出了一种快速、有效、灵活的色调再现方法&#xff0c;在低动态范围再现设备中保留了高动态范围场景的可视性和对比度印象。 一个单一的参数控制能见度和对比度在一个简单和优雅的方式和互动速度。 新方法使用简单&#xff0c;计算效率高。 实验表明&#xff0c…

Jenkins+Docker+SpringCloud微服务持续集成

JenkinsDockerSpringCloud微服务持续集成 JenkinsDockerSpringCloud持续集成流程说明SpringCloud微服务源码概述本地运行微服务本地部署微服务 Docker安装和Dockerfile制作微服务镜像Harbor镜像仓库安装及使用在Harbor创建用户和项目上传镜像到Harbor从Harbor下载镜像 微服务持…

RK3568蓝牙程序开发过程

1、搭建蓝牙开发环境 蓝牙开发可以使用C语言开发或python语言开发&#xff0c;使用的是蓝牙开发库为bluez库。 本文开发使用python语言开发&#xff0c;安装bluez库&#xff0c;可以使用pip install PyBluez来安装。 如果安装不上的话&#xff0c;可以使用sudo apt install pyt…

Kafka与Zookeeper版本对应关系

文章目录 了解版本对应Kafka安装包Kafka源码包 了解 比如&#xff1a; kafka_2.11-1.1.1.jar包 其中2.11表示的是Scala的版本&#xff0c;因为Kafka服务器端代码完全由Scala语音编写。”-“后面的1.1.1表示的kafka的版本信息。遵循一个基本原则&#xff0c;Kafka客户端版本和服…

无涯教程-Perl - getnetbyname函数

描述 此函数返回由NAME指定的网络信息(在列表context中)($name,$aliases,$addrtype,$net) 语法 以下是此函数的简单语法- getnetbyname NAME返回值 此函数在错误时返回undef,否则在标量context中返回网络地址,在错误时返回空列表,否则在列表context中返回网络记录(名称,别…

错误: XXXAdapter不是抽象的, 并且未覆盖Adapter中的抽象方法onBindViewHolder(ViewHolder,int)

一、问题描述 在学习Android可侧滑删除的RecyclerView的时候&#xff0c;遇到了下面的报错 错误: SwipeDelAdapter不是抽象的, 并且未覆盖Adapter中的抽象方法onBindViewHolder(ViewHolder,int) public class SwipeDelAdapter extends RecyclerView.Adapter { ^ 在上面的…

操作系统—调度算法

进程调度算法 进程调度算法也称CPU调度算法 调度发生时期 当进程从运行状态转到等待状态&#xff1b;当进程从运行状态转到就绪状态&#xff1b;当进程从等待状态转到就绪状态&#xff1b;当进程从运行状态转到终止状态&#xff1b; 其中发生在 1 和 4 两种情况下的调度称为…

物理层扩展以太网

扩展站点与集线器之间的距离&#xff1a;   在10BASE-T星型以太网中&#xff0c;可使用光纤和一对光纤调制解调器来扩展站点与集线器之间的距离。   为站点和集线器各增加一个用于电信号和光信息号转换的光纤调制解调器&#xff0c;以及他们之间的通信光纤。 扩展共享式以太…

Unity开发笔记:截取指定位置含有UI的场景截图并输出

学习记录整理&#xff0c;自用&#xff0c;也希望能帮助到有相同需求的人。 如果直接截全图&#xff1a; string screenshotName "Assets/Textures/UI/20230803/2.png";ScreenCapture.CaptureScreenshot(screenshotName);截取指定位置含有UI的场景截图&#xff1a; …

uniapp input输入框placeholder文本右对齐

input输入框placeholder文本右对齐 给input标签加上placeholder-class&#xff0c;这个是给placeholder设置样式&#xff0c;右对齐这就是text-align:right;字体颜色之类依次编辑即可。

flutter开发实战-TextPainter计算文本内容的宽度

flutter开发实战-TextPainter计算文本内容的宽度 最近开发过程中根据Text文本的大小判断是否需要进行显示跑马灯效果&#xff0c;获取文本的大小&#xff0c;需要TextPainter来获取Size 一、TextPainter TextPainter主要用于实现文本的绘制。TextPainter类可以将TextSpan渲染…

Babylon.js着色器简明简称【Shader】

推荐&#xff1a;用 NSDT设计器 快速搭建可编程3D场景 为了生成 BabylonJS 场景&#xff0c;需要用 Javascript 编写代码&#xff0c;BabylonJS 引擎会处理该代码并将结果显示在屏幕上。 场景可以通过改变网格、灯光或摄像机位置来改变。 为了及时显示可能的变化&#xff0c;屏…

【架构设计】如何设计一个高性能短链系统

一、前言 所谓系统设计&#xff0c;就是给一个场景&#xff0c;让你给出对应的架构设计&#xff0c;需要考虑哪些问题&#xff0c;采用什么方案解决。很多面试官喜欢出这么一道题来考验你的知识广度和逻辑思考能力。 虽然各个系统千差万别&#xff0c;但是设计思想基本一致&a…