Kafka - 异步/同步发送API

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/121445.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构与算法之矩阵: Leetcode 48. 旋转矩阵 (Typescript版)

旋转图像 https://leetcode.cn/problems/rotate-image/ 描述 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 1 输入&…

【Unity数据交互】JsonUtility的“爱恨情仇“

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

宝塔面板安装Python和Flask(新版Python项目)

&#xff08;一&#xff09;宝塔面板的项目菜单&#xff0c;打开Python项目的“项目版本管理” 安装Python版本3.10.0。 会创建一个Python版本的文件夹www/server/pyproject_evn/versions/ 会创建一个Python虚拟环境的文件夹www/server/pyproject_evn/python_venv/ &#xf…

USB学习(3):USB描述符和USB类设备

文章目录 1 USB描述符(Descriptors)1.1 设备描述符(Device Descriptor)1.2 配置描述符(Configuration Descriptor)1.3 接口关联描述符(Interface Association Descriptor)1.4 接口描述符(Interface Descriptor)1.5 端点描述符(Endpoint Descriptor)1.6 字符串描述符(String Des…

极米科技H6 Pro 4K、H6 4K高亮定焦版——开启家用投影4K普及时代

智能投影产业经过几年发展&#xff0c;市场规模正在快速扩大。洛图数据显示&#xff0c;预计今年中国投影出货量有望超700万台&#xff0c;2027年达950万台&#xff0c;可见智能投影产业规模将逐渐壮大&#xff0c;未来可期。2023年&#xff0c;投影行业呈现出全新面貌&#xf…

【观察】Dell APEX云平台:引领多云时代上云新范式

毫无疑问&#xff0c;过去十多年是云计算发展的黄金十年&#xff0c;云计算理念不断被市场所接受&#xff0c;但随着企业上云深入和认知度的不断增加&#xff0c;摆在很多企业面前的选择题也发生了新变化&#xff0c;即从过去企业上云或不上云的纠结&#xff0c;转变成今天如何…

文件混淆-界面介绍

目录 文件混淆-界面介绍 顶部介绍 中间文件列表区介绍 底部功能介绍 介绍文件混淆界面功能选项和操作流程 文件混淆-界面介绍 文件混淆功能区域包括3个功能区&#xff1a;顶部显示过滤区、中间文件列表区、底部的是否混淆开关 顶部介绍 显示控制区域&#xff0c;这个区…

【漏洞复现】酒店宽带运营系统RCE

漏洞描述 安美数字 酒店宽带运营系统 server_ping.php 远程命令执行漏洞 免责声明 技术文章仅供参考&#xff0c;任何个人和组织使用网络应当遵守宪法法律&#xff0c;遵守公共秩序&#xff0c;尊重社会公德&#xff0c;不得利用网络从事危害国家安全、荣誉和利益&#xff…

一文了解GC垃圾回收

一文了解GC垃圾回收 1 判断一个对象为垃圾对象的方法 引用计数法(弃用) 可达性分析算法 是否有指向GC root 的引用链&#xff0c;如果有&#xff0c;不是垃圾对象 ---->GC roo:即rt.jar包中内容 2 内存泄漏与内存溢出区别 泄漏&#xff1a;原本需要被回收的对象&#…

C++系列之list的模拟实现

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; list的节点类 template struct list_Node { public: list_Node* _prev; list_…

关于本地项目上传到gitee的详细流程

如何上传本地项目到Gitee的流程&#xff1a; 1.Gitee创建项目 2. 进入所在文件夹&#xff0c;右键点击Git Bash Here 3.配置用户名和邮箱 在gitee的官网找到命令&#xff0c;注意这里的用户名和邮箱一定要和你本地的Git相匹配&#xff0c;否则会出现问题。 解决方法如下&…

linux上java -jar方式运行项目及输出文件nohup.out的清理, linux上定时器的用法

linux上java -jar方式运行项目及输出文件nohup.out的清理&#xff0c; linux上定时器的用法 linux上java -jar方式运行定期自动清理nohup.out文件的内容**验证**定时器crontab使用时注意事项 linux上java -jar方式运行 参考&#xff1a;https://blog.csdn.net/qq_42169450/arti…

2023 MathorCup(妈妈杯) 数学建模挑战赛B题完整解题思路+模型+代码

2023妈妈杯数学建模B题完整版思路、模型代码已出&#xff01;&#xff01;&#xff01; 云顶数模最新完整版解题思路、模型代码&#xff0c;供大家参考~~ B题目 解题思路 详细模型解析&#xff1a;

开启CETOS 裸奔了一年的服务器开启firewall防火墙

记录一下关于firewall&#xff0c;博主非运维专家或服务器专家。 背景 客户有一台裸奔运行了一年多的系统有公网但发现没有开防火墙&#xff0c;iptables和firewall均是关闭状态&#xff0c;通过扫描发现很多漏洞。根据客户要求对端口进行重新梳理且关闭不必要或有潜在风险的…

ubuntu安装nps客户端

Ubuntu安装nps客户端 1.什么是nps内网穿透&#xff1f;2.设备情况3.下载客户端3.链接服务端3.1、无配置文件模式3.2、注册到系统服务(启动启动、监控进程) 1.什么是nps内网穿透&#xff1f; nps是一款轻量级、高性能、功能强大的内网穿透代理服务器。目前支持tcp、udp流量转发…

云端代码编辑器Atheos

什么是 Atheos &#xff1f; Atheos是一个基于 Web 的 IDE 框架&#xff0c;占用空间小且要求最低&#xff0c;构建于 Codiad 之上&#xff0c;不过 Atheos 已从原始 Codiad 项目完全重写&#xff0c;以利用更现代的工具、更简洁的代码和更广泛的功能。 注意事项 群晖内核版本太…

浅谈中国汽车充电桩行业市场状况及充电桩选型的介绍

安科瑞虞佳豪 车桩比降低是完善新能源汽车行业配套的一大重要趋势&#xff0c;目前各国政府都在努力推进政策&#xff0c;通过税收减免、建设补贴等措施提升充电桩建设速度&#xff0c;以满足新能源汽车需求。 近年来&#xff0c;在需求和技术的驱动下&#xff0c;充电桩的平…

Mac怎么删除文件和软件?苹果电脑删除第三方软件方法

Mac删除程序这个话题为什么一直重复说或者太多人讨论呢&#xff1f;因为如果操作不当&#xff0c;可能会导致某些不好的影响。因为Mac电脑如果有太多无用的应用程序&#xff0c;很有可能会拖垮Mac系统的运行速度。或者如果因为删除不干净&#xff0c;导致残留文件积累在Mac电脑…

Jmeter的接口自动化测试

在去年实施了一年的三端&#xff08;PC、无线M站、无线APP【Android、IOS】&#xff09;后&#xff0c;今年7月份开始&#xff0c;我们开始进行接口自动化的实施&#xff0c;目前已完成了整个框架的搭建以及接口的持续测试集成。今天做个简单的分享。 在开始自动化投入前&#…

第14期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练 Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大型语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以…