高可用环境kafka消息未按顺序消费问题

目录

1、背景

2、问题排查

3、问题解决


1、背景

质检任务是异步执行,正常情况下任务状态扭转是    等待中》运行中》成功(失败)。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的kafka消息,正常情况下状态是顺序下发。

升级部署某个项目,生产环境突然出现很多任务,一直是运行中状态。

2、问题排查

(1)怀疑大数据平台,任务没有正常执行完成,所以任务一直是运行中

1、在yarn平台以及数据库中,都没有发现正在运行中的质量sql任务

2、排查质量平台服务器日志(kafka消息打印接收消息日志很必要,出问题利于排查),发现这个某个sqlId,正常返回了kafka消息,包括运行中、成功、失败等消息。

通过上面的排查,大数据平台没问题,正常执行了任务,正常按顺序给质量平台发了kafka消息

(2)排查质量平台处理kafka消息逻辑

kafka按顺序返回了状态,质量平台没按顺序消费,看质量平台代码如下。

@Slf4j
@Component("dsjAdapterListen")
public class DsjAdapterListen {

    @Autowired
    ZyslCljgService zyslCljgService;

    /**
     * kafka消费消息,需要配置kafka
     */
    @KafkaListener(groupId = "${spring.kafka.consumer.group-id:dquality}", topics =
            {"status_dquality_" + CommonConstant.ZYJH_CHANNEL})
    public void topicConsumer(String message) {
        StatusInfo statusInfo = JSON.parseObject(message, StatusInfo.class);
        log.warn("==============>KafkaListener:start作业实例结果处理,sqlId:{},zyslId:{},状态:{}", statusInfo.getSqlId(),
                statusInfo.getTaskId(), statusInfo.getStatus());
        zyslCljgService.procZyslJobData(statusInfo);
    }
}

@Slf4j
@Component("zyslCljgServiceImpl")
public class ZyslCljgServiceImpl implements ZyslCljgService {

    @Override
    @Async("zyslClThreadPool")
    public void procZyslJobData(StatusInfo statusInfo) {
        try {
            String zyslId = statusInfo.getTaskId();
            String sqlId = statusInfo.getSqlId();
          -dosomething();
            //运行中任务把检核状态更新成运行中
            if (StatusEnum.RUNNING.getCode().equals(statusInfo.getStatus().getCode())) {
                if (ZyslYxztEnum.WAITING_SUBMIT.getBm().equals(oldGxZysl.getYxzt())
               || ZyslYxztEnum.WAITING.getBm().equals(oldGxZysl.getYxzt())) {
                  //运行中
                oldGxZysl.setYxzt(ZyslYxztEnum.RUNNING.getBm());


                    gxZyslMapper.updateById(oldGxZysl);
                }
                //运行中状态
               oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.RUNNING.getBm());
                gxZyrzMapper.updateById(oldGxZyrzUpdate);
                return;
            }
            if (StatusEnum.FAILED.getCode().equals(statusInfo.getStatus().getCode())) {
                log.error("大数据job错误,执行任务失败:{},zyslid:{},参数:{}", statusInfo.getMessage(), zyslId,
                        JSON.toJSONString(statusInfo));
                //处理失败
                procFail(statusInfo, oldGxZyrzUpdate);
            }
            if (StatusEnum.FINISH.getCode().equals(statusInfo.getStatus().getCode())) {
                //正常sql和异常sql都执行完成
                oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.SUCCESS.getBm());
            }
            // 先更新状态,后处理事件
             dosomething2();
        } catch (Exception e) {
            log.error("大数据作业实例结果处理报错:{}", e.getMessage());
        }
    }
    
     public void  dosomething2(GxZyrz gxZyrz, SsZyxx zyxx) {
        if (Constants.CODE_SUCCESS.equals(gxZyrz.getYxzt())) {
              //成功状态查询es、发邮件等
             dosomething3();
        } else if (Constants.CODE_FAILED.equals(gxZyrz.getYxzt())) {
            gxZyrz.setYcs(0L);
            gxZyrz.setZyl(0L);
            gxZyrz.setSfgj("N");
        }
        gxZyrzMapper.updateById(gxZyrz);
    }

}

