Docker Compose 构建 EMQX 集群 实现mqqt 和websocket

EMQX 集群化管理mqqt真香

目录

#目录 /usr/emqx

容器构建

vim docker-compose.yml

version: '3'services:emqx1:image: emqx:5.8.3container_name: emqx1environment:- "EMQX_NODE_NAME=emqx@node1.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]interval: 5stimeout: 25sretries: 5networks:emqx-bridge:aliases:- node1.emqx.ioports:- 1883:1883- 8083:8083- 8084:8084- 8883:8883- 18083:18083 # volumes:#   - $PWD/emqx1_data:/opt/emqx/dataemqx2:image: emqx:5.8.3container_name: emqx2environment:- "EMQX_NODE_NAME=emqx@node2.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]interval: 5stimeout: 25sretries: 5networks:emqx-bridge:aliases:- node2.emqx.io# volumes:#   - $PWD/emqx2_data:/opt/emqx/datanetworks:emqx-bridge:driver: bridge

启动

docker-compose up -d

集群状态

#查看集群状态
docker exec -it emqx1 sh -c "emqx ctl cluster status"#验证
telnet 192.168.0.15 1883
#内网
nc -zv  192.168.0.15 1883 #账户
admin
#默认密码
public

服务开放端口

1883,8083,8084,8883,18083

端口占用

EMQX 默认使用以下端口,请确保这些端口未被其他应用程序占用,并按照需求开放防火墙以保证 EMQX 正常运行。

端口协议描述
1883TCPMQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。
8883TCPMQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。
8083TCPMQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。
8084TCPMQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。
18083HTTPEMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。
4370TCPErlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset。
5370TCP集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset。

前端js示

<!DOCTYPE html>
<html>
<head><title>MQTT WebSocket Test</title><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body><script>// 使用提供的客户端 ID 或生成一个唯一 IDconst clientId = 'emqx_NjI4MT2';// 配置 WebSocket MQTT broker 地址const host = 'ws://127.0.0.1:8083/mqtt';// MQTT 连接选项const options = {keepalive: 60, // 心跳时间间隔clientId: clientId,protocolId: 'MQTT', // 协议 IDprotocolVersion: 5, // 使用 MQTT 5 协议clean: true, // 是否清除会话reconnectPeriod: 1000, // 重连间隔时间 (ms)connectTimeout: 30 * 1000, // 连接超时时间 (ms)username: 'admin', // 设置用户名password: 'public', // 设置密码will: {topic: 'pushRanking/1',payload: 'Connection Closed abnormally..!',qos: 0,retain: false},};console.log('Connecting mqtt client');// 连接到 MQTT Brokerconst client = mqtt.connect(host, options);// 连接成功回调client.on('connect', () => {console.log('Connected to MQTT broker');// 订阅主题 pushRanking/#,支持通配符client.subscribe('pushRanking/1', { qos: 0 }, (err) => {if (!err) {console.log('Subscribed to topic: pushRanking/1');} else {console.error('Failed to subscribe:', err);}});});// 处理接收到的消息client.on('message', (topic, message) => {console.log(`Received message from topic "${topic}": ${message.toString()}`);});// 连接错误回调client.on('error', (err) => {console.log('Connection error:', err);client.end();});// 重新连接回调client.on('reconnect', () => {console.log('Reconnecting...');});// 连接关闭回调client.on('close', () => {console.log('Connection closed');});// 模拟消息发布以测试接收setTimeout(() => {client.publish('pushRanking/1', JSON.stringify({ msg: 'hello' }), { qos: 0 });}, 5000);</script>
</body>
</html>

后端

