车联网架构设计(一)_消息平台的搭建

车联网是物联网的一个主要应用方向,车辆通过连接车联网平台,实时进行消息的交互,平台可以提供车辆远程控制,故障检测,车路协同等各方面的功能。

我在车联网行业从事了很长时间的技术工作,参与了整个车联网平台的构建以及很多不同车联网应用的开发工作,这里打算以构建一个车联网平台作为例子,总结一下涉及到的架构设计方面的东西。

系统功能

一个车联网平台需要实现以下的一些功能:

1. 与车辆的消息交互

在物联网中,主要应用的通信协议是MQTT,这是一个基于发布/订阅模式的物联网通信协议,具备了支持QoS,简单易实现,报文紧凑等特点。在车联网中,大部分的车企也是采用MQTT协议来进行通讯。因此在车联网的架构中,我们需要考虑设置一个MQTT Broker集群来与大量的车辆进行连接通信。目前有很多的开源的Broker,例如ActiveMQ, EMQ, RocketMQ等等,其中EMQ和RocketMQ都是国内的产品,有详尽的中文资料介绍。这里我选择EMQ作为MQTT Broker。

2. 车辆消息的消费与存储

MQTT Broker接收到车辆的消息后,需要把消息给到上层应用来进行处理。我们可以把这些消息保存到数据库或者转发到一个消息队列来缓存。这里我选择Kafka。上层应用通过订阅Kafka主题,来获得其需要的相关车辆信息,进行处理。上层应用也可以把要下发给车辆的消息发送到Kafka的主题,然后让MQTT Broker再转发给车辆,也可以直接通过MQTT主题发布消息的方式来直接发送给车辆。

3. V2X应用

包括了V2V, V2I, V2P等应用场景,车辆需要能和不同的数据源进行消息交互,从而为驾驶提供决策信息。我将基于这个平台展示一些V2X应用的开发设计,实现3GPP规范里面制定的一些V2X场景。

4. 车辆数据分析与报表

车联网平台每天都收集和生成了大量的数据,通过对这些数据进行发掘分析,可以更好的了解业务运行的情况,同时也可以更好的为商业决策提供参考。我们可以基于目前流行的大数据处理平台,例如Spark/Beam/Flink等,对数据进行即时的处理,保存到数据仓库,随后再进行各种数据分析和报表呈现。

在这篇文章中,我先对以上提到的第一点功能进行介绍,搭建一个MQTT消息平台。

MQTT消息平台

我选择EMQX来搭建这个平台,EMQX是国内的一个优秀的MQTT broker软件,有企业版和开源版,这里我选择开源版。在官网上有介绍安装方式,在Kubernetes上是采用Operator的方式来安装的,但是我这里采用kustomization的方式来安装,因为这样方便我进行一些设置上的改动。在我本地用minikube启动了一个kubernetes cluster。

安装EMQX集群

定义一个新的namespace

apiVersion: v1
kind: Namespace
metadata:name: emqx

为这个namespace创建一个service account并赋予相关权限

apiVersion: v1
kind: ServiceAccount
metadata:namespace: emqxname: emqx
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:namespace: emqxname: emqx
rules:
- apiGroups:- ""resources:- endpoints verbs: - get- watch- list
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:namespace: emqxname: emqx
subjects:- kind: ServiceAccountname: emqxnamespace: emqx
roleRef:kind: Rolename: emqxapiGroup: rbac.authorization.k8s.io

定义一个configmap,因为我们要创建一个statefulset的emqx多个节点,要实现auto cluster的功能,自动把这多个节点组成一个cluster,因此需要定义相关的配置:

apiVersion: v1
kind: ConfigMap
metadata:name: emqx-confignamespace: emqx
data:EMQX_NAME: "emqx"EMQX_CLUSTER__DISCOVERY_STRATEGY: "k8s"EMQX_CLUSTER__K8S__SERVICE_NAME: "emqx-headless"EMQX_CLUSTER__K8S__NAMESPACE: "emqx"EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname"EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443"EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local"

定义一个headless的service,用于statefulset的服务暴露和通信。

apiVersion: v1
kind: Service
metadata:name: emqx-headlessnamespace: emqx
spec:type: ClusterIPclusterIP: Noneselector:app: emqxports:- name: mqttport: 1883protocol: TCPtargetPort: 1883- name: mqttsslport: 8883protocol: TCPtargetPort: 8883- name: mgmtport: 8081protocol: TCPtargetPort: 8081- name: websocketport: 8083protocol: TCPtargetPort: 8083- name: wssport: 8084protocol: TCPtargetPort: 8084- name: dashboardport: 18083protocol: TCPtargetPort: 18083

