使用kafka改造分布式事务

文章目录

  • 1、kafka确保消息不丢失?
    • 1.1、生产者端确保消息不丢失
    • 1.2、kafka服务端确保消息不丢失
    • 1.3、消费者确保正确无误的消费
  • 2、生产者发送消息 KafkaService
  • 3、UserInfoServiceImpl -> login()
  • 4、service-account - > AccountListener.java

1、kafka确保消息不丢失?

1.1、生产者端确保消息不丢失

  1. 发送模式:发后即忘、同步阻塞确认、异步非阻塞确认
  2. 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1)
  3. 配置重试:props.put(“retries”, 3)、retries: 3

1.2、kafka服务端确保消息不丢失

  1. kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失
  2. 消息的log日志文件损坏:搭建kafka集群(副本)

1.3、消费者确保正确无误的消费

  1. 偏移量提交
     自动提交:enable-auto-commit: true
     手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐)
  2. 偏移量重置:
     auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题
     auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了
     auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常
  3. 消费者重试:重试主题和死信主题, @RetryableTopic()

2、生产者发送消息 KafkaService

package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 向指定主题发送消息* 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键** @param topic 发送消息的主题* @param msg   需要发送的消息内容*/public void sendMsg(String topic, String msg){// 调用重载的sendMsg方法,传入默认值以简化调用this.sendMsg(topic, null, null, msg);}/*** 发送消息到指定的Kafka主题** @param topic 消息主题* @param partition 分区编号* @param key 消息键值* @param msg 消息内容*/public void sendMsg(String topic, Integer partition, String key, String msg){// 发生消息并返回异步结果CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果future.whenCompleteAsync((result, ex) -> {if (ex != null){// 如果发送过程中出现异常logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
  • whenCompleteAsync:异步完成时的处理、当异步操作完成时
    在这里插入图片描述

3、UserInfoServiceImpl -> login()

  • 此时 service-user 是生产者 发送消息

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class UserInfoServiceImpl extends ServiceImpl<UserInfoMapper, UserInfo> implements UserInfoService {@Autowiredprivate WxMaService wxMaService;@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate UserAccountFeignClient userAccountFeignClient;@Autowiredprivate KafkaService kafkaService;/*** 根据微信返回的code进行用户登录* @param code 微信登录凭证* @return 返回包含登录令牌的Map对象*///@GlobalTransactional//@Transactional@Overridepublic Map<String, Object> login(String code) {// 创建一个HashMap对象用于存放返回的数据HashMap<String, Object> map = new HashMap<>();try {// 通过微信服务获取用户的会话信息WxMaJscode2SessionResult sessionInfo = this.wxMaService.getUserService().getSessionInfo(code);// 获取用户的openidString openid = sessionInfo.getOpenid();// 查询数据库中是否存在该openid对应的用户信息UserInfo userInfo = this.getOne(new LambdaQueryWrapper<UserInfo>().eq(UserInfo::getWxOpenId, openid));if (userInfo == null) {// 如果用户不存在,则创建一个新的UserInfo对象userInfo = new UserInfo();// 设置用户的openiduserInfo.setWxOpenId(openid);// 设置用户的昵称,其中包含一个随机生成的IDuserInfo.setNickname("这家伙太懒"+ IdWorker.getIdStr());// 设置用户的头像URLuserInfo.setAvatarUrl("https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500");// 保存用户信息到数据库this.save(userInfo);// 初始化用户账号信息//userAccountFeignClient.initAccount(userInfo.getId());this.kafkaService.sendMsg(KafkaConstant.QUEUE_USER_REGISTER,userInfo.getId().toString());//int i = 1 / 0;}// 生成一个随机的登录令牌String token = UUID.randomUUID().toString();// 创建一个UserInfoVo对象,用于存放用户信息UserInfoVo userInfoVo = new UserInfoVo();// 将UserInfo对象的属性复制到UserInfoVo对象中BeanUtils.copyProperties(userInfo, userInfoVo);// 将用户信息存储到Redis中,设置过期时间为30分钟this.redisTemplate.opsForValue().set(RedisConstant.USER_LOGIN_KEY_PREFIX + token, userInfoVo,RedisConstant.USER_LOGIN_KEY_TIMEOUT, TimeUnit.SECONDS);// 将生成的登录令牌放入Map对象中map.put("token", token);// 返回包含登录令牌的Map对象return map;} catch (WxErrorException e) {// 如果发生微信错误异常,抛出自定义的异常throw new GuiguException(ResultCodeEnum.LOGIN_AUTH);}}}

在这里插入图片描述

4、service-account - > AccountListener.java

  • 此时 service-account 是消费者 接收消息

在这里插入图片描述

@Slf4j
@Component
public class AccountListener {@Autowiredprivate UserAccountService userAccountService;@RetryableTopic(backoff = @Backoff(2000))@KafkaListener(topics = KafkaConstant.QUEUE_USER_REGISTER)public void listen(String userId, Acknowledgment ack){// 如果是空消息直接确认掉,后续不用再执行if (StringUtils.isBlank(userId)) {ack.acknowledge();return;}// 初始化账户this.userAccountService.saveAccount(Long.valueOf(userId));ack.acknowledge();// 手动确认}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/877991.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day31-测试之性能测试工具JMeter的功能概要、元件作用域和执行顺序

目录 一、JMeter的功能概要 1.1.文件目录介绍 1).bin目录 2).docs目录 3).printable_docs目录 4).lib目录 1.2.基本配置 1).汉化 2).主题修改 1.3.基本使用流程 二、JMeter元件作用域和执行顺序 2.1.名称解释 2.2.基本元件 2.3.元件作用域 1).核心 2).提示 3).作用域的原则 2.…

