生产者和消费者的底层类

        无论是生产者还是消费者,在底层都要和Broker打交道,进行消息收发。在源码层面,底层的功能被抽象成同一个类,负责和Broker打交道,下面详细介绍这个类的情况。

1 MQClientInstance类的创建规则

MQClientInstance是客户端各种类型的Consumer和Producer的底层类。这个类首先从NameServer获取并保存各种配置信息,比如Topic的Route信息。同时MQClientInstance还会通过MQClientAPIImpl类实现消息的收发,也就是从Broker获取消息或者发送消息到Broker。

既然MQClientInstance实现的是底层通信功能和获取并保存元数据的功能,就没必要每个Consumer或Producer都创建一个对象,一个MQClientInstance对象可以被多个Consumer或Producer公用。RocketMQ通过一个工厂类达到共用MQClientInstance的目的。MQClientInstance的创建如代码清单11-12所示。

代码清单11-12 创建MQClientInstance

MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
 

注意,MQClientInstance是通过工厂类被创建的,并不是一个单例模式,有些情况下需要创建多个实例。首先来看看MQClientInstance的创建规则,如代码清单11-13所示。

代码清单11-13 MQClientInstance创建规则

public MQClientInstance getAndCreateMQClientInstance(
    final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId,
                rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId,
            instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for " +
                "clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]",
                clientId);
        }
    }
    return instance;
}

 

系统中维护了ConcurrentMap<String/*clientId*/,MQClientInstance>factoryTable这个Map对象,每创建一个新的MQClientInstance,都会以clientId作为Key放入Map结构中。clientId的格式是“clientIp”+@+“InstanceName”,其中clientIp是客户端机器的IP地址,一般不会变,instancename有默认值,也可以被手动设置。

普通情况下,一个用到RocketMQ客户端的Java程序,或者说一个JVM进程只要有一个MQClientInstance实例就够了。这时候创建一个或多个Consumer或者Producer,底层使用的是同一个MQClientInstance实例。

在quick start文档中创建一个DefaultMQPushConsumer来接收消息,没有设置这个Consumer的InstanceName参数(通过setInstanceName函数进行设置),这个时候InstanceName的值是默认的“DEFAULT”。实际创建的MQClientInstance个数由设定的逻辑进行控制。InstanceName的生成逻辑如代码清单11-14所示。

代码清单11-14 InstanceName生成逻辑

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

 

从InstanceName的创建逻辑就可以看出,如果创建Consumer或者Producer类型的时候不手动指定InstanceName,进程中只会有一个MQClientInstance对象。

有些情况下只有一个MQClientInstance对象是不够的,比如一个Java程序需要连接两个RoceketMQ集群,从一个集群读取消息,发送到另一个集群,一个MQClientInstance对象无法支持这种场景。这种情况下一定要手动指定不同的InstanceName,底层会创建两个MQClientInstance对象。

2 MQClientInstance类的功能

首先来看一下MQClientInstance类的Start函数,从Start函数中的逻辑能大致了解MQClientInstance类的功能,如代码清单11-15所示。

