Kafka - 异步/同步发送API

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/121445.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构与算法之矩阵: Leetcode 48. 旋转矩阵 (Typescript版)

旋转图像 https://leetcode.cn/problems/rotate-image/ 描述 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 1 输入&…

【Unity数据交互】JsonUtility的“爱恨情仇“

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

react中使用监听

在 React 中&#xff0c;您可以使用 addEventListener 函数来监听事件。以下是一个示例&#xff1a; import React, { useRef, useEffect } from react;function App() {const inputRef useRef(null);useEffect(() > {inputRef.current.addEventListener(input, handleInp…

宝塔面板安装Python和Flask(新版Python项目)

&#xff08;一&#xff09;宝塔面板的项目菜单&#xff0c;打开Python项目的“项目版本管理” 安装Python版本3.10.0。 会创建一个Python版本的文件夹www/server/pyproject_evn/versions/ 会创建一个Python虚拟环境的文件夹www/server/pyproject_evn/python_venv/ &#xf…

USB学习(3):USB描述符和USB类设备

文章目录 1 USB描述符(Descriptors)1.1 设备描述符(Device Descriptor)1.2 配置描述符(Configuration Descriptor)1.3 接口关联描述符(Interface Association Descriptor)1.4 接口描述符(Interface Descriptor)1.5 端点描述符(Endpoint Descriptor)1.6 字符串描述符(String Des…

极米科技H6 Pro 4K、H6 4K高亮定焦版——开启家用投影4K普及时代

智能投影产业经过几年发展&#xff0c;市场规模正在快速扩大。洛图数据显示&#xff0c;预计今年中国投影出货量有望超700万台&#xff0c;2027年达950万台&#xff0c;可见智能投影产业规模将逐渐壮大&#xff0c;未来可期。2023年&#xff0c;投影行业呈现出全新面貌&#xf…

0032【Edabit ★☆☆☆☆☆】【每秒帧数】Frames Per Second

0032【Edabit ★☆☆☆☆☆】【每秒帧数】Frames Per Second algorithms language_fundamentals math numbers Instructions Create a function that returns the number of frames shown in a given number of minutes for a certain FPS. Examples frames(1, 1) // 60 fra…

【观察】Dell APEX云平台:引领多云时代上云新范式

毫无疑问&#xff0c;过去十多年是云计算发展的黄金十年&#xff0c;云计算理念不断被市场所接受&#xff0c;但随着企业上云深入和认知度的不断增加&#xff0c;摆在很多企业面前的选择题也发生了新变化&#xff0c;即从过去企业上云或不上云的纠结&#xff0c;转变成今天如何…

全景环视AVM标定

目录 一、前言 二、鱼眼模型 三、标定流程 四、角点提取 4.1 亚像素坐标计算

文件混淆-界面介绍

目录 文件混淆-界面介绍 顶部介绍 中间文件列表区介绍 底部功能介绍 介绍文件混淆界面功能选项和操作流程 文件混淆-界面介绍 文件混淆功能区域包括3个功能区&#xff1a;顶部显示过滤区、中间文件列表区、底部的是否混淆开关 顶部介绍 显示控制区域&#xff0c;这个区…

Go 怎么操作 OSS 阿里云对象存储

1 介绍 在项目开发中&#xff0c;我们经常会使用对象存储&#xff0c;比如 Amazon 的 S3&#xff0c;腾讯云的 COS&#xff0c;阿里云的 OSS 等。本文我们以阿里云 OSS 为例&#xff0c;介绍怎么使用 Go 操作对象存储。 阿里云 OSS 提供了 REST Api 和 OSS Go SDK&#xff0…

Day38 力扣动态规划 :70.爬楼梯 |322. 零钱兑换 |279. 完全平方数

Day38 力扣动态规划 :70.爬楼梯 &#xff5c;322. 零钱兑换 &#xff5c;279. 完全平方数 70. 爬楼梯 &#xff08;进阶&#xff09;第一印象看完题解的思路实现中的困难感悟代码 322. 零钱兑换第一印象dp数组递推公式初始化遍历顺序如果凑不出来返回 -1 看完题解的思路实现中…

【漏洞复现】酒店宽带运营系统RCE

漏洞描述 安美数字 酒店宽带运营系统 server_ping.php 远程命令执行漏洞 免责声明 技术文章仅供参考&#xff0c;任何个人和组织使用网络应当遵守宪法法律&#xff0c;遵守公共秩序&#xff0c;尊重社会公德&#xff0c;不得利用网络从事危害国家安全、荣誉和利益&#xff…

一文了解GC垃圾回收

一文了解GC垃圾回收 1 判断一个对象为垃圾对象的方法 引用计数法(弃用) 可达性分析算法 是否有指向GC root 的引用链&#xff0c;如果有&#xff0c;不是垃圾对象 ---->GC roo:即rt.jar包中内容 2 内存泄漏与内存溢出区别 泄漏&#xff1a;原本需要被回收的对象&#…

C++系列之list的模拟实现

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; list的节点类 template struct list_Node { public: list_Node* _prev; list_…

关于本地项目上传到gitee的详细流程

如何上传本地项目到Gitee的流程&#xff1a; 1.Gitee创建项目 2. 进入所在文件夹&#xff0c;右键点击Git Bash Here 3.配置用户名和邮箱 在gitee的官网找到命令&#xff0c;注意这里的用户名和邮箱一定要和你本地的Git相匹配&#xff0c;否则会出现问题。 解决方法如下&…

linux上java -jar方式运行项目及输出文件nohup.out的清理, linux上定时器的用法

linux上java -jar方式运行项目及输出文件nohup.out的清理&#xff0c; linux上定时器的用法 linux上java -jar方式运行定期自动清理nohup.out文件的内容**验证**定时器crontab使用时注意事项 linux上java -jar方式运行 参考&#xff1a;https://blog.csdn.net/qq_42169450/arti…

2023 MathorCup(妈妈杯) 数学建模挑战赛B题完整解题思路+模型+代码

2023妈妈杯数学建模B题完整版思路、模型代码已出&#xff01;&#xff01;&#xff01; 云顶数模最新完整版解题思路、模型代码&#xff0c;供大家参考~~ B题目 解题思路 详细模型解析&#xff1a;

android中集成ffmpeg

在java程序中集成ffmpeg库&#xff1a; 编写JNI接口编译 FFmpeg 库和 JNI 接口在 Java 代码中加载 JNI 接口 在android程序中也需要一样的步骤。或者用一些别人已经编译好的一些库&#xff0c;比如android中的ffmpeg-android-java或者mobile-ffmpeg

原型、原型对象、原型链

1、什么是原型&#xff08;隐式原型、显式原型&#xff09; JavaScript 的所有对象中都有一个私有属性&#xff0c;我们一般称之为隐式原型&#xff08;__proto__&#xff09;&#xff0c;它指向的是构建出这个实例的类的显式原型&#xff08;prototype&#xff09;&#xff0c…