前言
使用 MQTT 协议的简单示例记录
代码
MQTT服务端(C# 编写,使用MQTTnet提供的示例代码)
主程序
namespace ConsoleApp1;

/// <summary>
/// Console host for a minimal MQTT broker, based on the MQTTnet server sample.
/// </summary>
public class Program
{
    /// <summary>
    /// Entry point: runs the logging MQTT server until the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static async Task Main(string[] args)
    {
        await Run_Server_With_Logging();
    }

    /// <summary>
    /// Starts an MQTT server on the default endpoint, waits for Enter on the
    /// console, then stops the server.
    /// </summary>
    public static async Task Run_Server_With_Logging()
    {
        /*
         * This sample starts a simple MQTT server and prints the logs to the output.
         *
         * IMPORTANT! Do not enable logging in live environment. It will decrease performance.
         *
         * See sample "Run_Minimal_Server" for more details.
         */

        var mqttFactory = new MqttFactory(new ConsoleLogger());
        var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

        using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
        {
            await mqttServer.StartAsync();

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();

            // Stop and dispose the MQTT server if it is no longer needed!
            await mqttServer.StopAsync();
        }
    }
}
NetX Duo 组件的编写
/** Copyright (c) 2024-2024,shchl** SPDX-License-Identifier: Apache-2.0** Change Logs:* Date Author Notes* 2024-4-15 shchl first version*/
#include "includes.h"

#if APP_COMPONENT_NX_ENABLE

#include "nx_api.h"
#include "nx_stm32_eth_driver.h"

/* Payload size of a single packet in the pool, in bytes. */
#define APP_NX_PACKET_SIZE 1536
/* Bytes reserved for the packet pool: 16 packets, each with its NX_PACKET header. */
#define APP_NX_PACKET_POOL_SIZE ((sizeof(NX_PACKET) + APP_NX_PACKET_SIZE) * 16)

/* Stack size (bytes) and priority handed to nx_ip_create(). */
#define APP_NX_IP_STACK_SIZE 2048
#define APP_NX_IP_PRIORITY 10

/* Bytes of ARP cache memory handed to nx_arp_enable(). */
#define APP_NX_ARP_CACHE_SIZE 1024

/* Static IPv4 address and subnet mask of the local IP instance. */
#define APP_NX_IP_ADDR IP_ADDRESS(192,168,8,9)
#define APP_NX_IP_SUB_MASK IP_ADDRESS(255,255,255,0)

/*
*******************************************************************************************************
*                               External variables
*******************************************************************************************************
*/

/*
*******************************************************************************************************
*                               Variables
*******************************************************************************************************
*/
void *loc_ip_area;      /* Memory used as the IP instance's stack (see nx_ip_create) */
void *packet_pool_area; /* Backing memory of g_packet_pool */
void *arp_cache_area;   /* Backing memory of the ARP cache */

NX_PACKET_POOL g_packet_pool; /* Global packet pool */
NX_IP g_loc_ip;               /* Global local IP instance */
/*
*********************************************************************************************************
* 静态全局变量
*********************************************************************************************************
*//*
*********************************************************************************************************
* 函数声明
*********************************************************************************************************
*//*
*********************************************************************************************************
* 外部函数
*********************************************************************************************************
*/
int nx_application_define(void) {UINT status;/*内存分配*/packet_pool_area = app_malloc(APP_NX_PACKET_POOL_SIZE);loc_ip_area = app_malloc(APP_NX_IP_STACK_SIZE);arp_cache_area = app_malloc(APP_NX_ARP_CACHE_SIZE);nx_system_initialize();status = app_nx_ip_instance_create();if (status) {tx_printf("app_nx_ip_instance_create error:%d\r\n", status);}return NX_SUCCESS;
}TX_APP_DEFINE_EXPORT(nx_application_define); /*首先创建模块应用*/UINT app_nx_ip_instance_create() {UINT stat = 0;/*创建packet pool*/stat += nx_packet_pool_create(&g_packet_pool, "NX PACKET POOL",APP_NX_PACKET_SIZE,packet_pool_area, APP_NX_PACKET_POOL_SIZE);stat += nx_ip_create(&g_loc_ip,"NX IP",APP_NX_IP_ADDR,APP_NX_IP_SUB_MASK,&g_packet_pool,nx_stm32_eth_driver,loc_ip_area,APP_NX_IP_STACK_SIZE,APP_NX_IP_PRIORITY);/*ip 相关配置*/{/* 启用 ARP 并为 IP 实例 提供 ARP 缓存内存。 */stat += nx_arp_enable(&g_loc_ip, arp_cache_area, APP_NX_ARP_CACHE_SIZE);/* 启用 ICMP */stat += nxd_icmp_enable(&g_loc_ip);/* 为IP 实例启用 TCP 处理. */stat += nx_tcp_enable(&g_loc_ip);/*禁止分包 */stat += nx_ip_fragment_disable(&g_loc_ip);}return stat;
}
/*
*********************************************************************************************************
* 内部函数
*********************************************************************************************************
*/#endif
MQTT 客户端任务线程
/** Copyright (c) 2024-2024,shchl** SPDX-License-Identifier: Apache-2.0** Change Logs:* Date Author Notes* 2024-4-19 shchl first version** @description mqtt 客户端任务*/
#include "includes.h"
#include "nxd_mqtt_client.h"

/* MQTT quality-of-service levels used in this file. */
#define QOS0 0
#define QOS1 1

#if 1

/* Stack size (bytes) and priority of the application thread created in
 * app_task_net_mqtt_client_create(). */
#define APP_TASK_NET_MQTT_CLIENT_STACK_SIZE (4096)
#define APP_TASK_NET_MQTT_CLIENT_PRIORITY (10)

/* Stack size (bytes) and priority handed to nxd_mqtt_client_create(). */
#define APP_MQTT_CLIENT_STACK_SIZE 4096
#define APP_MQTT_CLIENT_PRIORITY 2

/* MQTT related settings */
#define MQTT_CLIENT_ID "client_id_stm32"
#define MQTT_SERVER_IP_ADDR IP_ADDRESS(192, 168, 8, 2)
/* Keep-alive value passed to nxd_mqtt_client_connect(). */
#define MQTT_KEEP_ALIVE_TIMER 300
/* Length of a string literal / char array without its terminating NUL. */
#define STRLEN(p) (sizeof(p) - 1)
#define MQTT_SUB_TOPIC "/sub/topic1"
#define MQTT_SUB_TOPIC2 "/sub/topic2"
#define MQTT_PUB_TOPIC "/pub/topic"

/* ------------------------------- Event flag bits ------------------------------- */
#define MQTT_APP_CONNECT_ERROR_EVENT ((1) << 0)      /* Client failed to connect */
#define MQTT_APP_SUB_TOPIC_RECEIVED_EVENT ((1) << 1) /* Data arrived on a subscribed topic */
#define MQTT_APP_CLIENT_DISCONNECT_EVENT ((1) << 2)  /* Client was disconnected */

/* Mask of every event bit above (the identifier keeps its original spelling). */
#define MATT_APP_ALL_EVENT (MQTT_APP_CONNECT_ERROR_EVENT | MQTT_APP_SUB_TOPIC_RECEIVED_EVENT \
                            | MQTT_APP_CLIENT_DISCONNECT_EVENT)