(1)kafka这个topic只有一个分区,数据质量服务器kafka消息是设置了消费者组,即使是高可用,现场部署多台服务器,也会只有一台服务器会消费这个topic的消息数据,所有排除是因为部署了多台服务器的原因。

(2)kafka消息被多线程异步处理了

1、如果任务sql执行成功,kafka返回运行中、执行成功消息

线程1 - 处理待运行任务

线程2- 处理成功状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如gxZyslMapper.updateById是2秒时间,   线程2更新状态之前会dosomething3()查询es、查询数据表、发邮件,肯定超过5秒,然后更新sql任务状态。

结论: 如果sql是执行成功,这种情况,应该不会出现线程2先把任务状态更新成成功,然后线程1把状态更新是运行中。

2、如果任务sql执行失败,kafka返回运行中、执行失败消息

线程1 - 处理待运行任务

线程2- 处理失败状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如update是2秒时间,   线程2直接更新sql任务状态。

结论: 如果sql是失败成功,这种情况,如果运行中、运行失败状态消息时间建个在2秒内,应该会出现线程2先把任务状态更新成失败,然后线程1把状态更新是运行中。

代码分析之后,带着结论去现场验证,发现确实是失败状态任务状态被逆写了。

按这个结论,按理来讲部署的所有现场都会出现问题,为什么只有这个现场有问题呢?

大数据那边升级了代码,以前执行失败的任务,运行中和运行失败,他们发消息间隔至少耗时在5秒,改了逻辑之后直接失败的任务,发信息间隔在2秒内。这就验证了这个问题

3、问题解决

1、运行中状态,先更新sqlId对应的任务状态,然后更新别的数据表状态;2、更新运行中的状态不直接更新,带着状态更新    update zyrz set yxzt = '2' where id = 'xxx' and yxzt not in('0','1')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/1644.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git 原理及使用 (带动图演示)

文章目录 🌈 Ⅰ Git 安装🌙 01. Linux - centos 🌈 Ⅱ Git 工作区、暂存区和版本库🌙 01. 认识工作区、暂存区和版本库🌙 02. 使用 Git 管理工作区的文件 🌈 Ⅲ Git 基本操作🌙 01. 创建本地仓库…

动态Web项目讲解+Demo

web流程演示 请求路径 请求路径明确要请求的是哪个servlet 请求方式 servlet含有两种请求方式:doGet和doPost doGet&doPost 返回数据就是httpResponse,返回给success 参数 包含在request当中 成功 上述流程任何一步都没出问题,就会…

SpringBoot+layuimini实现左侧菜单动态展示

