kafka(四)消息类型

一、同步消息

1、生产者

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

二、异步消息

1、生产者

异步消息有两种:

1.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
1.2、带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}

三、顺序消息

以订单为例,

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

@RestController
public class OrderController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send")public String send() throws InterruptedException {int size = 1000;for (int i = 0; i < size; i++) {OrderDto orderDto = new InterOrderDto();orderDto.setOrderNo(i + "");orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return "success";}private String getStatus(int status){return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36936.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】计数排序等排序

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

Ubuntu系统中创建桌面快捷方式和添加Favorites

一. Ubuntu系统中创建软件的桌面快捷方式 Ubuntu桌面创建某个软件的桌面快捷方式&#xff0c;一个直观的方法。 方法1. 在图像界面下&#xff0c;一层一层地打开文件目录软件快捷方式/usr/share/applications/ 方法2. 或者在终端运行$ nautilus /usr/share/applications/ …

MQ - RabbitMQ、SpringAMQP --学习笔记

什么是MQ&#xff1f; MQ 是消息队列&#xff08;Message Queue&#xff09;的缩写&#xff0c;它是一种应用程序间异步通信的技术。消息队列允许应用程序或服务间通过发送消息来交换数据&#xff0c;而不是直接调用对方&#xff0c;从而实现解耦、异步处理和负载均衡等目的。…

零成本打造精品宣传册

​随着互联网的发展&#xff0c;企业和个人对宣传册的需求日益增长&#xff0c;然而&#xff0c;高质量的宣传册制作往往需要不菲的成本。那么&#xff0c;如何零成本打造精品宣传册呢&#xff1f; 一、明确定位和目标群体 在制作宣传册之前&#xff0c;首先要明确其定位和目标…

qt pro文件常用配置

概述 记录一下常用的项目pro文件的一些常用配置 常用配置 QT core gui concurrent#添加concurrent并行处理模块 CONFIG windeployqt#打包部署&#xff0c;项目->构建步骤->Make参数 添加windeployqt&#xff0c;编译自动打包greaterThan(QT_MAJOR_VERSION, 4):…

Kafka入门到精通(三)-Kafka

Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台&#xff0c;由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。 这种动作&#xff08;网页浏览&#xff0c;搜索和其他用户的行动&#xf…

JeecgBoot新建模块

引言 jeecg-boot设置了demo, system等默认模块。在二次开发中&#xff0c;常常需要进行模块扩展。比如新增一个订单模块或支付模块。如何准确的新增模块&#xff0c;在此文进行记录。 步骤 新建模块 在项目点击右键&#xff0c;新建模块。 如下图。 注意&#xff1a;报名需…

鸿蒙NEXT开发知识:工具常用命令—ohpm config

设置ohpm用户级配置项。 命令格式 ohpm config set <key> <value> ohpm config get <key> ohpm config delete <key> ohpm config list 说明 配置文件中信息以键值对<key> <value>形式存在。 功能描述 ohpm 从命令行和 .ohpmrc 文件中…

Linux命令----wc,uniq,sort的用法

1.wc的用法&#xff1a;wc 命令用于计算文件中的行数、单词数和字节数。 常用选项 -l&#xff1a;只显示行数-w&#xff1a;只显示单词数-c&#xff1a;只显示字节数-m&#xff1a;只显示字符数&#xff08;与 -c 类似&#xff0c;但处理多字节字符&#xff09;-L&#xff1a…

day22--77. 组合+216.组合总和III+17.电话号码的字母组合

一、77. 组合 题目链接&#xff1a;https://leetcode.cn/problems/combinations/ 文章讲解&#xff1a;https://programmercarl.com/0077.%E7%BB%84%E5%90%88.html 视频讲解&#xff1a;https://www.bilibili.com/video/BV1ti4y1L7cv 1.1 初见思路 组合问题用回溯学会使用剪…

SpringBoot:SpringBoot中调用失败如何重试

