基于MQTT协议实现微服务架构事件总线

一、场景描述

昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。

对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

在这里插入图片描述

二、几种消息总线适用性比较

常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。

Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。

而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。

由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。

三、MQTT简介

几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。

(一)MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:

  • 至多一次:
    消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。
  • 至少一次:
    确保消息送达订阅者,但消息可能重复,适用于幂等性操作。
  • 只有一次:
    最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。

MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT.png

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:

  • connect:客户端建立与服务器的连接
  • disconnect:等待客户端完成工作后,端口与总线的会话
  • subscribe:客户端向消息总线注册订阅主题
  • unsubscribe:客户端等待消息总线取消所注册的订阅
  • publish:客户端向消息总线发送某主题的消息

(二)开源消息总线EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。

软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:

  • 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
  • 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。

(三)MQTT.js客户端

MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。

1. 安装和帮助文件

$ pnpm i mqtt -S #安装
$ npx mqtt help  #帮助
MQTT.js command line interface, available commands are:* publish     publish a message to the broker* subscribe   subscribe for updates from the broker* version     the current MQTT.js version* help        help about commandsLaunch 'mqtt help [command]' to know more about the commands.

2. 使用方法

// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt'  //CommonJS模块// 连接选项
const options = {clean: true, // true: 清除会话, false: 保留会话connectTimeout: 4000, // 超时时间// 认证信息clientId: 'user_id', // 要保证唯一性// 若在控制台配置了用户名和密码:// username: 'xxx',// password: 'xxx',
}// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', error => {console.error('正在重连:', error)
})client.on('error', error => {console.error('连接失败:', error)
})//收到消息
client.on('message', (topic, message) => {console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})//订阅主题
const topic='/user_id/#'
const qos=0  //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{  //订阅user_id主题下所有消息if(error){console.error('订阅主题失败:', error)return}console.log('订阅成功')
})//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{if(error){console.error('发布消息失败:', error)}
})//取消订阅
client.unsubscrib(topic, qos, error=>{if(error){console.error('取消订阅失败', error)return}
})//断开连接
if(client.connected){try{client.end(false,()=>{console.log('成功断开连接')})catch(error){console.error('断开连接失败:', error)}}
}

(四)安全性

1. 排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

2. JWT认证

系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/714381.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 之七:穿透、击穿、雪崩

(本内容部分来自知乎网等网络) Redis 缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严…

Educational Codeforces Round 132 (Rated for Div. 2) E. XOR Tree(启发式合并+贪心)

题目 n(n<2e5)个点的树&#xff0c;点i权值ai&#xff08;1<ai<2^30&#xff09; 修改最少的点的权值&#xff0c;使得树上不存在异或和为0的简单路径&#xff0c;输出最少的点数 权值可以被修改成任意正整数&#xff08;可以是无限大&#xff09; 思路来源 官方…

【leetcode】环形链表✚环形链表II

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家刷题&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 1.环形链表解题拓展&#xff1a; 2.环形链表II 1.环形链表 点击查看题目 解题 思路: bool hasCycle…

【算法集训】基础算法:基础排序 - 插入排序

一、基本理解 插入排序(nsertion Sort)&#xff0c;一般也被称为直接插入排序&#xff0c;是一种简单直观的排序算法。 **工作原理&#xff1a;**将待排列元素划分为「已排序」和「未排序」两部分&#xff0c;每次从「未排序的」元素中选 择一个插入到「已排序的」元素中的正确…

剑指offer58—II 左旋转字符串 c++

题目 字符串的左旋转操作是把字符串前面的若干个字符转移到字符串的尾部。请定义一个函数实现字符串左旋转操作的功能。比如,输入字符串"abcdefg"和数字2,该函数将返回左旋转两位得到的结果"cdefgab"。 示例 1: 输入: s = “abcdefg”, k = 2 输出: “…

MySQL 多表查询 连接查询 内连接

介绍 内连接查询是两张表中交集的部分 连接模式 隐式内连接 SELECT 字段列表 FROM 表1,表2 WHERE 条件显式内连接 SELECT 字段列表 FROM 表1 [INNER] JOIN 表2 ON 连接条件案例 有两张表一个表为学生表&#xff0c;另一个表为班级表&#xff0c;现在需要查询学生时候在查…

接口测试(全)

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号【互联网杂货铺】&#xff0c;回复 1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 大多数人对于接口测试都觉得是一种高大上的测试&#xff0c;觉得…

羊大师分析,羊奶粉适合什么样的人群喝