最后是定义一个statefulset,里面包含了2个节点。

apiVersion: apps/v1
kind: StatefulSet
metadata:name: emqx-statefulsetlabels:app: emqxnamespace: emqx
spec:serviceName: emqx-headlessupdateStrategy:type: RollingUpdatereplicas: 2selector:matchLabels:app: emqxtemplate:metadata:labels:app: emqxspec:serviceAccountName: emqxcontainers:- name: emqximage: emqx/emqx:5.1.6resources:requests:memory: "1Gi"cpu: "250m"limits:memory: "1Gi"cpu: "250m"ports:- name: mqttcontainerPort: 1883- name: mqttsslcontainerPort: 8883- name: mgmtcontainerPort: 8081- name: wscontainerPort: 8083- name: wsscontainerPort: 8084- name: dashboardcontainerPort: 18083envFrom:- configMapRef:name: emqx-config

定义一个kustomization.yaml文件,把以上定义的manifest包括进来:

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- namespace.yaml
- rbac.yaml
- configmap.yaml
- headless.yaml
- statefulset.yaml

最后运行kubectl apply -k即可部署,我们可以运行以下命令来查看emqx cluster的状态:

kubectl exec emqx-statefulset-0 -n emqx -- emqx_ctl cluster status

如果成功运行,将显示如下信息:

Cluster status: #{running_nodes =>['emqx@emqx-statefulset-0.emqx-headless.emqx.svc.cluster.local','emqx@emqx-statefulset-1.emqx-headless.emqx.svc.cluster.local'],stopped_nodes => []}

可见当前的EMQX cluster包括了两个节点并已成功运行。

配置HAProxy

下一步我将配置一个HAProxy来作为Load balancer,连接EMQX集群。这种方式可以提供如下好处:

  • HAProxy作为一个反向代理可以隐藏emqx节点的信息,并为外部提供一个统一的地址来连接
  • 可以用作MQTT over TLS的终结,减轻emqx节点处理SSL加密的计算负荷,并且简化证书部署和管理的工作
  • 提供内在的MQTT支持,支持解析MQTT消息以实现粘性附着和智能负荷分配等功能
  • 通过主备方式提供高可靠性

同样我也是以kustomization的方式来部署HAProxy

定义一个namespace

apiVersion: v1
kind: Namespace
metadata:name: haproxy

定义一个configmap,因为haproxy启动需要读取haproxy.cfg配置文件的信息,把这个文件通过configmap的方式来加载

apiVersion: v1
kind: ConfigMap
metadata:name: haproxy-confignamespace: haproxy
data:haproxy.cfg: |global  log 127.0.0.1 local3 info daemon  maxconn 10240defaults  log global mode tcp option tcplog #option dontlognull  timeout connect 10000 # timeout > mqtt's keepalive * 1.2  timeout client 240s  timeout server 240s maxconn 20000backend mqtt_backendmode tcp# 粘性会话负载均衡stick-table type string len 32 size 1000k expire 30mstick on req.payload(0,0),mqtt_field_value(connect,client_identifier)server emqx0 emqx-statefulset-0.emqx-headless.emqx.svc.cluster.local:1883server emqx1 emqx-statefulset-1.emqx-headless.emqx.svc.cluster.local:1883frontend mqtt_serversbind *:1883mode tcp# 拒绝非 MQTT 连接# tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }default_backend mqtt_backend

定义一个deployment

apiVersion: apps/v1
kind: Deployment
metadata:labels:app: haproxyname: haproxynamespace: haproxy
spec:replicas: 1selector:matchLabels:app: haproxytemplate:metadata:labels:app: haproxyspec:containers:- name: haproxyimage: haproxy:2.8ports:- name: httpcontainerPort: 80- name: httpscontainerPort: 443- name: haproxy-mgmtcontainerPort: 1024- name: mqttcontainerPort: 1883- name: mqttsslcontainerPort: 8883- name: mgmtcontainerPort: 8081- name: wscontainerPort: 8083- name: wsscontainerPort: 8084- name: dashboardcontainerPort: 18083volumeMounts:- name: haproxy-configmountPath: /usr/local/etc/haproxy/haproxy.cfgsubPath: haproxy.cfgvolumes:- name: haproxy-configconfigMap:name: haproxy-configitems:- key: haproxy.cfgpath: haproxy.cfg

定义一个service,暴露harpoxy的端口

