以下是一个简单的MQTT连接库文件,其中包含了连接、断开、订阅主题、发送数据和接收数据等函数。请注意,这只是一个示例,你可能需要根据自己的实际需求进行修改。
#include <iostream>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>const std::string SERVER_ADDRESS = "mqtt.server.com";
const int SERVER_PORT = 1883;// MQTT固定报头结构
struct MqttFixedHeader {uint8_t controlPacketType;uint8_t remainingLength;
};// MQTT连接报文结构
struct MqttConnectPacket {MqttFixedHeader fixedHeader;uint8_t variableHeader[10];uint8_t payload[20];
};// MQTT订阅报文结构
struct MqttSubscribePacket {MqttFixedHeader fixedHeader;uint16_t packetIdentifier;uint8_t topic[20];uint8_t qos;
};// MQTT发布报文结构
struct MqttPublishPacket {MqttFixedHeader fixedHeader;uint16_t topicLength;uint8_t topic[20];uint8_t payload[100];
};class MqttClient {
public:MqttClient() : sockfd(-1), connected(false) {}~MqttClient() {if (connected) {disconnect();}}bool connect(const std::string& clientId) {sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0) {std::cerr << "Failed to create socket" << std::endl;return false;}struct sockaddr_in serv_addr;memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(SERVER_PORT);if (inet_pton(AF_INET, SERVER_ADDRESS.c_str(), &(serv_addr.sin_addr)) <= 0) {std::cerr << "Failed to set server address" << std::endl;return false;}if (connect(sockfd, reinterpret_cast<struct sockaddr*>(&serv_addr), sizeof(serv_addr)) < 0) {std::cerr << "Connection failed" << std::endl;return false;}// 构建MQTT连接报文MqttConnectPacket connectPacket;connectPacket.fixedHeader.controlPacketType = 0x10; // 连接请求connectPacket.fixedHeader.remainingLength = 0x0C; // 可变报头长度为12字节memcpy(connectPacket.variableHeader, "MQTT", 4);connectPacket.variableHeader[4] = 0x04; // MQTT协议版本号(4)connectPacket.variableHeader[5] = 0x02; // 连接标志connectPacket.variableHeader[6] = 0x00; // 保持连接时间的最高8位connectPacket.variableHeader[7] = 0x3C; // 保持连接时间的最低8位connectPacket.variableHeader[8] = 0x00; // 清理会话位为0connectPacket.variableHeader[9] = 0x00; // 预留位为0memcpy(connectPacket.payload, clientId.c_str(), clientId.length());// 发送MQTT连接报文if (send(sockfd, &connectPacket, sizeof(connectPacket), 0) < 0) {std::cerr << "Failed to send connect packet" << std::endl;return false;}// 接收MQTT服务器的响应uint8_t response[1024];ssize_t bytesRead = recv(sockfd, response, sizeof(response), 0);if (bytesRead <= 0) {std::cerr << "Failed to receive response" << std::endl;return false;}// 处理MQTT服务器的响应connected = true;return true;}void disconnect() {if (connected) {close(sockfd);sockfd = -1;connected = false;}}bool subscribe(const std::string& topic, uint8_t qos) {if (!connected) {std::cerr << "Not connected" << std::endl;return false;}// 构建MQTT订阅报文MqttSubscribePacket subscribePacket;subscribePacket.fixedHeader.controlPacketType = 0x82; // 订阅请求subscribePacket.fixedHeader.remainingLength = 0x0E; // 可变报头长度为14字节subscribePacket.packetIdentifier = 0x1234; // 包标识符memcpy(subscribePacket.topic, topic.c_str(), topic.length());subscribePacket.qos = qos; // QoS等级// 发送MQTT订阅报文if (send(sockfd, &subscribePacket, sizeof(subscribePacket), 0) < 0) {std::cerr << "Failed to send subscribe packet" << std::endl;return false;}// 接收MQTT服务器的响应uint8_t response[1024];ssize_t bytesRead = recv(sockfd, response, sizeof(response), 0);if (bytesRead <= 0) {std::cerr << "Failed to receive response" << std::endl;return false;}// 处理MQTT服务器的响应return true;}bool publish(const std::string& topic, const std::string& message, uint8_t qos) {if (!connected) {std::cerr << "Not connected" << std::endl;return false;}// 构建MQTT发布报文MqttPublishPacket publishPacket;publishPacket.fixedHeader.controlPacketType = 0x30; // 发布消息publishPacket.fixedHeader.remainingLength = 0x2D; // 可变报头长度为45字节publishPacket.topicLength = topic.length();memcpy(publishPacket.topic, topic.c_str(), topic.length());memcpy(publishPacket.payload, message.c_str(), message.length());// 发送MQTT发布报文if (send(sockfd, &publishPacket, sizeof(publishPacket), 0) < 0) {std::cerr << "Failed to send publish packet" << std::endl;return false;}// 处理MQTT服务器的响应return true;}ssize_t receive(uint8_t* buffer, size_t bufferSize) {if (!connected) {std::cerr << "Not connected" << std::endl;return -1;}return recv(sockfd, buffer, bufferSize, 0);}private:int sockfd;bool connected;
};
在上述代码中,我们将MQTT连接功能封装到了一个名为MqttClient
的类中,并提供了连接、断开、订阅主题、发送数据和接收数据等函数。你可以根据自己的实际需求调用这些函数。
例如,要连接到MQTT服务器,请使用以下代码:
MqttClient client;
if (client.connect("client_id")) {// 连接成功
} else {// 连接失败
}
要订阅主题,请使用以下代码:
if (client.subscribe("topic", 0x01)) {// 订阅成功
} else {// 订阅失败
}
要发布消息,请使用以下代码:
if (client.publish("topic", "message", 0x01)) {// 发布成功
} else {// 发布失败
}
要接收消息,请使用以下代码:
uint8_t buffer[1024];
ssize_t bytesRead = client.receive(buffer, sizeof(buffer));
if (bytesRead >= 0) {// 处理接收到的数据
} else {// 接收数据失败
}
下面是一个完整的MQTT连接示例,包括从用户输入地址和端口到订阅主题和发送消息的全部过程:
#include <iostream>
#include <string>// 导入上述的MQTT连接库文件int main() {std::string serverAddress;int serverPort;std::string clientId;std::string topic;// 获取用户输入的MQTT服务器地址和端口std::cout << "Enter MQTT server address: ";std::cin >> serverAddress;std::cout << "Enter MQTT server port: ";std::cin >> serverPort;std::cout << "Enter client ID: ";std::cin >> clientId;MqttClient client;// 连接到MQTT服务器if (client.connect(clientId)) {std::cout << "Connected to MQTT server" << std::endl;// 订阅主题std::cout << "Enter topic to subscribe: ";std::cin >> topic;if (client.subscribe(topic, 0x01)) {std::cout << "Subscribed to topic: " << topic << std::endl;} else {std::cerr << "Failed to subscribe to topic" << std::endl;return -1;}// 发布消息std::string message;std::cout << "Enter message to publish: ";std::cin.ignore(); // 忽略之前的换行符std::getline(std::cin, message);if (client.publish(topic, message, 0x01)) {std::cout << "Published message: " << message << std::endl;} else {std::cerr << "Failed to publish message" << std::endl;return -1;}// 接收消息uint8_t buffer[1024];ssize_t bytesRead = client.receive(buffer, sizeof(buffer));if (bytesRead >= 0) {std::string receivedMessage(reinterpret_cast<char*>(buffer), bytesRead);std::cout << "Received message: " << receivedMessage << std::endl;} else {std::cerr << "Failed to receive message" << std::endl;return -1;}client.disconnect();std::cout << "Disconnected from MQTT server" << std::endl;} else {std::cerr << "Failed to connect to MQTT server" << std::endl;return -1;}return 0;
}
在这个示例中,我们使用std::cin
从用户那里获取了MQTT服务器的地址、端口、客户端ID以及要订阅和发布的主题和消息。然后,我们通过调用相应的函数来进行连接、订阅、发布和接收。