kafka复习:(23)事务

一、生产者,开启事务。

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class KafkaTest22 {public static void main(String[] args) {Properties properties= new Properties();properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");properties.setProperty(ProducerConfig.RETRIES_CONFIG, "2");properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "4");properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransaction");KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);ProducerRecord<String,String> producerRecord=new ProducerRecord<>("study2024",0,"fff","hello sister,now is: "+ new Date());kafkaProducer.initTransactions();kafkaProducer.beginTransaction();try{Future<RecordMetadata> future = kafkaProducer.send(producerRecord);long offset = 0;try {offset = future.get().offset();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(offset);Thread.sleep(60000);kafkaProducer.commitTransaction();} catch (Exception ex){kafkaProducer.abortTransaction();}kafkaProducer.close();}
}

二、消费者,设置隔离级别为"read_committed"

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;public class KafkaTest23 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");//properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());String topic="study2024";myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}}}
}

三、运行结果,按照上述配置,当生产者发送消息并从kafka broker获取到offset后就会sleep,在生产者sleep的时候,消费者是获取不到消息的,只有sleep完成并提交事务之后,消费者才会获取到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/63790.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【GoLang】go入门:go语言执行过程分析 常见数据类型(基本数据类型)

1、go语言执行过程分析 【1】执行流程分析 通过 go build 进行编译 运行上一步生成的可执行文件 通过 go run 命令直接运行 【2】上述两种执行流程的区别 在编译时&#xff0c;编译器会将程序运行时依赖的库文件包含在可执行文件中&#xff0c;所以可执行文件会变大很多通过g…

RPA技术介绍与应用价值