羊大师分析&#xff0c;羊奶粉适合什么样的人群喝 羊奶粉适合多种人群食用&#xff0c;包括儿童、老年人、孕妇以及身体虚弱或处于疾病康复期的人群。 对于儿童来说&#xff0c;羊奶粉是一种很好的营养品。它含有丰富的蛋白质、脂肪、矿物质和维生素&#xff0c;能够满足儿童…

【前端素材】推荐优质后台管理系统网页Star admin平台模板(附源码)

一、需求分析 1、系统定义 后台管理系统是一种用于管理和控制网站、应用程序或系统的管理界面。它通常被设计用来让网站或应用程序的管理员或运营人员管理内容、用户、数据以及其他相关功能。后台管理系统是一种用于管理网站、应用程序或系统的工具&#xff0c;通常由管理员使…

三种图片预览插件viewer、vue-photo-preview、vue-picture-preview

第一种&#xff1a;viewerjs使用介绍 1、先安装依赖 npm install v-viewer --save2、main.js内引用并注册调用 //main.js import Viewer from ‘v-viewer’ import ‘viewerjs/dist/viewer.css’ Vue.use(Viewer); Viewer.setDefaults({ Options: { “inline”: true, “butt…

王志亮出席海尔智慧楼宇发酵行业的低碳节能解决方案

演讲嘉宾&#xff1a;王志亮 食品医药用户群总监 青岛海尔空调电子有限公司 演讲题目&#xff1a;海尔智慧楼宇在发酵行业的低碳、节能解决方案 会议简介 “十四五”规划中提出&#xff0c;提高工业、能源领城智能化与信息化融合&#xff0c;明确“低碳经济”新的战略目标&…

System Verilog学习笔记(十一)——数组(1)

System Verilog学习笔记&#xff08;十一&#xff09;——数组&#xff08;1&#xff09; 非组合型&#xff08;unpacked&#xff09; 成员之间存储数据都是相互独立的可以索引非组合型数组或者数组片段的能力声明方式&#xff1a; logic [31&#xff1a;0] data [1024]; lo…

黑马JUC笔记

黑马JUC笔记 1.概览 2.进程与线程 2.1 进程与线程 进程 程序由指令和数据组成&#xff0c;但这些指令要运行&#xff0c;数据要读写&#xff0c;就必须将指令加载至 CPU&#xff0c;数据加载至内存。在 指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管…

Cisco Secure ACS 5.8.0.32 安装 + Crack 教程

Cisco Secure ACS 5.8.0.32 安装 Crack 教程 前言系统环境开始安装 开始破解导入授权文件 前言 在ESXi 6.7 上经历过无数次的安装尝试 测试了各种兼容版本都没有安装成功,记最后一次安装成功的过程. 系统环境 服务器 : Dell R720xd CPU : E5-2620 v2 系统 : ESXi 6.7…

简单控件属性设置

1、设置文本的内容 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"…

十四、Qt主机信息与网络编程

一、主机信息 1、主机信息接口 QHostInfo&#xff1a;获取主机名称和IP地址QNetWorkInterface&#xff1a;获取主机的所有网络接口&#xff0c;包括子网掩码和广播地址等 &#xff08;1&#xff09;使用 项目添加模块QT network2、实现程序 &#xff08;1&#xff0…

【01】openEuler 源码安装 PostgreSQL

openEuler 源码安装 PostgreSQL 部署环境说明Shell 前端软件包管理器基础概念YUM 简介DNF 简介 源码安装 PostgreSQL环境变量&#xff08;env&#xff09;设置临时环境变量设置永久环境变量设置 初始化数据库&#xff08;initdb&#xff09; 数据库基本操作数据库基本配置&…

WiFi协议的调制技术介绍

调制技术是WiFi协议的核心部分&#xff0c;它负责将数据转换成可以在无线信道中传输的信号。WiFi协议采用正交频分复用&#xff08;OFDM&#xff09;调制技术&#xff0c;该技术通过将数据分成多个子载波进行传输&#xff0c;提高了信道利用率和抗干扰能力。 OFDM调制的工作原…

推特API(Twitter API)V2 用户关注

前面章节已经介绍使用code换取Token的整个流程了&#xff0c;这里不再重复阐述了&#xff0c;下面我们获取到用户token以后如何帮用户自动关注别人。需要参数关注者的用户ID&#xff08;token授权用户&#xff09;以及关注的目标用户ID。用户ID如何获取可以看上一章节获取用户信…

c++结构体内存对齐

结构体内存对齐 试试运行下面的例子 #include <stdio.h> #include <stdlib.h>using namespace std;struct A{char c;int i; };struct B{char c; int i; double d; };struct C{char c;int i;double d;char c1; };int main(){printf("sizeof(A): %d\n"…