package emqximport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""testing""time"
)func TestMQTT(t *testing.T) {// 创建 EMQX 客户端实例client := NewEMQXClient("tcp://127.0.0.1:1883", "test-client", "admin", "QfTzLy3cop9NOGWj")// 连接到 EMQXif err := client.Connect(); err != nil {fmt.Printf("Failed to connect: %v\n", err)return}defer client.Disconnect()// 订阅主题client.Subscribe("testtopic/#", 1, func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Message received: %s\n", msg.Payload())})// 发布消息client.Publish("testtopic/1", 1, false, "Hello from Golang!")// 保持连接一段时间以接收消息time.Sleep(10 * time.Second)
}/*长连接的场景 DEMOfunc main() {client := emqxclient.NewEMQXClient("tcp://broker.emqx.io:1883", "test-client", "", "")if err := client.Connect(); err != nil {fmt.Printf("Failed to connect: %v\n", err)return}// 使用 defer 确保程序退出时断开连接defer client.Disconnect()// 订阅主题client.Subscribe("test/topic", 1, func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s\n", msg.Payload())})// 发布消息client.Publish("test/topic", 1, false, "Hello from Golang!")// 捕获退出信号signalChan := make(chan os.Signal, 1)signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)fmt.Println("Running... Press Ctrl+C to exit.")<-signalChanfmt.Println("Exiting...")
}*//*JS 调用如下
import mqtt from 'mqtt';const brokerURL = 'ws://broker.emqx.io:8083/mqtt'; // WebSocket 连接地址
const clientID = `mqttjs_${Math.random().toString(16).substr(2, 8)}`;// 创建客户端
const client = mqtt.connect(brokerURL, {clientId: clientID,username: '', // 如需要认证,填入用户名password: '', // 如需要认证,填入密码
});// 连接事件
client.on('connect', () => {console.log('Connected to EMQX');// 订阅主题client.subscribe('test/topic', (err) => {if (!err) {console.log('Subscribed to topic: test/topic');} else {console.error('Failed to subscribe:', err);}});// 发布消息client.publish('test/topic', 'Hello from JavaScript!');
});// 接收消息事件
client.on('message', (topic, message) => {console.log(`Received message on topic "${topic}": ${message.toString()}`);
});// 错误事件
client.on('error', (err) => {console.error('Connection error:', err);
});*/

common封装调用

