网页端HTML使用MQTTJs订阅RabbitMQ数据

  最近在做一个公司的日志组件时有一个问题难住了我。今天问题终于解决了。由于在解决问题中,在网上也查了很多资料都没有一个完整的实例可以参考。所以本着无私分享的目的记录一下完整的解决过程和实例。

  需求:做一个统一日志系统可以查看日志列表和一个可以订阅最新日志的页面。通过提供一个封装好日志记录方法的sdk文件将日志统一收集。

  通过上面的需求进行我们使用RabbitMQ+Mongodb来实现系统。

  使用C#封装一个SDK大家都会这里就不说了。C#连接RabbitMQ示例代码也是一堆堆的也没什么好说的。下面重点说一下网页端如何使用JS去订阅RabbitMQ收到的最新日志信息。

  后端都是使用RabbitMQ的AMQP协议,而前端要求在网页HTML上显示数据。我们选择了使用MQTT协议从RabbitMQ中订阅数据。

  具体步骤:

1、先准备好相关JS库。MQTT有一个叫browserMqtt.js看名字就知道是为浏览器提供的JS库。还有一个封装了操作MQ的JS库 mqfactory.js。最后还要一个jquery.js文件。这样工具就准备好了。JS文件下载

2、HTML端代码。

<script type="text/javascript" src="~/js/MqJs/jquery.js"></script>
<script type="text/javascript" src="~/js/MqJs/browserMqtt.min.js"></script>
<script type="text/javascript" src="~/js/MqJs/mqfactory.js"></script>
<body><div><lable>Host: </lable><input id="txtHost" placeholder="192.168.1.88" value="10.1.0.7" /><br /><lable>Port: </lable><input id="txtPort" placeholder="15675" value="15675" /><br /><label>UserName: </label><input id="txtUserName" placeholder="username" value="admin" /><br /><label>Password: </label><input id="txtPassword" placeholder="password" value="admin" /><br /><label>Protocol: </label><input id="txtProtocol" placeholder="ws" value="ws" /><br /><input id="btnConnect" type="button" value="Connect RabbitMQ" /></div><div><input id="btnSubscribe" type="button" value="Subscribe" /><input id="btnPublish" type="button" value="Publish" /><br /><input id="btnSSHuanjing" type="button" value="Subscribe Huanjing" /><input id="hdnIsSubscribed" type="hidden" value="" /><input id="btnPubHuanjing" type="button" value="Publish Huanjing"><br />路由:<input id="btnRoutingKey" type="text" value="Dcon/Logs/Client"><br /><input id="txtMessage" type="text" placeholder="Please enter message" /></div><div><label>log:</label><br /><ul id="lstLog"></ul><input id="btnClearLog" type="button" value="Clear Log" /></div>
</body>
<script type="text/javascript">$(function () {var mqclient;//var routingKey = 'Dcon.Logs.ServerWebShow';var message;$('#btnSubscribe').attr('disabled', 'disabled');$('#btnPublish').attr('disabled', 'disabled');$('#btnSSHuanjing').attr('disabled', 'disabled');$('#btnPubHuanjing').attr('disabled', 'disabled');$('#btnConnect').click(function () {var mqttOpts = {host: (() => $('#txtHost').val())(),port: (() => $('#txtPort').val())(),username: (() => $('#txtUserName').val())(),password: (() => $('#txtPassword').val())(),//transformWsUrl方法用于在浏览器中使用MQTT的场景,默认情况下,MQTT自动生成的url为ws://ip:port形式,//然而服务器要求的格式是ws://ip:port/ws,所以MQTT提供了此接口用于在生成url时自定义url格式transformWsUrl: (url, opts, client) => { return opts.protocol && opts.protocol == 'ws' ? url + 'ws' : url; },clientId: (() => { return 'mqttjs_' + Math.random().toString(16).substr(2, 8); })()};var biz = {huanjing: function (handler, isOn) {if (isOn !== false) {this.ss(this.topics.huanjing, handler);} else {this.sus(this.topics.huanjing, handler);}},topics: {huanjing: '/hyj/huanjing/monitor'}};//系统初始化时注入连接选项mqfactory.inject(mqttOpts, biz);//创建mqclient单例 mqclient = mqfactory.create();//注册mqclient的连接成功事件mqclient.on('connect', mqconnected);});$('#btnSubscribe').click(function () {if ($(this).val() == 'Subscribe') {//订阅成功后,仅注册一次事件(要考虑每次注册事件时,事件处理器调用的次数,如果仅用一次,就用once方法)//routingKey = $("#btnRoutingKey").val();mqclient.once('onss', mqSubscribeSuccess);//简单订阅mqclient.ss($("#btnRoutingKey").val());} else {mqclient.once('onsus', mqUnsubscribeSuccess)mqclient.sus($("#btnRoutingKey").val());}});$('#btnPublish').click(function () {var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();if (message === msg) {msg = guid();}message = msg;$('#txtMessage').val(message);//发送消息mqclient.pub($("#btnRoutingKey").val(), message);$('#lstLog').append('<li>Send Message: ' + message + '</li>');});$('#btnSSHuanjing').click(function () {if ($(this).val() == 'Subscribe Huanjing') {mqclient.once('onss', mqHJSubscribeSuccess);mqclient.huanjing(onHuanjingMessageArrived);} else {mqclient.once('onsus', mqHJUnsubscribeSuccess);mqclient.huanjing(onHuanjingMessageArrived, false);}});$('#btnPubHuanjing').click(function () {var msg = $('#txtMessage').val().length > 0 ? $('#txtMessage').val() : guid();if (message === msg) {msg = guid();}message = msg;$('#txtMessage').val(message);//发送消息mqclient.pub(mqclient.topics.huanjing, message);$('#lstLog').append('<li>Send Huanjing Message: ' + message + '</li>');});$('#btnClearLog').click(function () {$('#lstLog').empty();});function mqconnected() {//alert("mqconnected");$('#btnSubscribe').removeAttr('disabled');$('#btnPublish').removeAttr('disabled');$('#btnSSHuanjing').removeAttr('disabled');$('#btnPubHuanjing').removeAttr('disabled');$('#lstLog').append('<li>mqclient connected</li>');}function mqSubscribeSuccess() {//订阅成功,就注册接受消息的方法,此处要接收多次,因此使用了onmqclient.on($("#btnRoutingKey").val(), onMessageArrived);$('#btnSubscribe').val('Unsubscribe');$('#lstLog').append('<li>Subscribe successful.' + $("#btnRoutingKey").val()+'</li>');}function mqUnsubscribeSuccess() {//注销订阅,所以将事件处理器解除绑定mqclient.off($("#btnRoutingKey").val(), onMessageArrived);$('#btnSubscribe').val('Subscribe');$('#lstLog').append('<li>Unsubscribe successful</li>');}function mqHJSubscribeSuccess() {$('#btnSSHuanjing').val('Unsubscribe Huanjing');$('#lstLog').append('<li>Hanjing Subscribe successful</li>');}function mqHJUnsubscribeSuccess() {$('#btnSSHuanjing').val('Subscribe Huanjing');$('#lstLog').append('<li>Huanjing Unsubscribe successful</li>');}function onMessageArrived(message) {$('#lstLog').append('<li>Receive message: ' + new Date().toString() + '    ' + message.toString() + '</li>');}function onHuanjingMessageArrived(message) {$('#lstLog').append('<li>Receive Huanjing message: ' + new Date().toString() + '    ' + message.toString() + '</li>');}function guid() {function s4() {return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);}return s4() + s4() + '-' + s4() + '-' + s4() + '-' +s4() + '-' + s4() + s4() + s4();}});
</script>

3.后端代码:

3.1客户端sdk代码

/// <summary>/// 写日志/// </summary>/// <param name="model"></param>public static void Write(LogModel model){//判断写入的日志级别if (model != null && model.LogLevel >= LogLevel){try{var mqMsg = new MqMessage(){MessageBody = JSON.Serialize(model),MessageRouter = SystemConst.RoutingKeyTopic.LogTopic_Producer};//MQHelper.Instance.ProducerMessage_Fanout(mqMsg);MQHelper.Instance.ProducerMessage_Topic(mqMsg);}catch (Exception ex){var errorLog = string.Format("Ip:{0},LogHelper.Write方法异常,{1}", IpHelper.LocalHostIp, ex.Message);//MQHelper.Instance.ProducerMessage_Fanout(new MqMessage() { MessageBody = errorLog });MQHelper.Instance.ProducerMessage_Topic(new MqMessage() { MessageBody = errorLog });}}}

3.2后端MQ代码:

#region 主题 交换机/// <summary>/// 生产者 客户端调用/// </summary>/// <param name="msg"></param>public void ProducerMessage_Topic(MqMessage msg){try{using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){var body = Encoding.UTF8.GetBytes(msg.MessageBody);channel.BasicPublish(exchange: SystemConst.MqName_LogMq_TopicDefault,routingKey: msg.MessageRouter,basicProperties: null,body: body);Console.WriteLine(" [x] Sent {0}", msg.MessageBody);}}}catch (Exception ex){var exMsg = ex.Message;}}/// <summary>/// 消费者 服务器接收并写入数据库/// 消费方法无法通过参数传入/// EventHandler<BasicDeliverEventArgs> received/// </summary>public void ConsumeMessage_Topic(params string[] routingKeys){if (routingKeys == null || routingKeys.Length == 0){throw new Exception("请指定接收路由");}using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){var queueName = channel.QueueDeclare().QueueName;//获得已经生成的随机队列名//对列与交换机绑定foreach (var rKey in routingKeys){channel.QueueBind(queue: queueName,exchange: SystemConst.MqName_LogMq_TopicDefault,routingKey: rKey);}var consumer = new EventingBasicConsumer(channel);//绑定消费方法consumer.Received += consomer_Received_Topic;//绑定消费者channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine("日志订阅服务启动成功.");Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}/// <summary>/// 接收通知服务异步的推送/// </summary>/// <param name="sender"></param>/// <param name="e"></param>void consomer_Received_Topic(object sender, BasicDeliverEventArgs e){var body = e.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);
//这里可以增加写入数据库的代码}#endregion

3.3路由

/// <summary>/// 主题路由/// </summary>public class RoutingKeyTopic{/// <summary>/// 生产者/// </summary>public const string LogTopic_Producer = "Dcon.Logs.Client";/// <summary>/// 消息者_日志服务_保存日志/// </summary>public const string LogTopic_Consume_Server_SaveDB = "Dcon.Logs.*";/// <summary>/// 消息者_日志服务_Web显示日志/// </summary>public const string LogTopic_Consume_Server_WebShow = "Dcon.Logs#";//".Logs.Client";/// <summary>/// 消息者_日志服务_Web显示日志/// </summary>public const string LogTopic_Consume_Server_WebShow_T = "*.Logs.Client";//".Logs.Client";/// <summary>/// 消息者_日志服务_ # 接收所有/// </summary>public const string LogTopic_Consume_Server_All = "#";//".Logs.Client";}}

注意点:

1、MQTT的路由是以 / 来分割的。在RabbitMQ中会被转义成 . 如示例中的路由Dcon/Logs/Client会被转换成 Dcon.Logs.Client

2、网页端接收时的路由要和发送端的路由一至。也就是说 后端用 Dcon.Logs.Client 来推数据前端就要使用 Dcon/Logs/Client来接收数据。

3、MQTT路由不支持通配符.

4、由于MQTT的JS库没有提供Topic交换机与路由绑定功能。所以前端接收时 不能设置订阅主题交换机名称。如果要和amqp交互只能使用amqp的默认主题交换机名称 amq.topic

运行效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/806283.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】科学库使用第4篇:Matplotlib,学习目标【附代码文档】

机器学习&#xff08;科学计算库&#xff09;完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;机器学习&#xff08;常用科学计算库的使用&#xff09;基础定位、目标&#xff0c;机器学习概述定位,目标,学习目标,学习目标,1 人工智能应用场景,2 人工智能小…

uniapp小程序编译报错

说明 微信小程序编译每次都出现[ project.config.json 文件内容错误] project.config.json: libVersion 字段需为 string, 解决 找到manifest.json文件 添加&#xff1a;"libVersion": "latest"&#xff0c;重新编译即可。

产品推荐 | iWave 的 FPGA-IP 评估附加 FMC 卡

1、产品概述 iWave 的 FPGA-IP 评估附加 FMC 卡旨在满足 ANSI/VITA 57.1 FMC 标准。该卡支持高引脚数 &#xff08;HPC&#xff09; 和低引脚数 &#xff08;LPC&#xff09; 连接器&#xff0c;可在风冷环境中使用。FPGA-IP评估附加卡可以与市场上的大多数FPGA开发套件连接。…

去中心化社交媒体:分析 Facebook 在区块链平台上的角色

在当今数字时代&#xff0c;社交媒体已经成为人们日常生活中不可或缺的一部分。然而&#xff0c;随着人们对数据隐私和信息控制的关注不断增加&#xff0c;传统的中心化社交媒体平台也面临着越来越多的质疑和挑战。为了应对这些挑战&#xff0c;越来越多的人开始探索去中心化社…

怎样关闭浏览器文件下载安全病毒中检测功能

怎样关闭浏览器文件下载安全病毒中检测功能 有时候需要通过浏览下载一些特殊文件&#xff0c;浏览器会提示有病毒&#xff0c;终止下载并且自动删除文件。 以为是浏览器的问题&#xff0c;用 chrome、Edge、firefox 三种浏览器下载均失败。 尝试关闭了所有浏览器安全防护也不行…

赋能Web3用户:增强在线隐私

随着数字化时代的发展&#xff0c;人们越来越依赖互联网来进行各种活动&#xff0c;从社交互动到金融交易&#xff0c;几乎所有的日常生活都离不开网络。然而&#xff0c;随之而来的是个人隐私安全面临的挑战。在传统的互联网架构下&#xff0c;用户的个人数据往往被中心化的平…

【MIT6.S081】Lab1: Xv6 and Unix utilities(详细解答版)

实验内容网址&#xff1a;https://xv6.dgs.zone/labs/requirements/lab1.html Sleep 关键点&#xff1a;函数参数判断、系统函数调用 思路&#xff1a; 通过argc来判断函数参数是否正确&#xff0c;通过atoi函数来讲字符串转化为整型&#xff0c;调用sleep函数后退出程序。 代…

Vue2和Vue3组件通信:父子与兄弟间的桥梁

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

3D Matching:实现halcon中的find_surface_model

halcon中的三维匹配大致分为两类&#xff0c;一类是基于形状的(Shape-Based)&#xff0c;一类是基于表面的(Surface-Based)。基于形状的匹配可用于单个2D图像中定位复杂的3D物体&#xff0c;3D物体模型必须是CAD模型&#xff0c;且几何边缘清晰可见&#xff0c;使用的相机也要预…

性能优化原则

相关链接&#xff1a;【运行环境】加载资源的形式 性能优化 1 性能优化原则 多使用内存、缓存或其他方法 减少CPU计算量&#xff0c;减少网络加载耗时 &#xff08;适用于所有编程的性能优化----空间换时间&#xff09; 2 从何入手 性能优化-让加载更快 减少资源体积&#x…

iPad手绘+Ai二合一课程,Procreate+Mj+SD零基础到精通(10节视频课)

课程内容&#xff1a; 1 系统课 AI辅助设计流-从零进阶轻松驾驭AI设计,mp4 2 商务沟通阶段 ChatGPT Midjourney-聊天机器人 项目调研资料收集 ,mp4 3_商务沟通阶段 ChatGPT_Midjourney-Midjourney基础 界面初识初步设置 .mp4 4_商务沟通阶段 ChatGPT_Midjourney-Midjourney…

康谋分享 | aiSim5 物理相机传感器模型验证方法(一)

摘要&#xff1a; aiSim5可以实时模拟复杂的传感器配置&#xff0c;在多GPU分布式渲支持的支持下&#xff0c;aiSim可以渲染20多个摄像头、10多个雷达和10多个激光雷达在同一环境下运行。aiSim5独有的实时渲染引擎能够满足对物理精确环境和天气模拟的所有要求&#xff0c;具有…

【MATLAB源码-第3期】基于MATLAB的256QAM误码率曲线,使用IQ调制解调,以及星座图展示。

1、算法描述 正交幅度调制&#xff08;QAM&#xff0c;Quadrature Amplitude Modulation&#xff09;是一种在两个正交载波上进行幅度调制的调制方式。这两个载波通常是相位差为90度&#xff08;π/2&#xff09;的正弦波&#xff0c;因此被称作正交载波。这种调制方式因此而得…

虚幻引擎启动报错记录

0x00007FFEF0C8917C (UnrealEditor-CoreUObject.dll)处(位于 UnrealEditor.exe 中)引发的异常: 0xC0000005: 写入位置 0x0000000000000030 时发生访问冲突。 解决办法&#xff1a;首先查看堆栈信息&#xff0c;我的项目启动是因为默认场景编译不过&#xff0c;进到编辑器配置文…

蓝桥杯真题代码记录(直线

目录 1. 题目&#xff1a;2. 我的代码&#xff1a;小结&#xff1a; 1. 题目&#xff1a; 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 在平面直角坐标系中&#xff0c;两点可以确定一条直线。如果有多点在一条直线上&…

新零售SaaS架构:客户管理系统架构设计(万字图文总结)

什么是客户管理系统&#xff1f; 客户管理系统&#xff0c;也称为CRM&#xff08;Customer Relationship Management&#xff09;&#xff0c;主要目标是建立、发展和维护好客户关系。 CRM系统围绕客户全生命周期的管理&#xff0c;吸引和留存客户&#xff0c;实现缩短销售周…

CentOS上使用cgroup限制进程使用内存

安装cgroup 要使用cgroup首先需要系统支持&#xff0c;需要安装两个rpm包 yum install libcgroup libcgroup-tools 创建限制内存的cgroup组 cgroup组需要在/sys/fs/cgroup/memory目录下创建&#xff0c;我们创建一个限制进程内存大小为10M的cgroup组&#xff0c;这个组中内存…

CUDA 12.4文档2 内核线程架构

本博客参考官方文档进行介绍&#xff0c;全网仅此一家进行中文翻译&#xff0c;走过路过不要错过。 官方网址&#xff1a;https://docs.nvidia.com/cuda/cuda-c-programming-guide/ 本文档分成多个博客进行介绍&#xff0c;在本人专栏中含有所有内容&#xff1a; https://bl…

C++设计模式:单例模式(十)

1、单例设计模式 单例设计模式&#xff0c;使用的频率比较高&#xff0c;整个项目中某个特殊的类对象只能创建一个 并且该类只对外暴露一个public方法用来获得这个对象。 单例设计模式又分懒汉式和饿汉式&#xff0c;同时对于懒汉式在多线程并发的情况下存在线程安全问题 饿汉…

【优选算法专栏】专题十三:队列+宽搜(一)

本专栏内容为&#xff1a;算法学习专栏&#xff0c;分为优选算法专栏&#xff0c;贪心算法专栏&#xff0c;动态规划专栏以及递归&#xff0c;搜索与回溯算法专栏四部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小…