车联网架构设计(二)_消息缓存

在上一篇博客车联网架构设计(一)_消息平台的搭建-CSDN博客中,我介绍了车联网平台需要实现的一些功能,并介绍了如何用EMQX+HAPROXY来搭建一个MQTT消息平台。车联网平台的应用需要消费车辆发布的消息,同时也会下发消息给车辆,以实现车辆控制等功能。通常我们会在MQTT消息平台收到车辆消息后对消息进行缓存,以供上层应用使用。我们可以直接把消息保存到数据库,或者引入一个消息队列,这样可以方便对应用和车辆之间进行解耦合。

这里我将介绍一下如何引入一个Kafka消息队列,把车辆以及上层应用之间需要交互的消息缓存到这个消息队列之中。

在EMQX的企业版中,提供了丰富的数据桥接功能,可以支持把MQTT消息桥接到其他外部系统,例如Kafka或数据库中。但是在开源版,只提供了很有限的数据桥接,不支持Kafka。为此我们可以通过给EMQX开发Hook extension的方式,来加载我们的插件,实现把数据桥接到Kafak。

在EMQX官网的介绍中,Hook扩展是通过gRPC的方式来实现的,支持多种编程语言。如下图:

这里我以Python为例子,来定义一个扩展。

搭建Kafka

首先是在K8S上部署一个Kafka集群,这里我选择了Strimizi的Kafka operator来部署

先创建一个namespace

kubectl create namespace kafka

安装Operator, CRD以及定义RBAC等

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

创建一个只包含一个节点的Kafka

kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka 

打开两个终端,分别运行以下的订阅和发布的指令,测试Kafka是否正常工作

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

开发ExHook

首先是获取当前EMQX版本定义的gPRC proto。在EMQX服务器的/opt/emqx/lib/emqx_exhook-5.0.14/priv/protos/目录下面有一个exhook.proto文件。

运行以下命令来基于这个proto生成python文件

python -m grpc_tools.protoc -I./ --python_out=. --pyi_out=. --grpc_python
_out=. ./exhook.proto

运行之后,在当前目录下会新生成三个文件,exhook_pb2_grpc.py,exhook_pb2.py,exhook_pb2.pyi