/*
*******************************************************************************************************
*                               External variables
*******************************************************************************************************
*/

/*
*******************************************************************************************************
*                               Variables
*******************************************************************************************************
*/
TX_EVENT_FLAGS_GROUP mqtt_app_event_group; /* MQTT application event flags group */
NXD_ADDRESS mqtt_server_ip;                /* MQTT server (broker) address */

/*
*********************************************************************************************************
*                               Static global variables
*********************************************************************************************************
*/
/* MQTT client instance */
static NXD_MQTT_CLIENT mqtt_client;

/* Control block of the MQTT application thread */
static TX_THREAD net_mqtt_client_thread;

/* Stack memory of net_mqtt_client_thread */
static void *area;

/* Stack memory handed to nxd_mqtt_client_create() */
static void *mqtt_client_area;

/* Receive buffers filled by nxd_mqtt_client_message_get() */
static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];

/* Number of bytes actually stored in topic_buffer / message_buffer */
UINT topic_actual_len;
UINT topic_actual_msg_len;

/*
*********************************************************************************************************
*                               Function declarations
*********************************************************************************************************
*/
/* Entry function of the MQTT application thread. */
static void net_mqtt_client_thread_entry(ULONG par);

/* Callback registered through nxd_mqtt_client_disconnect_notify_set(). */
static VOID mqtt_disconnect_notify(NXD_MQTT_CLIENT *client);