apiVersion: v1
kind: Service
metadata:name: haproxy-servicenamespace: haproxy
spec:selector:app: haproxyports:- name: mqttport: 1883protocol: TCPtargetPort: mqtt- name: mqttsport: 8883protocol: TCPtargetPort: 8883- name: wsport: 8083protocol: TCPtargetPort: 8083- name: wssport: 8084protocol: TCPtargetPort: 8084- name: dashboardport: 18083protocol: TCPtargetPort: 18083

最后定义一个kustomization.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- namespace.yaml
- configmap.yaml
- haproxy_deployment.yaml
- service.yaml

运行kubectl apply -k来部署

配置Ingress

在我的minikube k8s集群上暴露HAProxy的端口,使得外部可以访问MQTT。因为我想仍然暴露1883端口给外部访问,所以需要在minikube启动的时候设置

minikube start --extra-config=apiserver.service-node-port-range=1-65535

然后安装HAProxy ingress,通过helm的方式安装

helm repo add haproxytech https://haproxytech.github.io/helm-charts
helm repo update
helm install haproxy-kubernetes-ingress haproxytech/kubernetes-ingress \--create-namespace \--namespace haproxy-controller

安装完成之后,我们需要创建一个configmap,配置要暴露的TCP端口,通过kubectl apply -f来部署。

apiVersion: v1
kind: ConfigMap
metadata:name: tcpnamespace: haproxy
data:1883:haproxy/haproxy-service:18838883:haproxy/haproxy-service:8883   8083:haproxy/haproxy-service:80838084:haproxy/haproxy-service:808418083:haproxy/haproxy-service:18083

读取HAProxy ingress的配置信息,保存在values.yaml文件

helm show values haproxytech/kubernetes-ingress > values.yaml

然后在values.yaml里面找到以下对应位置,进行修改:

    tcpPorts:- name: mqttport: 1883targetPort: 1883nodePort: 1883- name: mqttsport: 8883targetPort: 8883nodePort: 8883- name: wsport: 8083targetPort: 8083nodePort: 8083- name: wssport: 8084targetPort: 8084nodePort: 8084- name: dashboardport: 18083targetPort: 18083nodePort: 18083
# add extra args in controller sectionextraArgs:- --configmap-tcp-services=haproxy/tcp

运行以下命令更新haproxy-ingress的配置

helm upgrade -f values.yaml haproxy-kubernetes-ingress -n haproxy-controller haproxytech/kubernetes-ingress

现在我们就可通过一个MQTT客户端来通过HAPROXY来连接EMQX了,服务器地址是minikubeip:1883,通过访问minikubeip:18083可以访问EMQX的dashboard

配置证书

在实际应用中,车辆和平台之间是通过TLS加密来通信的,有单向认证和双向认证两种方式。单向认证指客户端需要验证服务器端是否持有受信任的证书,双向认证则指双方都需要验证。这里以双向认证为例进行配置。

1. 创建根CA证书

以自签证书的方式来做,首先是创建一个根CA证书,如以下命令。

openssl req -newkey rsa:2048 -nodes -x509 -days 3650 -keyout root-ca.key -out root-ca.crt

用以下命令可以查看创建的证书内容

openssl x509 -noout -text -in root-ca.crt

2. 为客户端签发证书

 

有了中间CA证书后,我们就可以为客户端创建证书了,例如为一辆ID为vehicle-1的车辆签发证书。

openssl req -newkey rsa:2048 -nodes -days 365 -subj "/CN=vehicle-1/O=vehicle" -keyout client.key -out client.csr

创建一个扩展文件client-cert-extensions.cnf,声明其是client certificate type

basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = clientAuth
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer

然后用中间CA证书签发

openssl x509 -req -in client.csr -out client.crt -CA intermediate-ca.crt -CAkey intermediate-ca.key    -CAcreateserial -days 365 -extfile client-cert-extensions.cnf

3. 为服务器端签发证书

同理,为服务器端也签发证书,这里稍微不同的是需要在证书里面指定服务器的IP或DNS地址。

首先创建服务器的private key

openssl genrsa -out emqx.key 2048

创建一个扩展文件openssl.cnf,其中distinguished_name可以按照自己需要设置,在IP.1和DNS.1里面设置服务器的地址。例如我在IP.1上面设置了Minikube的IP

[req]
default_bits  = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Zhejiang
localityName = Hangzhou
organizationName = EMQX
commonName = Server certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = BROKER_ADDRESS
DNS.1 = BROKER_ADDRESS

生成一个请求签发证书的csr

openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr

用根CA证书签发