Redis 实现哨兵模式

目录 1 哨兵模式介绍 1.1 什么是哨兵模式 1.2 sentinel中的三个定时任务 2 配置哨兵 2.1 实验环境 2.2 实现哨兵的三条参数&#xff1a; 2.3 修改配置文件 2.3.1 MASTER 2.3.2 SLAVE 2.4 将 sentinel 进行备份 2.5 开启哨兵模式 2.6 故障模拟 3 在整个架构中可能会出现的问题 …

go中 panicrecoverdefer机制

go的defer机制-CSDN博客 常见panic场景 数组或切片越界&#xff0c;例如 s : make([]int, 3); fmt.Println(s[5]) 会引发 panic: runtime error: index out of range空指针调用&#xff0c;例如 var p *Person; fmt.Println(p.Name) 会引发 panic: runtime error: invalid m…

网络通信tcp

一、udp案例 二、基于tcp: tcp //c/s tcp 客户端: 1.建立连接 socket bind connect 2.通信过程 read write close tcp服务器: 1.建立连接 socket bind listen accept 2.通信过程 read write close connect函数 int connect(int sockfd, con…

Git克隆仓库太大导致拉不下来的解决方法 fatal: fetch-pack: invalid index-pack output

一般这种问题是因为某个文件/某个文件夹/某些文件夹过大导致整个项目超过1G了导致的 试过其他教程里的设置depth为1,也改过git的postBuffer,都不管用 最后还是靠克隆指定文件夹这种方式成功把项目拉下来 1. Git Bash 输入命令 git clone --filterblob:none --sparse 项目路径…

探索Unity3D URP后处理在UI控件Image上的应用

探索Unity3D URP后处理在UI控件Image上的应用 前言初识URP配置后处理效果将后处理应用于UI控件方法一&#xff1a;自定义Shader方法二&#xff1a;RenderTexture的使用 实践操作步骤一&#xff1a;创建RenderTexture步骤二&#xff1a;UI渲染至RenderTexture步骤三&#xff1a;…

视频如何转gif?分享这几款软件!

在这个快节奏、高创意的互联网时代&#xff0c;动图&#xff08;GIF&#xff09;以其独特的魅力成为了社交媒体、聊天软件中的宠儿。它们不仅能瞬间抓住眼球&#xff0c;还能让信息传递更加生动有趣。然而&#xff0c;你是否曾为如何将精彩瞬间从视频中精准截取并转换成GIF而苦…

​北斗终端:无人驾驶领域的导航新星

一、北斗终端在无人驾驶领域的应用 北斗终端&#xff0c;作为我国自主研发的北斗卫星导航系统的重要组成部分&#xff0c;其在无人驾驶领域中的应用正逐步显现其独特魅力。北斗系统的高精度、高可靠性和良好的抗干扰性能&#xff0c;为无人驾驶车辆提供了精确的定位和导航服务…

关于超长字符串/文本对应的数据从excel导入到PL/SQL中的尝试

问题&#xff1a; 1.字符串太长 2.str绑定之的结尾null缺失 将csv文件导入到PL/SQL表中存在的一些问题 1.本来我是需要将exceL上的几十条数据导入到PL/SQL数据库的一张表中&#xff0c;结果我花了许多时间 去导入。 想想一般情况下也就几十条数据&#xff0c;直接复制粘贴就…