/* Callback registered through nxd_mqtt_client_receive_notify_set(). */
static VOID mqtt_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT message_count);

/*
*********************************************************************************************************
*                               External functions
*********************************************************************************************************
*/
int app_task_net_mqtt_client_create() {area = app_malloc(APP_TASK_NET_MQTT_CLIENT_STACK_SIZE);tx_thread_create(&net_mqtt_client_thread, /* 任务控制块地址 */"app net server", /* 任务名 */net_mqtt_client_thread_entry, /* 启动任务函数地址 */0, /* 传递给任务的参数 */area, /* 堆栈基地址 */APP_TASK_NET_MQTT_CLIENT_STACK_SIZE, /* 堆栈空间大小 */APP_TASK_NET_MQTT_CLIENT_PRIORITY, /* 任务优先级*/APP_TASK_NET_MQTT_CLIENT_PRIORITY, /* 任务抢占阀值 */TX_NO_TIME_SLICE, /* 不开启时间片 */TX_AUTO_START); /* 创建后立即启动 *//*内存分配*/mqtt_client_area = app_malloc(APP_MQTT_CLIENT_STACK_SIZE);/*创建mqtt 事件*/tx_event_flags_create(&mqtt_app_event_group, "mqtt event");/*服务端连接信息配置*/mqtt_server_ip.nxd_ip_version = 4;mqtt_server_ip.nxd_ip_address.v4 = MQTT_SERVER_IP_ADDR;return TX_SUCCESS;
}TX_THREAD_EXPORT(app_task_net_mqtt_client_create);/*
*********************************************************************************************************
* 内部函数
*********************************************************************************************************
*/
/**
 * @brief Subscribe the file-local MQTT client to a topic.
 *
 * @param topic NUL-terminated topic name; its length is taken with strlen().
 * @param qos   Quality-of-service level forwarded to nxd_mqtt_client_subscribe().
 * @return Status returned by nxd_mqtt_client_subscribe().
 */