openssl x509 -req -in ./emqx.csr -CA root-ca.crt -CAkey root-ca.key -CAcreateserial -out emqx.crt -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf

然后把emqx.crt和emqx.key合成一个文件

cat emqx.crt emqx.key > server.pem

4. 创建一个secret

把刚才创建的emqx.pem,root-ca.crt文件的内容保存到secret中,定义一个yaml

apiVersion: v1
kind: Secret
metadata:name: haproxy-secretnamespace: haproxy
type: Opaque
stringData:server.pem: |-----BEGIN CERTIFICATE-----XXXX-----END CERTIFICATE----------BEGIN RSA PRIVATE KEY-----XXXX-----END RSA PRIVATE KEY-----cacert.pem: |-----BEGIN CERTIFICATE-----XXXX-----END CERTIFICATE-----

5. 修改haproxy deployment

修改之前定义的haproxy deployment manifest,在Volumes增加一个secret类型的volume

      - name: haproxy-secretsecret:secretName: haproxy-secretitems:- key: server.pempath: server.pem- key: cacert.pempath: cacert.pem

在volumeMounts里面增加挂载

        - name: haproxy-secretmountPath: /etc/haproxy/certs/cacert.pemsubPath: cacert.pem- name: haproxy-secretmountPath: /etc/haproxy/certs/server.pemsubPath: server.pem

6. 修改HAproxy.cfg配置

在刚才定义的configmap中,修改HAproxy.cfg的配置,增加一个mqtt_tls_frontend的配置,表示要启用双向认证

    frontend mqtt_tls_frontend# bind *:8883 ssl crt /etc/haproxy/certs/server.pem # 双向认证bind *:8883 ssl crt /etc/haproxy/certs/server.pem ca-file /etc/haproxy/certs/cacert.pem verify requiredmode tcpdefault_backend mqtt_backend

最后重新部署HAproxy即可生效

连接检验

我们可以用mqttx来测试一下连接到MQTT消息平台

运行以下命令来订阅hello主题的消息,可见能成功连接。

mqttx sub -t "hello" -h "minikubeip" -p 8883 -l "mqtts" --key "client.key" --cert "client.crt" --ca "root-ca.crt"

可见我们已经成功搭建了一个EMQX的消息平台,可以实现与车辆之间的MQTT消息通信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/193974.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

树莓派多串口通信

树莓派多串口通信 串口配置串口通信函数分析串口通信示例代码 参考博文1:树莓派 4 UART 多串口配置通信参考博文2:树莓派wiringPi库详解关于树莓派相关其他环境配置可参考:快速上手树莓派关于wiringPi库初始化与IO口开发可参考:树…

深入探索FastAPI单元测试:使用TestClient轻松测试你的API

原文:深入探索FastAPI单元测试:使用TestClient轻松测试你的API-51CTO.COM 当使用FastAPI进行单元测试时,一个重要的工具是TestClient类。TestClient类允许我们模拟对FastAPI应用程序的HTTP请求,并测试应用程序的响应。这使我们能…

调优--学习笔记

1,Presto调优 数据存储格式 1)合理设置分区 与Hive类似,Presto会根据元信息读取分区数据,合理的分区能减少Presto数据读取量,提升查询性能。 2)使用列式存储 Presto对ORC文件读取做了特定优化&#xff0c…

Qt OpenCV 学习(一):环境搭建

对应版本 Qt 5.15.2OpenCV 3.4.9MinGW 8.1.0 32-bit 1. OpenCV 下载 确保安装 Qt 时勾选了 MinGW 编译器 本文使用 MinGW 编译好的 OpenCV 库,无需自行编译 确保下载的 MinGW 和上述安装 Qt 时勾选的 MinGW 编译器位数一致,此处均为 x86/32-bit下载地址…

《微信小程序开发从入门到实战》学习四十

4.2 云开发JSON数据库 4.2.11 更新数据 使用数据库API更新数据有两种方法:一.将记录局部更新的update方法;二.以替换的方式更新记录的set方法 update方法可以局部更新一个记录或一个集合的多个记录,更新时只有指定字段更新,其他…

电脑连不上wifi,适配器Intel(R)WiFi6 AX201 160MHz遇到与驱动程序或硬件相关问题,连不上wifi,电脑WiFi图标没了