C语言-有两个磁盘文件A和B,各存放一行字母,今要求把这两个文件的信息合并(按字母顺序排列),输出到一个新文件C中去-深度代码解析

&#x1f30f;个人博客&#xff1a;尹蓝锐的博客 1、题目要求 有两个磁盘文件A和B&#xff0c;各存放一行字母&#xff0c;今要求把这两个文件的信息合并&#xff08;按字母顺序排列&#xff09;&#xff0c;输出到一个新文件C中去 2、准备工作 问题1&#xff1a;为什么不需要…

chrome打印dom节点不显示节点信息

正常直接console dom节点 代码改成 var parser new DOMParser(); var docDom parser.parseFromString(testHtml, text/html); console.log(docDom) let htmlHeader ref< HTMLElement | null>(null) let htmlBoby ref< HTMLElement | null>(null) htmlHeader.v…

# 利刃出鞘_Tomcat 核心原理解析(九)-- Tomcat 安全

利刃出鞘_Tomcat 核心原理解析&#xff08;九&#xff09;-- Tomcat 安全 一、Tomcat专题 - Tomcat安全 - 配置安全 1、 删除 tomcat 的 webapps 目录下的所有文件&#xff0c;禁用 tomcat 管理界面. 如下目录均可删除&#xff1a; D:\java-test\apache-tomcat-8.5.42-wind…

深度学习入门-06

基于小土堆学习 如何把数据集和Transform结合袭来 https://pytorch.org/ 上述网址是pytorch的官网 这里会有详细的使用介绍 下述是对图像处理的专门文档 单击后可查看详细介绍 选择CIFAR10数据集 CIFAR10 数据集是一个广泛使用的计算机视觉数据集&#xff0c;包含了60000…

UV LED供电为什么要选择使用恒流驱动电源

LED为何一定要恒流供电? 在讨论此议题之前&#xff0c;什么是电源的恒流恒压&#xff1f; 什么是电源的恒流恒压   恒流&#xff0c;就是输出电流是恒定的&#xff0c;但电源电流却不是固定的&#xff0c;标称的电压只是安全上限&#xff1b;恒压&#xff0c;就是输出电压是…

力扣面试经典算法150题:跳跃游戏

跳跃游戏 今天的题目是力扣面试经典150题中的数组的中等难度题&#xff1a;跳跃游戏。 题目链接&#xff1a;https://leetcode.cn/problems/jump-game/description/?envTypestudy-plan-v2&envIdtop-interview-150 题目描述 给定一个非负整数数组 nums&#xff0c;你最初…

uniapp/uniapp x总结

uni-app组成和跨端原理 上图所诉 App的渲染引擎&#xff1a;同时提供了2套渲染引擎&#xff0c;.vue页面文件由webview渲染&#xff0c;原理与小程序相同&#xff1b;.nvue页面文件由原生渲染&#xff0c;原理与react native相同。开发者可以根据需要自主选择渲染引擎。 uniapp…

微信小程序uni :class不支持xxx语法

问题代码&#xff1a; <view class"cellTop"><view>{{list.payTime}}</view><view :class"payStatusClass${list.payStatus}">{{payStatusDe[list.payStatus]}}</view></view> .payStatusClass1{color: rgb(246, 122,…

鸿蒙HarmonyOS开发:创建新的Lite工程

当开始开发一个应用/服务时&#xff0c;首先需要根据工程创建向导&#xff0c;创建一个新的工程&#xff0c;工具会自动生成对应的代码和资源模板。 说明 在运行DevEco Studio工程时&#xff0c;建议每一个运行窗口有2GB以上的可用内存空间。 创建和配置新工程 DevEco Studio提…

【图文并茂】ant design pro 如何对接后端个人信息接口

上一节我们有讲到如何对接登录接口的 【图文并茂】ant design pro 如何对接登录接口 仅仅能登录是最基本的&#xff0c;但是我们要进入后台还是需要另一个接口。 这个接口有两个作用&#xff1a; 来获取当前登录账号的信息&#xff0c;比如头像&#xff0c;用户名&#xff0…

Springsecurity中的Eureka报错:Cannot execute request on any known server

完整报错信息&#xff1a; com.netflix.discovery.shared.transport.TransportException: Cannot execute request on any known server 报错体现&#xff1a; 访问eureka控制面板&#xff1a; 访问测试地址&#xff1a; 控制台报错&#xff1a; 可能的报错原因&#xff…