static inline UINT mqtt_client_sub_topic(CHAR *topic, UINT qos) {
    const UINT topic_len = (UINT) strlen(topic);

    return nxd_mqtt_client_subscribe(&mqtt_client, topic, topic_len, qos);
}
static inline UINT mqtt_client_unsub_topic(CHAR *topic) {return nxd_mqtt_client_unsubscribe(&mqtt_client,topic,strlen(topic));
}static void net_mqtt_client_thread_entry(ULONG par) {UINT status = 0;ULONG actual_status = 0;ULONG actual_event_flag;do {/* 等待 1 秒钟,让 内部 IP 线程完成其初始化。. */status = nx_ip_status_check(&g_loc_ip,NX_IP_INITIALIZE_DONE,&actual_status,NX_IP_PERIODIC_RATE);} while (status != NX_SUCCESS);/* 创建客户端实例. */status = nxd_mqtt_client_create(&mqtt_client,"mqtt client",MQTT_CLIENT_ID, strlen(MQTT_CLIENT_ID),&g_loc_ip, &g_packet_pool,mqtt_client_area,APP_MQTT_CLIENT_STACK_SIZE,APP_MQTT_CLIENT_PRIORITY,NX_NULL, 0);/* 设置通知回调函数 */// 客户端断开连接通知nxd_mqtt_client_disconnect_notify_set(&mqtt_client, mqtt_disconnect_notify);// 设置接收到数据通知回调nxd_mqtt_client_receive_notify_set(&mqtt_client, mqtt_receive_notify);while (1) {/* 开始与服务器的连接*/status = nxd_mqtt_client_connect(&mqtt_client,&mqtt_server_ip,NXD_MQTT_PORT,MQTT_KEEP_ALIVE_TIMER,0,NX_WAIT_FOREVER);if (status == NXD_MQTT_CONNECT_FAILURE) {logError("mqtt unable connect mqtt server[%d,%d,%d,%d:%d]", NX_IP_FMT(MQTT_SERVER_IP_ADDR), NXD_MQTT_PORT);tx_thread_sleep(1000);continue;}/* 订阅主题 */status = mqtt_client_sub_topic(MQTT_SUB_TOPIC, QOS0);status = mqtt_client_sub_topic(MQTT_SUB_TOPIC2, QOS0);switch (status) {case 0:break;case NXD_MQTT_NOT_CONNECTED:default:logError("nxd_mqtt_client_subscribe error:%#x", status);tx_thread_sleep(200);continue;}while (1) {status = tx_event_flags_get(&mqtt_app_event_group, MATT_APP_ALL_EVENT, TX_OR_CLEAR, &actual_event_flag,100);if (status == TX_SUCCESS) {/*接收来自topic 的数据*/if (MQTT_APP_SUB_TOPIC_RECEIVED_EVENT == actual_event_flag) {status = nxd_mqtt_client_message_get(&mqtt_client,topic_buffer, sizeof(topic_buffer), &topic_actual_len,message_buffer, sizeof(message_buffer), &topic_actual_msg_len);if (status == NXD_MQTT_SUCCESS) {/*添加结束符*/topic_buffer[topic_actual_len] = 0;message_buffer[topic_actual_msg_len] = 0;logInfo("topic = %s, message = %s", topic_buffer, message_buffer);}} else if (actual_event_flag == MQTT_APP_CLIENT_DISCONNECT_EVENT) {logInfo("client disconnect.....");break;}} else if (status == TX_NO_EVENTS) 
{/*没有事件通知*/// todo} else {break;}/*todo 推送状态数据*//*向发布主题推送数据*/// const char *send_data = "this is publish msg";// status += nxd_mqtt_client_publish(&mqtt_client,// MQTT_PUB_TOPIC,// STRLEN(MQTT_PUB_TOPIC),// (CHAR *) send_data,// strlen(send_data),// 0,// QOS1,// NX_WAIT_FOREVER); /*等待超时*/}mqtt_client_unsub_topic(MQTT_SUB_TOPIC); /* Now unsubscribe the topic. */mqtt_client_unsub_topic(MQTT_SUB_TOPIC2); /* Now unsubscribe the topic. */nxd_mqtt_client_disconnect(&mqtt_client); /* Disconnect from the broker. */}/*删除客户端,释放资源(需要重新创建) */nxd_mqtt_client_delete(&mqtt_client);}static VOID mqtt_disconnect_notify(NXD_MQTT_CLIENT *client) {/**/logInfo("mqtt_disconnect_notify :%s", client->nxd_mqtt_client_id);tx_event_flags_set(&mqtt_app_event_group, MQTT_APP_CLIENT_DISCONNECT_EVENT, TX_OR);
}static VOID mqtt_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT message_count) {NX_PARAMETER_NOT_USED(client_ptr);
// NX_PARAMETER_NOT_USED(message_count); /*接收的消息数*/logInfo("rec msg num:%d", message_count);tx_event_flags_set(&mqtt_app_event_group, MQTT_APP_SUB_TOPIC_RECEIVED_EVENT, TX_OR);
}#endif