代码清单11-15 MQClientInstance类Start函数

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start (false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

 

Start函数中的MQClientAPIImpl对象用来负责底层消息通信,然后启动pullMessageService和rebalanceService。在类的成员变量中,用topicRouteTable、brokerAddrTable等来存储从NameServer中获得的集群状态信息,并通过一个ScheduledTask来维护这些信息。MQClientInstance中定时执行的任务如代码清单11-16所示。

代码清单11-16 MQClientInstance中定时执行的任务

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl
                        .fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr " +
                        "exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask " +
                    "updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit
        .MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker " +
                    "exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit
        .MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset " +
                    "exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(),
        TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

 

从代码中可以看出,MQClientInstance会定时进行如下几个操作:获取NameServer地址、更新TopicRoute信息、清理离线的Broker和保存消费者的Offset。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/175816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 简易版王者荣耀

所有包和类 GameFrame类 package newKingOfHonor;import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.KeyAdapter; import java.awt.event.KeyEvent; import java.io.File; import java.util.ArrayList;im…

视频没有字幕怎么办,怎么给视频增加字幕

文章目录 视频没有字幕怎么办&#xff0c;怎么给视频增加字幕前言软件准备制作字幕1. 导入视频2. 将视频拖拽到轨道3. 生成字幕4. 导出字幕 字幕实时翻译1. 播放视频2. 显示字幕设置3. 双语字幕显示 总结 视频没有字幕怎么办&#xff0c;怎么给视频增加字幕 前言 有时候下载的…

云原生系列Go语言篇-泛型Part 2

类型推导和泛型 就像在使用​​:​​时支持类型推导一样&#xff0c;在调用泛型函数时Go同样支持类型推导。可在上面对​​Map​​、​​Filter​​和​​Reduce​​调用中看出。有些场景无法进行类型推导&#xff08;如类型参数仅用作返回值&#xff09;。这时&#xff0c;必…

血的教训------入侵redis之利用python来破解redis密码

血的教训------入侵redis之利用python来破解redis密码 利用强大的python来进行redis的密码破解&#xff0c;过程不亦乐乎&#xff0c;当然也可以用shell脚本 本篇文章只供学习交流&#xff0c;请勿他用&#xff0c;谢谢。 其他相关联的文章 [1]VMware安装部署kail镜像服务器【…

ESP32-Web-Server编程-JS 基础 2

ESP32-Web-Server编程-JS 基础 2 概述 上节介绍了 JS 编程的基础。如前所述&#xff0c;在 HTML 中&#xff0c;可以通过下述 两种方式使用 JS 程序&#xff1a; 直接在 HTML 文件中通过 script 标签中嵌入 JavaScript 代码。通过 src 元素引入外部的 JavaScript 文件。 在…

C#-创建用于测试的父类StartupBase用于服务注入

当写完C#代码&#xff0c;需要对某个方法进行测试。 创建一个XXXTests.cs文件之后&#xff0c;发现需要注入某个服务怎么办&#xff1f; 再创建一个StartupBase.cs文件&#xff1a; public abstract class StartupBase {public IConfiguration Configuration { get; }public …

西南科技大学电路分析基础实验A1(一阶电路的设计)

目录 一、实验目的 二、实验设备 三、预习内容(如:基本原理、电路图、计算值等) 四、实验数据及结果分析(预习写必要实验步骤和表格) 1. 观测一阶电

【香橙派】实战记录2——烧录安卓镜像及基本功能

文章目录 一、安卓烧录二、安卓基本功能1、蓝牙2、相机功能3、投屏 一、安卓烧录 检查环境&#xff1a;检查PC系统&#xff0c;确保有Microsoft Visual C 2008 Redistrbutable - x86&#xff0c;否则在官网下载的官方工具 - 安卓镜像烧录工具里运行vcredist_x86.exe。 插入存储…

spark学习一-------------------Spark算子最详细介绍

Spark学习–spark算子介绍 1.基本概念 spark算子&#xff1a;为了提供方便的数据处理和计算&#xff0c;spark提供了一系列的算子来进行数据处理。一般算子分为action&#xff08;执行算子&#xff09;算子Transformation&#xff08;懒执行&#xff09;算子。2.Transformatio…

鞋厂ERP怎么样?工厂要如何选项契合的ERP

鞋帽这类商品是我们的生活必需品&#xff0c;存在款式多、尺码多、用料复杂、营销渠道多、销售策略和价格策略灵活等情况&#xff0c;伴随电商等行业的发展&#xff0c;鞋帽行业的管理模式也在发生变化。 鞋厂规模的不同&#xff0c;遇到的管理问题各异&#xff0c;而如何解决…

十分钟搭建VScode C/C++运行环境

一、下载配置vscode 1.下载安装VScode 地址&#xff1a;https://code.visualstudio.com/download 下载后&#xff0c;运行安装程序 (VSCodeUserSetup-{version}.exe)。这只需要一分钟。安装程序会将 Visual Studio Code 添加到环境变量中%&#xff0c;可以使用CMD键入“code”…

Dockerfile语法和指令

简介 Dockerfile是由一系列指令和参数构成的脚本&#xff0c;一个Dockerfile里面包含了构建整个镜像的完整命令。通过docker build执行Dockerfile中一系列指令自动构建镜像。 常用指令 FROM&#xff1a;基础镜像&#xff0c;FROM领了必须是Dockerfile的首个命令。 LABEL&…

2023-2024-1-高级语言程序设计-字符数组

7-1 凯撒密码 为了防止信息被别人轻易窃取&#xff0c;需要把电码明文通过加密方式变换成为密文。输入一个以回车符为结束标志的字符串&#xff08;少于80个字符&#xff09;&#xff0c;再输入一个整数offset&#xff0c;用凯撒密码将其加密后输出。恺撒密码是一种简单的替换…

性价比高的照明品牌,考研考公必备护眼台灯推荐

据国家卫生健康委员会发布的调查数据显示,我国青少年儿童总体近视率为52.7%、高度近视人口超3000万。儿童是民族的未来和希望,青少年儿童眼健康问题更是牵动着每一个人的神经。遗传、双眼视功能不正常、用眼负荷过重等因素都是造成青少年近视的原因,其中,大量的电子产品侵入以及…

Flask Session 登录认证模块

Flask 框架提供了强大的 Session 模块组件&#xff0c;为 Web 应用实现用户注册与登录系统提供了方便的机制。结合 Flask-WTF 表单组件&#xff0c;我们能够轻松地设计出用户友好且具备美观界面的注册和登录页面&#xff0c;使这一功能能够直接应用到我们的项目中。本文将深入探…

动态网页从数据库取信息,然后展示。

把数据库的驱动放在bin目录下。 通过servlet 读取数据库的内容&#xff0c;生成session,然后跨页面传给展示页。 package src;import java.io.IOException; import java.io.PrintWriter; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSe…

大数据-之LibrA数据库系统告警处理(ALM-37008 MPPDB服务不可用)

告警解释 告警模块每30秒周期性检测MPPDB服务健康状态&#xff0c;当检测到MPPDB健康状态为“故障”时产生告警。 当检测到MPPDB健康状态为“良好”时告警恢复。 告警属性 告警ID 告警级别 可自动清除 37008 致命 是 告警参数 参数名称 参数含义 ServiceName 产生…

HJ92 在字符串中找出连续最长的数字串

题目&#xff1a; HJ92 在字符串中找出连续最长的数字串 题解&#xff1a; 找到第一个数字从第一个数字开始往后遍历&#xff0c;每走一步判断当前是否为数字&#xff0c;是数字就累加cnt如果当前位置不是数字&#xff0c;证明连续数字串已经断开&#xff0c;此时需要记录最…

ROC及曲线面积汇总学习

目录 ROC基础 生成模拟数据 率的计算 R语言计算测试 ROCR&#xff1a; pROC ROC绘制 单个ROC 两个ROC Logistic回归的ROC曲线 timeROC ROC基础 ROC曲线的横坐标是假阳性率&#xff0c;纵坐标是真阳性率&#xff0c;需要的结果是这个率表示疾病阳性的率&#xff08;…

QT基础开发笔记

用VS 写QT &#xff0c;设置exe图标的方法&#xff1a; 选定工程--》右键--》添加---》资源--》 QString 字符串用法总结说明 Qt QString 增、删、改、查、格式化等常用方法总结_qstring 格式化-CSDN博客 总结来说&#xff1a; QString 的 remove有两种用法&#xff0c;&am…