使用 EMQX 开源版的 Webhook 机制处理消息并存储数据

1、前言

EMQX 是一款强大的开源 MQTT 消息代理,它支持大量的连接和高吞吐量,适用于各种物联网应用。Webhook 是 EMQX 提供的扩展功能之一,用于将消息推送到外部的 HTTP 服务。在本文中,我们将介绍如何使用 EMQX 开源版的 Webhook 机制,并展示如何处理收到的 Webhook 请求,将其中的数据存储到数据库中。

2、Webhook 简介

Webhook 是一种常见的 HTTP 回调机制,用于将事件或数据推送到外部服务器。当 MQTT 客户端发布消息时,EMQX 可以通过 Webhook 将该消息发送给指定的 HTTP 端点,方便我们在接收到消息后进一步处理数据。

3、搭建 Webhook 服务

接下来,我们编写一个简单的 SpringBoot 2.7服务,用于接收 EMQX 的 Webhook 请求并将其中的数据存储到数据库中。

3.1、项目依赖

pom.xml 中添加以下依赖:

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.13.5</version></dependency><!-- Jackson Databind --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.13.5}</version></dependency><!-- Jackson Annotations --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.13.5</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.6</version></dependency></dependencies>  

3.2、实现 Webhook 控制器

3.2.1、Controller
package ....这里填写你自己的import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;import java.io.IOException;
import java.util.Map;@RestController
@RequestMapping("/emqx/test")
@AllArgsConstructor
@Slf4j
public class WebhookController {private final EmqxTestService emqxTestService;private final ObjectMapper objectMapper = new ObjectMapper();@PostMapping("/webhook")public String webhook(@RequestBody String payload) {try {// 解析主 JSON 字符串为 MapMap<String, Object> payloadMap = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {});// 从主 Map 中提取 clientid 和 topicString clientId = (String) payloadMap.get("clientid");String topic = (String) payloadMap.get("topic");log.info("Received clientid: {}", clientId);log.info("Received topic: {}", topic);// 提取 payload 字段的 JSON 字符串String payloadString = (String) payloadMap.get("payload");// 解析 payload 字段的 JSON 字符串为 MapMap<String, Object> payloadDataMap = objectMapper.readValue(payloadString, new TypeReference<Map<String, Object>>() {});// 从 payload 数据中提取 msg 参数String msg = (String) payloadDataMap.get("msg");log.info("Received msg: {}", msg);// 创建 EmqxTest 实例并设置字段EmqxTest testData = new EmqxTest();testData.setData(payload);testData.setClientId(clientId);testData.setTopic(topic);// 保存数据emqxTestService.insertData(testData);} catch (IOException e) {log.error("解析JSON有效负载失败", e);return "Error parsing payload";}return "Received";}
}
 3.2.2、Service
package ....这里填写你自己的import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ldb.tool.entity.EmqxTest;
import com.ldb.tool.mapper.EmqxTestMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.util.Date;
import java.util.List;@Service
@AllArgsConstructor
@Slf4j
public class EmqxTestService {private final EmqxTestMapper emqxTestMapper;public EmqxTest insertData(EmqxTest testData) {EmqxTest emqxTest = new EmqxTest();// 你可以手动设置其他需要的字段,如 clientId, topic, data 等emqxTest.setClientId(testData.getClientId());emqxTest.setTopic(testData.getTopic());emqxTest.setData(testData.getData());emqxTest.setCreateTime(new Date()); // 如果你有自动填充策略,可以忽略这行this.emqxTestMapper.insert(emqxTest);return emqxTest;}
}
3.2.3、Mapper
package ...这里填写你自己的;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ldb.tool.entity.EmqxTest;public interface EmqxTestMapper extends BaseMapper<EmqxTest> {
}
 3.2.4、Entity
package ...这里填写你自己的;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.io.Serializable;
import java.util.Date;@TableName("emqx_test")
@Data
public class EmqxTest implements Serializable {private static final long serialVersionUID = 1L;@TableId(value = "id", type = IdType.ASSIGN_ID)private Long id;private String clientId;private String topic;private String data;private Date createTime;private Date updateTime;
}

4、配置 EMQX Webhook

4.1、运行

我们这里使用docker来运行EMQX。

通过 Docker 运行 EMQX | EMQX文档

4.1.1、获取镜像
docker pull emqx/emqx:5.8.0
4.1.2、启动容器

docker run -d --name emqx \-p 1883:1883 -p 8083:8083 \-p 8084:8084 -p 8883:8883 \-p 18083:18083 \-v $PWD/data:/opt/emqx/data \-v $PWD/log:/opt/emqx/log \emqx/emqx:5.8.0

4.2、配置EMQX-Webhook

4.2.1、创建Webhook

访问EMQX可视化后台(http://localhost:18083/)=>集成=>Webhook=>创建Webhook

在填写设置的时候,需要注意的是我们本地docke访问宿主机,在容器内部URL:127.0.0.1,指向的是容器本身,你可以获取宿主机IP作为URL,比如192.168.30.44。

我们通过URL选项的测试按钮可以点击测试是否正常请求。

5、测试 Webhook

在保证我们的Java-Webhook、EMQX服务运行的情况下,我们可以通过MQTTX(简介 - MQTTX 文档)软件去模拟一台直连的MQTT设备发起一个主题,因为我们在创建Webhook的时候触发者是消息发布。

5.1、MQTTX发送主题

首先我们需要新建一个MQTT连接,配置如下所以,未设置认证的话不需要用户名密码。

右下角,我们填写主题(Topic)的消息路由为listen/me,消息内容为{"msg": "send messgae","status":1},点击小飞机按钮发送。

5.2、查看Webhook触发情况

在EMQX后台,集成=>Webhook,查看送达情况。

在查看我们的Java服务的日志打印,也收到了。

查看sql表,也已经正常保存。

6、结论

Webhook 是一种强大的机制,MQTT 消息发布事件触发后,通过 HTTP 推送到 Spring Boot 服务,对接收到的数据进行解析和存储。这种机制能够让我们轻松地将消息从 EMQX 转发到其他服务,从而实现复杂的业务逻辑处理。


7、参考资料

  • EMQX官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/53351.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蜜罐网络MHN安装过程中的坑

蜜罐网络MHN安装过程中的坑 1. 相当的资源在github上&#xff0c;下不下来 科学上网&#xff0c;直接sudo proxychains4 ./install.sh是不行的&#xff0c;修改不了sh脚本里面的访问 配置好proxychains4以后&#xff0c;直接修改系统别名&#xff1a; alias wgetproxychai…

Java+Swing+sqlserver学生成绩管理系统

JavaSwingsqlserver学生成绩管理系统 一、系统介绍二、系统展示1.登陆2.课程分配3.选课管理4.学生打分--教师4.查询个人成绩--学生 三、其他1.其它系统 一、系统介绍 管理员:登陆页面、课程管理、选课管理 老师&#xff1a;给学生打分 学生&#xff1a;查询个人成绩 二、系…

node.js、php、Java、python校园点餐与数据分析系统 校园食堂订餐系统(源码、调试、LW、开题、PPT)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人 八年开发经验&#xff0c;擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等&#xff0c;大家有这一块的问题可以一起交流&…

日本IT编程语言对比分析-Python /Ruby /C++ /Java

在日本IT行业中&#xff0c;Python、Ruby、C和Java是几种广泛使用的编程语言&#xff0c;它们各自具有独特的优势和适用场景。以下是对这四种编程语言的对比分析&#xff1a; 1. Python 优势&#xff1a; 简洁易读&#xff1a;Python的语法简洁清晰&#xff0c;易于学习和使用…

五,Spring Boot中的 Spring initializr 的使用

五&#xff0c;Spring Boot中的 Spring initializr 的使用 文章目录 五&#xff0c;Spring Boot中的 Spring initializr 的使用1. 方式1&#xff1a;IDEA创建2. 方式2&#xff1a;start.spring.io 创建3. 注意事项和细节4. 最后&#xff1a; 需要&#xff1a;使用 Spring initi…

电器维修系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;客服聊天管理&#xff0c;基础数据管理&#xff0c;公告管理&#xff0c;新闻信息管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;新闻信息&#xff0c;我的 开发系…

OpenCV绘图函数(15)图像上绘制矩形函数 rectangle()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 绘制一个简单的、粗的或填充的直立矩形。 这个函数 cv::rectangle 绘制一个矩形轮廓或一个填充的矩形&#xff0c;其两个相对的顶点分别是 pt1 和…

Arcgis字段计算器:随机生成规定范围内的数字

选择字段计算器在显示的字段计算器对话框内&#xff0c;解析程序选择Python&#xff0c;勾选上显示代码块&#xff0c; 半部分输入&#xff1a; import random; 可修改下半部分输入&#xff1a; random.randrange(3, 28) 表示生成3-28之间的随机数 字段计算器设置点击确定…

Java+Swing可视化图像处理软件

JavaSwing可视化图像处理软件 一、系统介绍二、功能展示1.图片裁剪2.图片缩放3.图片旋转4.图像灰度处理5.图像变形6.图像扭曲7.图像移动 三、系统实现1.ImageProcessing.java 四、其它1.其他系统实现 一、系统介绍 该系统实现了图片裁剪、缩放、旋转、图像灰度处理、变形、扭曲…

[Go]-抢购类业务方案

文章目录 要点&#xff1a;1. 抢购/秒杀业务的关键挑战2. 技术方案3.关键实现点4.性能优化建议5.其他考虑因素 细节拆分&#xff1a;1. **高并发处理**2.**限流与防护**3.**库存控制**4. **异步处理**5. **数据一致性**6. **常用架构设计**7. **代码示例**8. 进一步优化9. 注意…

谈一谈MVCC

一 MVCC的定义 MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09;是一种用于数据库管理系统&#xff08;DBMS&#xff09;中的并发控制方法&#xff0c;它允许数据库读写操作不加锁地并发执行&#xff0c;从而提高了数据库系统的并发性…

苹果手机突然黑屏打不开怎么办?

苹果手机作为市场上备受欢迎的智能手机之一&#xff0c;其稳定性和流畅性一直备受赞誉。然而&#xff0c;偶尔遇到手机突然黑屏无法打开的情况&#xff0c;也会让不少用户感到困扰。今天&#xff0c;我们就来详细探讨一下苹果手机突然黑屏打不开的解决方法&#xff0c;帮助大家…

echarts 水平柱图 科技风

var category [{ name: "管控", value: 2500 }, { name: "集中式", value: 8000 }, { name: "纳管", value: 3000 }, { name: "纳管", value: 3000 }, { name: "纳管", value: 3000 } ]; // 类别 var total 10000; // 数据…

【RabbitMQ之一:windows环境下安装RabbitMQ】

目录 一、下载并安装Erlang1、下载Erlang2、安装Erlang3、配置环境变量4、验证erlang是否安装成功 二、下载并安装RabbitMQ1、下载RabbitMQ2、安装RabbitMQ3、配置环境变量4、验证RabbitMQ是否安装成功5、启动RabbitMQ服务&#xff08;安装后服务默认自启动&#xff09; 三、安…

vue3+ts封装类似于微信消息的组件

组件代码如下&#xff1a; <template><div:class"[voice-message, { sent: isSent, received: !isSent }]":style"{ backgroundColor: backgroundColor }"click"togglePlayback"><!-- isSent为false在左侧&#xff0c;为true在右…

传输层协议UDP

本篇将主要介绍 UDP 协议&#xff0c;介绍了有关 UDP 协议的报头、协议特点、UDP 协议在操作系统中的缓冲区、UDP 协议使用的注意事项&#xff0c;以及有关 UDP 的 Socket 编程程序&#xff0c;同时重点介绍了操作系统对于 UDP 协议报文的管理。 接着介绍了有关端口号的映射。 …

网络编程学习:TCP/IP协议

TCP/IP协议简介 TCP/IP协议包含了一系列的协议&#xff0c;也叫TCP/IP协议族&#xff08;TCP/IP Protocol Suite&#xff0c;或TCP/IP Protocols&#xff09;&#xff0c;简称TCP/IP。 分层结构 为了能够实现不同类型的计算机和不同类型的操作系统之间进行通信&#xff0c;引…

【数据结构-二维前缀和】力扣1504. 统计全 1 子矩形

给你一个 m x n 的二进制矩阵 mat &#xff0c;请你返回有多少个 子矩形 的元素全部都是 1 。 示例 1&#xff1a; 输入&#xff1a;mat [[1,0,1],[1,1,0],[1,1,0]] 输出&#xff1a;13 解释&#xff1a; 有 6 个 1x1 的矩形。 有 2 个 1x2 的矩形。 有 3 个 2x1 的矩形。 有…

ICLR2024: 大视觉语言模型中对象幻觉的分析和缓解

https://arxiv.org/pdf/2310.00754 https://github.com/YiyangZhou/LURE 背景 对象幻觉&#xff1a;生成包含图像中实际不存在的对象的描述 早期的工作试图通过跨不同模式执行细粒度对齐&#xff08;Biten et al.&#xff0c;2022&#xff09;或通过数据增强减少对象共现模…

2024 天池云原生编程挑战赛决赛名单公布,9 月 20 日开启终极答辩

历时 4 个月&#xff0c;2024 天池云原生编程挑战赛决赛名单公布&#xff01; 本届大赛规模创新高&#xff0c;参赛战队达 20000 支&#xff0c; 广覆盖国内外优秀高校和杰出企业&#xff01;吸引了来自北京大学、清华大学等 176 所国内外优秀高校&#xff0c;以及美团、米哈游…