package emqximport ("fmt""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)type EMQXClient struct {client mqtt.Client
}// NewEMQXClient 初始化 EMQX 客户端
func NewEMQXClient(broker string, clientID string, username string, password string) *EMQXClient {opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientID).SetUsername(username).SetPassword(password).SetKeepAlive(60 * time.Second).SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message on topic: %s, message: %s\n", msg.Topic(), msg.Payload())}).SetPingTimeout(1 * time.Second)client := mqtt.NewClient(opts)return &EMQXClient{client: client}
}// Connect 连接到 EMQX
func (c *EMQXClient) Connect() error {token := c.client.Connect()if token.Wait() && token.Error() != nil {return token.Error()}fmt.Println("Connected to EMQX broker")return nil
}// Publish 发布消息
func (c *EMQXClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {token := c.client.Publish(topic, qos, retained, payload)token.Wait()return token.Error()
}// Subscribe 订阅主题
func (c *EMQXClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {token := c.client.Subscribe(topic, qos, callback)token.Wait()return token.Error()
}// Unsubscribe 取消订阅
func (c *EMQXClient) Unsubscribe(topics ...string) error {token := c.client.Unsubscribe(topics...)token.Wait()return token.Error()
}// Disconnect 断开连接
func (c *EMQXClient) Disconnect() {c.client.Disconnect(250)fmt.Println("Disconnected from EMQX broker")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/65790.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在没有 iCloud 的情况下将数据从 iPhone 传输到 iPhone

概括 您可能会遇到将数据从 iPhone 转移到 iPhone 的情况&#xff0c;尤其是当您获得新的 iPhone 15/14 时&#xff0c;您会很兴奋并希望将数据转移到它。 使用iCloud最终可以做到这一点&#xff0c;但它的缺点也不容忽视&#xff0c;阻碍了你选择它。例如&#xff0c;您需要…

CUDA与Microsoft Visual Studio不兼容问题

简介&#xff1a;在安装一些 python库时&#xff0c;涉及到第三方库&#xff08;特别是需要引用 C 代码&#xff09;时&#xff0c;通常的安装方式会涉及到编译过程&#xff0c;通常称为"源代码安装"&#xff08;source installation&#xff09;&#xff0c;或是 “…

Unity2D无限地图的实现(简单好抄)

说明&#xff1a;本教程实现的是在2D游戏中玩家在游戏中上下左右移动的时候自动进行地图拼接的功能&#xff0c;如果你只想实现左右移动的无限地图&#xff0c;那么这篇博客也能起到一定参考作用。 思路 第一步&#xff1a; 创建一个10*10的2D游戏对象当做地图 第二步创建一个…

C13.【C++ Cont】初识string类字符串的迭代器

目录 1.迭代器的定义 2.迭代器的作用 3.string类字符串的常用迭代器 4.第3点的两个迭代器的使用 示例代码1:解引用 运行结果 示例代码2 运行结果 示例代码3:用迭代器正序遍历字符串 运行结果 示例代码4:用迭代器逆序遍历字符串 运行结果 示例代码5:用迭代器修改字…

HTML——13.超链接

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>超链接</title></head><body><!--超链接:从一个网页链接到另一个网页--><!--语法&#xff1a;<a href"淘宝网链接的地址"> 淘宝…

《代码随想录》Day22打卡!

回溯算法 《代码随想录》回溯算法&#xff1a;组合 本题完整题目如下&#xff1a; 本题的完整思路如下&#xff1a; 1.本题使用回溯算法&#xff0c;其实回溯和递归是一样的道理&#xff0c;也是分为三步曲进行&#xff1a; 2.第一步&#xff1a;确定递归函数的返回值和参数&…

鱼眼相机模型与去畸变实现

1.坐标系说明 鱼眼相机模型涉及到世界坐标系、相机坐标系、图像坐标系、像素坐标系之间的转换关系。对于分析鱼眼相机模型&#xff0c;假定世界坐标系下的坐标点,经过外参矩阵的变换转到相机坐标系&#xff0c;相机坐标再经过内参转换到像素坐标&#xff0c;具体如下 进一步进…

Windows 下安装 triton 教程

目录 背景解决方法方法一&#xff1a;&#xff08;治标不治本&#xff09;方法二&#xff1a;&#xff08;triton-windows&#xff09;- 安装 MSVC 和 Windows SDK- vcredist 安装- whl 安装- 验证 背景 triton 目前官方只有Linux 版本&#xff0c;若未安装&#xff0c;则会出…

如何使用网络工具进行网络性能评估

网络评估是对IT基础设施的系统评估&#xff0c;以确保它能够很好地满足企业的核心运营需求&#xff0c;确定了基础设施中需要改进的领域&#xff0c;并定义了改进的范围。 网络评估工具分析IT基础设施的各个方面&#xff0c;它通过评估网络设备、网络性能和安全威胁来仔细检查…

Vue项目整合与优化

前几篇文章&#xff0c;我们讲述了 Vue 项目构建的整体流程&#xff0c;从无到有的实现了单页和多页应用的功能配置&#xff0c;但在实现的过程中不乏一些可以整合的功能点及可行性的优化方案&#xff0c;就像大楼造完需要进行最后的项目验收改进一样&#xff0c;有待我们进一步…

Python、R用深度学习神经网络组合预测优化能源消费总量时间序列预测及ARIMA、xgboost对比...

全文链接&#xff1a;https://tecdat.cn/?p38726 分析师&#xff1a;Qingxia Wang 在能源领域&#xff0c;精准预测能源消费总量对制定合理能源战略至关重要。当前&#xff0c;能源消费预测分析主要运用单一模型&#xff08;如灰色预测法、时间序列分析法等&#xff09;和组合…

STM32使用UART发送字符串与printf输出重定向

首先我们先看STM32F103C8T6的电路图 由图可知&#xff0c;其PA9和PA10引脚分别为UART的TX和RX(注意&#xff1a;这个电路图是错误的&#xff0c;应该是PA9是X而PA9是RX&#xff0c;我们看下图的官方文件可以看出)&#xff0c;那么接下来我们应该找到该引脚的定义是什么&#xf…

Kotlin在医疗大健康域的应用实例探究与编程剖析(下)

四、Kotlin医疗编程实例分析 4.1 移动医疗应用实例 4.1.1 患者健康监测应用 在当今数字化医疗时代,患者健康监测应用为人们提供了便捷的健康管理方式。利用Kotlin开发的患者健康监测应用,能够实时采集患者的各类生理数据,如心率、血压、血氧饱和度等,并通过直观的可视化…

Redis 5设计与源码分析读书笔记

目录 引言Redis 5.0的新特性Redis源码概述Redis安装与调试 简单动态字符串数据结构基本操作创建字符串释放字符串拼接字符串扩容策略 其余API 本章小结兼容C语言字符串、保证二进制安全sdshdr5的特殊之处是什么SDS是如何扩容的 跳跃表简介跳跃表节点与结构跳跃表节点跳跃表结构…

Golang学习历程【第五篇 复合数据类型:数组切片】

Golang学习历程【第五篇 复合数据类型&#xff1a;数组&切片】 1. 数组&#xff08;Array&#xff09;1.1 数组的定义1.2 初始化数组1.3 数据的循环遍历1.4 多维数组 2. 切片&#xff08;Slice&#xff09;2.1 切片声明、初始化2.2 基于数组创建切片2.2 切片的长度(len)和容…

【Unity】 HTFramework框架(五十七)通过Tag、Layer批量搜索物体

更新日期&#xff1a;2024年12月30日。 Github源码&#xff1a;[点我获取源码] Gitee源码&#xff1a;[点我获取源码] 索引 问题再现通过Tag搜索物体&#xff08;SearchByTag&#xff09;打开SearchByTag窗口搜索标记指定Tag的所有物体批量修改Tag搜索Undefined状态的所有物体 …

基于feapder爬虫与flask前后端框架的天气数据可视化大屏

# 最近又到期末了&#xff0c;有需要的同学可以借鉴。 一、feapder爬虫 feapder是国产开发的新型爬虫框架&#xff0c;具有轻量且数据库操作方便、异常提醒等优秀特性。本次设计看来利用feapder进行爬虫操作&#xff0c;可以加快爬虫的速率&#xff0c;并且简化数据入库等操作…

PCL点云库入门——PCL库点云滤波算法之统计滤波(StatisticalOutlierRemoval)

1、算法原理 统计滤波算法是一种利用统计学原理对点云数据进行处理的方法。它主要通过计算点云中每个点的统计特性&#xff0c;如均值、方差等&#xff0c;来决定是否保留该点。算法首先会设定一个统计阈值&#xff0c;然后对点云中的每个点进行分析。如果一个点的统计特性与周…

CentOS7 解决ping:www.baidu.com 未知的名称或服务

CentOS7 解决ping&#xff1a;www.baidu.com“未知的名称或服务 在VM查看网络配置 查看虚拟网络编辑器 编辑网络配置文件 vi /etc/sysconfig/network-scripts/ifcfg-ens33注意&#xff1a;不同机器的配置文件名可能不相同&#xff0c;通过 ip addr 命令查看 将 ONBOOT 从 no 改…

aws(学习笔记第二十一课) 开发lambda应用程序

aws(学习笔记第二十一课) 开发lambda应用程序 学习内容&#xff1a; lambda的整体概念开发lambda应用程序 1. lambda的整体概念 借助AWS Lambda&#xff0c;无需预置或管理服务器即可运行代码。只需为使用的计算时间付费。借助 Lambda&#xff0c;可以为几乎任何类型的应用进…