layuimini左侧菜单动态显示 首先我们看一下layuimini的原有菜单显示格式 {"homeInfo": {"title": "首页","href": "page/welcome-2.html?t2"},"logoInfo": {"title": "LAYUI MINI","…

Thinkphp5+mysql批量筛选varchar字段默认值为null的数据

荆轲刺秦王 sql server数据库转mysql之后,遇到: CREATE TABLE q_bk_date (daid int(11) NOT NULL AUTO_INCREMENT,partno varchar(200) CHARACTER SET utf8 NOT NULL DEFAULT ,Bdate date DEFAULT NULL,bkno varchar(25) CHARACTER SET utf8 NOT NULL DEFAULT ,bvar varchar(…

ts中函数形状有几种定义方式

在TypeScript(TS)中,函数形状(即函数的类型)可以通过多种方式定义。以下是一些主要的定义方式: 类型别名定义函数形状: 使用 type 关键字为函数定义类型别名。 type MyFunction (a: number, …

cv2技术原理-图像旋转原理及手动实现

cv2技术原理-图像旋转原理及手动实现 1、图像旋转opencv实现2、cv2.getRotationMatrix2D函数解释3、数学原理推导旋转矩阵M4、手动计算旋转矩阵M5、旋转矩阵M的使用6、使用旋转矩阵M手动实现旋转功能 1、图像旋转opencv实现 图像旋转在对数据集数据增强(主要是随机…

Java语言——封装

一.封装的定义 在面向对象程式设计方法中,封装(英语:Encapsulation)是指一种将抽象性函式接口的实现细节部分包装、隐藏起来的方法。 封装可以被认为是一个保护屏障,防止该类的代码和数据被外部类定义的代码随机访问…

C++ //练习 12.31 如果用vector代替set保存行号,会有什么差别?哪种方法更好?为什么?

C Primer(第5版) 练习 12.31 练习 12.31 如果用vector代替set保存行号,会有什么差别?哪种方法更好?为什么? 环境:Linux Ubuntu(云服务器) 工具:vim 解释 …

车机电源管理设计

电源电压 汽车正常电压是12 V,但整车厂会要求在9V~16V这个范围内所有零部件必须能够正常工作。 在启动时,电池电压会突降,特别天气寒冷的时候,电压可能会瞬间降到6V左右。 当汽车电池严重亏电而无法启动时,可能会用…

linux|将用户加入白名单

一 用root用户找到etc\sudoers文件 cd etc 二 修改etc\sudoers 文件的权限 默认是只读的 修改为可写的 chmod uw sudoers 三 打开 sudoers文件,在Allow root to run any commands anywhere 后面 添加一条(把上面的一条内容复制下来 修改用户名即…

什么是程控电源?以及程控电源的工作原理与应用。

一、程控电源的简介: 程控电源是一种具有编程功能的电源设备,它可以通过外部控制来设定输出电压、电流的稳压、稳流或稳压/稳流模式,因此可以进行电压、电流、相位、频率、功率等参数的试验和检定。一些具体的产品特性包括微机控制、高精度、…

python实现假设检验-t检验

一. 什么是t检验 设总体 X ∼ N ( μ , δ 2 ) X\sim N(\mu,\delta^2) X∼N(μ,δ2),其中 μ , δ 2 \mu, \delta^2 μ,δ2未知,统计量 t X ‾ − μ S / n t \frac{\overline{X} - \mu}{S/\sqrt{n}} tS/n ​X−μ​服从标准正太分布,可以…

表达式求值(后缀表达式)(数据结构)

一、概念 算术表达式是由操作数(运算数)、运算符(操作符)、和界线符(括号)三部分组成,在计算机中进行算术表达式的计算是通过堆栈来实现的。 二后缀表达式的逻辑和实现方式(逆波兰…

电商平台数据有哪些?如何进行电商平台数据分析?(内附模板及工具)

在电商日常的贩卖工作中会产生大量的数据,如果你还不知道如何利用这些宝贵的数据指导未来的销售策略、增长销售额的话,就和我一起看下去吧!电商数据采集API接口包含哪些数据? 电商平台数据可以大致分为以下几个组成部分&#xff…

C#:直接调用 OpenFileDialog

C# 直接调用 OpenFileDialog,打开文件夹,选择视频文件,并播放。 编写 openvideo.cs 如下 // open a video file using System; using System.Diagnostics; using System.Windows.Forms;public class OpenVideoFile {[STAThread]public st…

Java中的封装

package day32; ​ public class Person {private String name;private int age; ​public String getName() {return name;} ​public void setName(String name) {this.name name;} ​public int getAge() {return age;} ​public void setAge(int age) {if (age>120 || …

蚓链数字化营销系统与数字资产的关系

蚓链数字化营销系统是一种利用数字技术来实现营销目标的系统。它集成了多种数字营销工具和渠道,以收集、分析和利用客户数据,优化营销活动,并提高营销效果。 数字资产是一种新型的资产类别,它们以电子数据的形式存在,可…

笔试狂刷--Day3

大家好,我是LvZi,今天带来笔试狂刷--Day3 一.牛牛的快递 1.题目链接:牛牛的快递 2.分析: 简单的模拟 3.代码实现: import java.util.Scanner;// 注意类名必须为 Main, 不要有任何 package xxx 信息 public class Main {public static void main(String[] args) {Scanner i…

计算机经典黑皮书分享

计算机经典黑皮书是一套计算机科学丛书,其中包含了多本计算机科学领域的经典教材 提供了全面的知识体系:黑皮书涵盖了计算机科学的多个领域,如计算机组成与设计、操作系统、数据库、人工智能等。它们深入浅出地介绍了相关领域的基本概念、原…

HTTP/HTTPS详解

HTTP/HTTPS详解 1. HTTP1.1 HTTP基础知识1.2 HTTP建立和断开连接 2. HTTPS 1. HTTP 1.1 HTTP基础知识 HTTP是互联网上应用最为广泛的一种网络协议,是一个客户端和服务器端请求和应答的标准(TCP),用 于从WWW服务器传输超文本到本…