一、引言 在实际的应用中&#xff0c;我们经常需要调用第三方API来获取数据或执行某些操作。然而&#xff0c;由于网络不稳定、第三方服务异常等原因&#xff0c;API调用可能会失败。为了提高系统的稳定性和可靠性&#xff0c;我们通常会考虑实现重试机制。 Spring Retry为Spri…

基于uni-app与图鸟UI的移动应用模板构建研究

摘要 随着移动互联网技术的迅猛发展&#xff0c;移动端应用已成为企业展示形象、提供服务的重要窗口。本文基于uni-app框架和图鸟UI设计&#xff0c;深入探讨了如何高效构建覆盖多个领域的移动端应用模板。通过对商城、办公、投票、生活服务等多种类型模板的详细介绍&#xff…

Educational Codeforces Round 112 (Rated for Div. 2) C. Coin Rows(构造 + 贪心 + 前缀和)

可以知道爱丽丝的路径是拐两次弯的折线 那么我们知道鲍勃能够选择的位置只有两段黄线中的一段 所以可以求出来第二行的后缀和&#xff0c;然后求出来第一行的前缀行&#xff0c;这样鲍勃在爱丽丝分割之后的情况下就会选择这两者中最大的一段&#xff0c;然而爱丽丝也会阻碍鲍…

Open AI Stream Completion Set Variable Inside Function PHP With Openai-php SDK

题意&#xff1a;使用 OpenAI 的 PHP SDK&#xff08;例如 openai-php&#xff09;来在函数内部设置和完成一个流&#xff08;stream&#xff09;相关的变量 问题背景&#xff1a; How to set variable inside this openai-php sdk function in stream completion ? I am usi…

华为HCIA综合实验(结合前几期所有内容)

第一章 实验目的 &#xff08;1&#xff09;配置Telnet&#xff0c;要求所有网络设备支持远程管理&#xff0c;密码为admin&#xff08;2&#xff09;配置Trunk&#xff0c;交换机之间的链路均为Trunk模式&#xff08;3&#xff09;配置VLAN&#xff0c;在SW2和SW3上创建相关…

Qt6.6编译Qt二维图形编辑器QVGE源码

QVGE是一个开源的多平台QtC编写的图形编辑器&#xff0c;可以用来画网络节点图&#xff0c;或者其他作用。 QVGE可以轻松创建和参数设定的小型到中型图形(1000节点/边缘)&#xff0c;共同的视觉特性的节点和边缘&#xff1a;形状、尺寸、颜色、标签等。定义(用户定义)属性的图表…

深度学习Week18——学习残差网络和ResNet-50算法

文章目录 深度学习Week18——学习残差网络和ResNet-50算法 一、前言 二、我的环境 三、前期工作 1、配置环境 2、导入数据 2.1 加载数据 2.2 配置数据集 2.3 数据可视化 2.4 再次检查数据 四、构建ResNet-50网络模型 五、编译模型 六、训练模型 七、模型评估 八、指定图片预测 …

leetCode.92. 反转链表 II

leetCode.92. 反转链表 II 题目思路 代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode …

【Python数据分析与可视化】:使用【Matplotlib】实现销售数据的全面分析 ——【Matplotlib】数模学习

目录 安装Matplotlib 1.打开PyCharm&#xff1a; 2.打开终端&#xff1a; 3.安装Matplotlib&#xff1a; 4.确认安装&#xff1a; 导入Matplotlib 创建简单的折线图 代码解析&#xff1a; 创建子图 代码解析&#xff1a; 创建柱状图 代码解析&#xff1a; 创建散点…

Vite响应Ajax请求

Vite响应Ajax请求 陈拓 2024/06/20-2024/06/24 1. 概述 http-server、live-server 等常用于本地测试和开发的http服务器不能很好的支持 ES 模块&#xff0c;在测试ES 模块时浏览器控制台经常显示错误&#xff1a; Failed to load module script: Expected a JavaScript modu…