电脑WiFi图标没了,电脑连不上wifi 适配器IntelWiFi6 AX201 160MHz遇到与驱动程序或硬件相关问题应该怎么解决? 方法一:电脑冷重启即可 就是长按那个开机键,然后滑动关机,,,重启(我…

智能诊疗体验:整合AI技术的互联网医院小程序开发

在科技化的趋势下,互联网医院小程序的开发变得愈发重要,尤其是通过整合人工智能(AI)技术,进一步提升了就医的效率。 一、引言 互联网医院小程序其开发目标是提高医疗服务的效率,同时也也提升了用户的就医…

Clickhouse在货品标签场景的应用

背景 在电商场景中,我们经常需要对货品进行打标签的操作,简单来说就是对货品进行各种分类,按照价格段进行分组,此时运营人员就可以通过价格段捞取到满足条件的商品了,本文就来简单看下这个场景如何在clickhouse中实现…

23种设计模式之C++实践(一)

23种设计模式之C++实践 1. 简介2. 基础知识3. 设计模式(一)创建型模式1. 单例模式——确保对象的唯一性1.2 饿汉式单例模式1.3 懒汉式单例模式比较IoDH单例模式总结2. 简单工厂模式——集中式工厂的实现简单工厂模式总结3. 工厂方法模式——多态工厂的实现工厂方法模式总结4.…

【像素画板】游戏地图编辑器-uniapp项目开发流程详解

嘿,用过像素画板没有哦,相信喜欢绘画的小朋友会对它感兴趣呢,用来绘制像素画非常好看,有没有发现,它是可以用来绘制游戏地图的,是不是很好奇,来一起看看吧。 像素画板,也叫像素画的绘…

c语言-归并排序

目录 1、归并排序基本思想 2、归并排序的实现(递归法) 2.1 代码实现递归法归并排序 3、归并排序的实现(非递归法) 3.1 修正边界问题 3.2 代码实现非递归法归并排序 结语: 前言: 归并排序是一种把数…

Python---格式化输出与%百分号----涉及转义符 \ 反斜杠的使用

相关链接Python--格式化输出中的转义符号----\t 制表符(空格的)和\n(换行的)_唯元素的博客-CSDN博客 Python---字符串(用单、双引号、 三单/双引号定义。反斜杠 \ 转义,单在双内/双在单内 )-CS…

力扣 --- 最后一个单词的长度

题目描述: 给你一个字符串 s,由若干单词组成,单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1: 输入:s "Hello World&…

运维02:Linux

Linux安装 VMWare安装:夸克网盘分享(提取码:refg) CentOS安装:Index of /centos/7.9.2009/isos/x86_64/ Xshell安装:百度网盘 请输入提取码(提取码:juau) 环境准备 1、…

在Windows 11中,把iPhone照片和视频导出来又快又简单,无需第三方软件

如果你想将照片和视频从iPhone传输到Windows 11 PC,最快、最简单的方法是插入手机并执行自动导入。以下是操作方法。 如何将照片和视频从iPhone导入Windows 如果你用USB数据线将iPhone插入Windows PC,Windows 11可以像标准数码相机一样连接到它&#x…

react之封装有无Token(路由权限控制)的高阶组件

TOC 前景 有些路由页面内的内容信息比较敏感,如果用户没有经过登录获取到有效Token,是没有权限跳转的,根据Token的有 无控制当前路由是否可以跳转就是路由的权限控制 技术方案 实现步骤 1.在 components 目录中,创建 AuthRoute/in…

solidity实现ERC721代币标准发布NFT

文章目录 1、非同质化货币(NFT)- 维基百科2、IERC1653、IERC7214、IERC721Receiver5、IERC721Metadata6、ERC7217、ERC721 NFT 的实现8、编译部署 1、非同质化货币(NFT)- 维基百科 非同质化代币(英语:Non-F…

Elasticsearch:什么是大语言模型(LLM)?

大语言模型定义 大语言模型 (LLM) 是一种深度学习算法,可以执行各种自然语言处理 (natural language processing - NLP) 任务。 大型语言模型使用 Transformer 模型,并使用大量数据集进行训练 —— 因此规模很大。 这使他们能够识别、翻译、预测或生成文…

时间复杂度为O (nlogn)的排序算法

归并排序 归并排序遵循分治的思想:将原问题分解为几个规模较小但类似于原问题的子问题,递归地求解这些子问题,然后合并这些子问题的解来建立原问题的解,归并排序的步骤如下: 划分:分解待排序的 n 个元素的…

【c】求一组数据的最大值和第二大的值

我们可以创建数组&#xff0c;利用冒泡排序法把数组进行排序&#xff0c;但是当元素过多时候循环可能过多导致循环超限 所以我们可以换种其他方法&#xff0c;代码附上 #include<stdio.h> int main() {int n,i;puts("输入这组数据的个数");scanf("%d&qu…