一、什么是RPA技术? RPA(Robotic Process Automation)机器人流程自动化,是一种能够模拟人类来执行重复性任务的新型技术。RPA可实现统筹安排、自动化业务处理,并提升业务工作流处理效率。用户只需通过图形方式显示的计算机操作界面对RPA软件进行动态设定即可。借助RPA (R…

多维时序 | Matlab实现BiLSTM-Adaboost和BiLSTM多变量时间序列预测对比

多维时序 | Matlab实现BiLSTM-Adaboost和BiLSTM多变量时间序列预测对比 目录 多维时序 | Matlab实现BiLSTM-Adaboost和BiLSTM多变量时间序列预测对比预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | Matlab实现BiLSTM-Adaboost和BiLSTM多变量时间序列预…

day-38 代码随想录算法训练营(19)动态规划part01

509.佩波纳契数列 思路&#xff1a; 1.dp存储第i个位置的数值2.递归公式&#xff1a;dp[i]dp[i-1]dp[i-2];3.初始化&#xff1a;dp[0]0,dp[1]14.遍历顺序&#xff1a;2-n 70.爬楼梯 思路&#xff1a; 1.dp存储的是&#xff0c;当前i位置&#xff0c;有dp[i]中方法到达2.dp[…

VScode 国内下载源 以及 nvm版本控制器下载与使用

VScode 国内下载源 进入官网 https://code.visualstudio.com/ 点击下载 复制下载链接到新的浏览器标签 将地址中的/stable前的az764295.vo.msecnd.net换成vscode.cdn.azure.cn&#xff0c;再回车就会直接在下载列表啦。 参考大神博客 2.使用nvm 对 node 和npm进行版本控制…

23062网络编程day7

网络聊天室编写&#xff08;基于UDP&#xff09; 服务器 #include <myhead.h>#define PORT 8888 //端口号&#xff1a;接收方绑定的端口号 #define IP "192.168.114.56" //本机IP#define ERR_MSG(msg) do{\fprintf(stderr, "__%d__:&…

uni-app 报错 navigateTo:fail page “/pages/.../...“ is not found

这个错误的意思是&#xff1a;识别不到该页面 错误可能一&#xff1a;没有在 pages.json【微信小程序是 app.json】中定义该页面的路径 如&#xff1a; pages.json "pages": [{"path": "pages/index/index" }, {"path": "pag…

为什么在有一些分类任务中,resnet50的准确率不如resnet18

在分类任务中&#xff0c;ResNet50和ResNet18是两种不同的卷积神经网络模型&#xff0c;它们的网络结构和参数量不同&#xff0c;因此可能导致它们在准确率上的表现有所差异。 网络深度&#xff1a;ResNet50相比于ResNet18更深&#xff0c;具有更多的层和更多的参数。更深的网络…

electron应用打包成功纪念一下

electron应用打包成功纪念一下&#xff0c;以前曾经行过后来打包各种报错&#xff0c;现在有空就尝试解决一下 首先安装nvm能够方便切换node版本 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash 顺利安装后你用nvm list查看node列表时会…

数据治理-数据管理成熟度评估

是什么&#xff1f; 能力成熟度评估是一种基于能力成熟度模型框架的能力提升方案&#xff0c;描述了数据管理能力初始状态发展到最优化的过程。 历史发展 能力成熟度评估概念源于美国国防部为评估软件承包商而建立的标准。20世纪80年代中期&#xff0c;卡内基梅隆大学软件工程…

c#垃圾回收(Garbage Collection)

在C#中&#xff0c;垃圾回收&#xff08;Garbage Collection&#xff09;是一种自动管理内存的机制。它负责跟踪和释放不再使用的内存&#xff0c;以便程序可以有效地使用内存资源。 C#中的垃圾回收器是由.NET运行时&#xff08;CLR&#xff09;提供和管理的。它使用了一种叫做…

Linux获取纳秒级别时间

在 Linux 系统中可以使用 gettimeofday 函数来获取时间&#xff0c;这个函数能够以毫秒的精度来获取时间 struct timeval tv;gettimeofday(&tv, NULL);time_t cur_time tv.tv_sec;long cur_time_ms tv.tv_usec/1000;printf(“cur_time %d \n”, cur_time);printf(“cur…

【八股】2023秋招八股复习笔记5(计算机网络-CN)

文章目录 八股目录目录1、应用层 & HTTP一些http题HTTPS 加密原理&#xff08;问过&#xff09;HTTP/1.1 新特性HTTP/2.0 与 RPC&#xff08;问过&#xff09;GET 和 POST 比较 2、传输层 & TCPTCP三次握手 & 四次挥手&#xff08;问过&#xff09;为什么每次TCP 连…

硬链接和软链接的区别?

分析&回答 硬链接 建立软链接和硬链接的语法 软链接&#xff1a;ln -s 源文件 目标文件硬链接&#xff1a;ln 源文件 目标文件源文件&#xff1a;即你要对谁建立链接 什么是软链接和硬链接 软链接可以理解成快捷方式。它和windows下的快捷方式的作用是一样的。硬链接等…

selenium鼠标操作方法

1.0 selenium新版本封装驱动 from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import ChromiumOptions from selenium.webdriver.chrome.service import Servicedef get_chrome_driver(driver_path):chrome_options ChromiumOptions()chrome_…

密码算法、密钥体系---安全行业基础篇1

一、密码算法 密码算法是一种数学和计算方法&#xff0c;用于保护数据的机密性和安全性。不同的密码算法使用不同的数学原理和技术来加密和解密数据。以下是一些常见的密码算法类型&#xff1a; 1. **对称密码算法&#xff1a;** 特点&#xff1a;相同的密钥用于加密和解密数…

Java网络爬虫——jsoup快速上手,爬取京东数据。同时解决‘京东安全’防爬问题

文章目录 介绍jsoup使用1.解析url&#xff0c;获取前端代码2.解决京东安全界面跳转3.获取每一组的数据4.获取商品数据的具体信息4.最终代码 介绍 网络爬虫&#xff0c;就是在浏览器上&#xff0c;代替人类爬取数据&#xff0c;Java网络爬虫就是通过Java编写爬虫代码&#xff0…

Django学习笔记-AcApp端授权AcWing一键登录

笔记内容转载自 AcWing 的 Django 框架课讲义&#xff0c;课程链接&#xff1a;AcWing Django 框架课。 AcApp 端使用 AcWing 一键授权登录的流程与之前网页端的流程一样&#xff0c;只有申请授权码这一步有一点细微的差别&#xff1a; 我们在打开 AcApp 应用之后会自动向 AcW…

com.squareup.okhttp3:okhttp 组件安全漏洞及健康度分析

组件简介 维护者square组织许可证类型Apache License 2.0首次发布2016 年 1 月 2 日最新发布时间2023 年 4 月 23 日GitHub Star44403GitHub Fork9197依赖包5,582依赖存储库77,217 com.squareup.okhttp3:okhttp 一个开源的 HTTP 客户端库&#xff0c;可以用于 Android 和 Jav…

MySQL 使用开源审计插件

文章目录 前言1. 审计插件下载2. 审计插件参数2.1 server_audit_events2.2 server_audit_excl_users2.3 server_audit_output_type2.4 server_audit_file_path2.5 server_audit_file_rotate_now2.6 server_audit_file_rotate_size2.7 server_audit_file_rotations2.8 server_au…