新建一个exhook_server.py文件,继承exhook_pb2_grpc里面的HookProviderServicer,注册对应事件的处理方法,如以下代码:

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Valueimport grpcimport exhook_pb2
import exhook_pb2_grpcimport pickle
from kafka import KafkaProducerclass HookProvider(exhook_pb2_grpc.HookProviderServicer):def __init__(self):self.producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092')def OnProviderLoaded(self, request, context):print("OnProviderLoaded:", request)'''specs = [exhook_pb2.HookSpec(name="client.connect"),exhook_pb2.HookSpec(name="client.connack"),exhook_pb2.HookSpec(name="client.connected"),exhook_pb2.HookSpec(name="client.disconnected"),exhook_pb2.HookSpec(name="client.authenticate"),exhook_pb2.HookSpec(name="client.authorize"),exhook_pb2.HookSpec(name="client.subscribe"),exhook_pb2.HookSpec(name="client.unsubscribe"),exhook_pb2.HookSpec(name="session.created"),exhook_pb2.HookSpec(name="session.subscribed"),exhook_pb2.HookSpec(name="session.unsubscribed"),exhook_pb2.HookSpec(name="session.resumed"),exhook_pb2.HookSpec(name="session.discarded"),exhook_pb2.HookSpec(name="session.takenover"),exhook_pb2.HookSpec(name="session.terminated"),exhook_pb2.HookSpec(name="message.publish"),exhook_pb2.HookSpec(name="message.delivered"),exhook_pb2.HookSpec(name="message.acked"),exhook_pb2.HookSpec(name="message.dropped")]'''specs = [exhook_pb2.HookSpec(name="message.publish")]return exhook_pb2.LoadedResponse(hooks=specs)def OnProviderUnloaded(self, request, context):print("OnProviderUnloaded:", request)return exhook_pb2.EmptySuccess()def OnClientConnect(self, request, context):print("OnClientConnect:", request)return exhook_pb2.EmptySuccess()def OnClientConnack(self, request, context):print("OnClientConnack:", request)return exhook_pb2.EmptySuccess()def OnClientConnected(self, request, context):print("OnClientConnected:", request)return exhook_pb2.EmptySuccess()def OnClientDisconnected(self, request, context):print("OnClientDisconnected:", request)return exhook_pb2.EmptySuccess()def OnClientAuthenticate(self, request, context):print("OnClientAuthenticate:", request)reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)return replydef OnClientAuthorize(self, request, context):print("OnClientAuthorize:", request)reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)return replydef OnClientSubscribe(self, request, context):print("OnClientSubscribe:", request)return exhook_pb2.EmptySuccess()def OnClientUnsubscribe(self, request, context):print("OnClientUnsubscribe:", request)return exhook_pb2.EmptySuccess()def OnSessionCreated(self, request, context):print("OnSessionCreated:", request)return exhook_pb2.EmptySuccess()def OnSessionSubscribed(self, request, context):print("OnSessionSubscribed:", request)return exhook_pb2.EmptySuccess()def OnSessionUnsubscribed(self, request, context):print("OnSessionUnsubscribed:", request)return exhook_pb2.EmptySuccess()def OnSessionResumed(self, request, context):print("OnSessionResumed:", request)return exhook_pb2.EmptySuccess()def OnSessionDiscarded(self, request, context):print("OnSessionDiscarded:", request)return exhook_pb2.EmptySuccess()def OnSessionTakenover(self, request, context):print("OnSessionTakenover:", request)return exhook_pb2.EmptySuccess()def OnSessionTerminated(self, request, context):print("OnSessionTerminated:", request)return exhook_pb2.EmptySuccess()def OnMessagePublish(self, request, context):self.producer.send('testtopic', pickle.dumps(nmsg))reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)return reply## case2: stop publish the 't/d' messages#def OnMessagePublish(self, request, context):#    nmsg = request.message#    if nmsg.topic == 't/d':#        nmsg.payload = b""#        nmsg.headers['allow_publish'] = b"false"##    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)#    return replydef OnMessageDelivered(self, request, context):print("OnMessageDelivered:", request)return exhook_pb2.EmptySuccess()def OnMessageDropped(self, request, context):print("OnMessageDropped:", request)return exhook_pb2.EmptySuccess()def OnMessageAcked(self, request, context):print("OnMessageAcked:", request)return exhook_pb2.EmptySuccess()def serve():server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)server.add_insecure_port('[::]:9000')server.start()print("Started gRPC server on [::]:9000")server.wait_for_termination()if __name__ == '__main__':logging.basicConfig()serve()

 解释一下代码,在OnProvidedLoader里面是加载各种事件的钩子,这里只加载message.publish事件。在OnMessagePublish是对应事件的处理函数,这里把收到的MQTT消息通过Pickle进行序列化,发送到Kafka的对应topic

部署ExHook

写一个Dockerfile,把代码打包为一个镜像

FROM python:3.7-slim
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "./exhook_server.py"]

 requirements.txt文件内容为

grpcio==1.59.3
grpcio-tools==1.59.3
kafka-python==2.0.2

运行以下命令来构建镜像

docker build --network=host -t emqx_plugin_test:v1.0 .

创建一个部署这个镜像的deployment和service,然后部署到K8S

apiVersion: apps/v1
kind: Deployment
metadata:name: emqx-hookserver-deploymentlabels:app: hookservernamespace: emqx
spec:replicas: 1selector:matchLabels:app: hookservertemplate:metadata:labels:app: hookserverspec:containers:- name: hookserverimage: emqx_plugin_test:v1.0imagePullPolicy: Neverresources:requests:memory: "250Mi"cpu: "100m"limits:memory: "250Mi"cpu: "100m"ports:- name: rpccontainerPort: 9000
---
apiVersion: v1
kind: Service
metadata:name: hookserver-servicenamespace: emqx
spec:selector:app: hookserverports:- name: rpcport: 9000

回到EMQX的控制面板Dashboard,在ExHook里面添加,url填入http://hookserver-service.emqx.svc.cluster.local:9000,然后选择启用即可,可以看到状态为连接成功,并且显示注册了1个钩子。

在minikube上部署,一开始是显示连接中,等了很久仍然无法连接成功,最后查了资料,原来是coredns的问题,运行以下命令重启即可:

kubectl -n kube-system rollout restart deployment coredns

之后打开订阅Kafka的testtopic,然后通过MQTT连接到EMQX发送消息,可以看到Kafka能成功收到EMQX转发的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/203032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot 属性配置解析

属性配置介绍 spring官方提供的17中属性配置的方式 Devtools全局配置测试环境TestPropertySource注解测试环境properties属性命令行参数SPRING_APPLICATION_JSON属性ServletConfig初始化参数ServletContext初始化参数JNDI属性JAVA系统属性操作系统环境变量RandomValueProperty…

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含pytho、JS工程源码)+数据集+模型(二)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 相关其它博客工程源代码下载其它资料下载 前言 本项目基于Keras框架,引入CNN进行模型训练,采用Dropout梯度…

uni-app 设置tabBar的setTabBarBadge购物车/消息等角标

目录 一、效果二、代码实现二、全部代码1.index.vue2.cart.vue 三、真实案例参考最后 一、效果 二、代码实现 只要使用uni.setTabBarBadge和uni.removeTabBarBadge来进行对红点的设置和移除。 主要代码: //设置红点 uni.setTabBarBadge({index: 1, // 底部菜单栏…

大数据可视化项目——基于Python豆瓣电影数据可视化分析系统的设计与实现

大数据可视化项目——基于Python豆瓣电影数据可视化分析系统的设计与实现 本项目旨在通过对豆瓣电影数据进行综合分析与可视化展示,构建一个基于Python的大数据可视化系统。通过数据爬取收集、清洗、分析豆瓣电影数据,我们提供了一个全面的电影信息平台…

论文阅读[2023ICME]Edge-FVV: Free Viewpoint Video Streaming by Learning at the Edge

Edge-FVV: Free Viewpoint Video Streaming by Learning at the Edge 会议信息: Published in: 2023 IEEE International Conference on Multimedia and Expo (ICME) 作者: 1 背景 FVV允许观众从多个角度观看视频,但是如果所选视点的视频…

机器学习-逻辑回归

一、引言 逻辑回归(Logistic Regression)是一种广泛应用于分类问题的监督学习算法。尽管名字中含有“回归”二字,但这并不意味着它用于解决回归问题。相反,逻辑回归专注于解决二元或多元分类问题,如邮件是垃圾邮件还是…

vue2+typescript使用高德地图2.0版本

高德地图 webjs api 2.0官网教程 AMap.Driving使用说明 <div class"mmp"><div id"map" ref"mapcontainer"></div></div><script lang"ts"> //安全密钥 window._AMapSecurityConfig{securityJsCode: &qu…

ExoPlayer架构详解与源码分析(10)——H264Reader

系列文章目录 ExoPlayer架构详解与源码分析&#xff08;1&#xff09;——前言 ExoPlayer架构详解与源码分析&#xff08;2&#xff09;——Player ExoPlayer架构详解与源码分析&#xff08;3&#xff09;——Timeline ExoPlayer架构详解与源码分析&#xff08;4&#xff09;—…

数据结构初阶之二叉树性质练习与代码练习

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 数据结构初阶 Linux 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力,共赴大厂。 目录 1.前言 2.性质练习 3…

Python中的匿名函数是什么

匿名函数 lambda x , y : xy 1.匿名的目的就是要没有名字&#xff0c;给匿名函数赋给一个名字是没有意义的。 2.匿名函数的参数规则、作用域关系与有名函数是一样的。 3.匿名函数的函数体通常应该是 一个表达式,该表达式必须要有一个返回值。 flambda x,n:x ** n print(f…

我把springboot项目从Java 8 升级 到了Java 17 的过程总结,愿为君提前踩坑!

项目从jdk8升级到jdk17&#xff0c;我不是为了追求java 17的新特性&#xff08;准确来说也还没有去了解有什么新特性&#xff09;&#xff0c;也不是为了准确与时俱进&#xff0c;永远走在java行列的最前端&#xff0c;纯粹因为项目需要&#xff0c;因为我们都知道&#xff0c;…

【C++】:set和map

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关多态的知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数据结…

深入理解 Java 虚拟机(JVM)从入门到精通

目录 一、JVM内存结构1、堆&#xff08;Heap&#xff09;&#xff08;1&#xff09;特点&#xff08;2&#xff09;堆内存分配&#xff08;3&#xff09;晋升到老年代的方式&#xff08;4&#xff09;堆内存检验方式2、虚拟机栈&#xff08;VM Stack&#xff09;&#xff08;1&…

OpenHarmony北向-让更广泛的应用开发者更容易参与

一、标准系统的体验 按照官方文档指导&#xff0c;这样操作&#xff0c;OH标准系统开发板就可以运行开发者开发的OpenHarmony应用了。 二、实际情况 按照开发文档上的说明&#xff0c;肯定是装不上的。因为OH不同的发行版&#xff0c;不同发行板不同的设备&#xff0c;IDE&…

uni-app中vue3表单校验失败

目录 1.问题 2.原因及解决方式 3.表单校验方式&#xff08;vue3&#xff09; 1.问题 在app中使用uni-forms表单&#xff0c;并添加校验规则&#xff0c;问题是即使输入内容&#xff0c;表单校验依然失败。 代码&#xff1a; <template><view><uni-forms r…

vue中shift+alt+f格式化防止格式掉其它内容

好处就是使得提交记录干净&#xff0c;否则修改一两行代码&#xff0c;习惯性按了一下格式化快捷键&#xff0c;遍地飘红&#xff0c;下次找修改就费时间 1.点击设置图标-设置 2.点击这个转成配置文件 {"extensions.ignoreRecommendations": true,"[vue]":…

Android Glide自定义AppCompatImageView切分成若干小格子,每个小格子onDraw绘制Bitmap,Kotlin(1)

Android Glide自定义AppCompatImageView切分成若干小格子&#xff0c;每个小格子onDraw绘制Bitmap&#xff0c;Kotlin&#xff08;1&#xff09; 垂直方向的RecyclerView&#xff0c;每行一个AppCompatImageView&#xff0c;每个AppCompatImageView被均匀切割成n个小格子&#…

Games 103 作业三

Games 103 作业三 作业三的内容主要就是实现一下FVM。我们按照文档中的步骤&#xff0c;第一步就是去独立地更新mesh的速度和位置&#xff0c;在初始化每个顶点的受力时&#xff0c;需要考虑到重力的影响。 for(int i0 ;i<number; i) {//TODO: Add gravity to Force.Force[…

wsl安装虚拟机平台报错“无法解析服务器的名称或地址

wsl安装虚拟机平台报错“无法解析服务器的名称或地址” 1.问题 利用wsl安装拟机平台报错“无法解析服务器的名称或地址” 2.解决方案 修改DNS即可 控制面板->网络和Internet&#xff0c;选择查看网络状态和任务 选择更改适配器设置 选择所连接的网络&#xff0c;选择属性…

我不是DBA之慢SQL诊断方式

最近经常遇到技术开发跑来问我慢SQL优化相关工作&#xff0c;所以干脆出几篇SQL相关优化技术月报&#xff0c;我这里就以公司mysql一致的5.7版本来说明下。 在企业中慢SQL问题进场会遇到&#xff0c;尤其像我们这种ERP行业。 成熟的公司企业都会有晚上的慢SQL监控和预警机制。…