1.添加依赖:
首先,需要在Flutter项目的pubspec.yaml文件中添加mqtt_client依赖。
dependencies:#https://pub.dev/packages/mqtt_clientmqtt_client: ^10.0.02.创建MQTT客户端并连接到MQTT服务器:
2.创建一个MQTT客户端实例来进行连接和通信
Future<MqttServerClient> connect(String cid) async {print('mqtt connect host = $host cid = $cid ');MqttServerClient client =MqttServerClient.withPort(host, cid, port);client.logging(on: true);client.onConnected = onConnected;client.onDisconnected = onDisconnected;client.onUnsubscribed = onUnsubscribed;client.onSubscribed = onSubscribed;client.onSubscribeFail = onSubscribeFail;client.pongCallback = pong;final connMessage = MqttConnectMessage().authenticateAs(user, pwd).keepAliveFor(60)// 保持连接时间,单位为秒.withWillTopic('willtopic').withWillMessage('Will message').startClean()// 清理会话.withWillQos(MqttQos.atLeastOnce);client.connectionMessage = connMessage;try {await client.connect();} catch (e) {print('Exception: $e');client.disconnect();}return client;}
其中host 是主机名,port是端口号,cid是客户端ID,你可以根据需要为其分配一个唯一的标识。
3.订阅主题:
一旦连接到MQTT服务器,你可以订阅感兴趣的主题以接收消息。以下是订阅主题的示例代码:
//用于监听已订阅主题的消息到达。
client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;final String pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);// 解码包含中文字符的字符串final String decodedString = utf8.decode(pt.codeUnits);LogI('Received message: $decodedString from topic: ${c[0].topic}');});
通过client.subscribe
方法订阅一个主题,并使用client.updates
流来监听接收到的消息。
4.发布消息:
使用MQTT客户端来发布消息到特定的主题。以下是发布消息的示例代码:
final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString('Hello from Flutter');client.publishMessage('your_topic', MqttQos.exactlyOnce, builder.payload);
在上面的代码中,使用MqttClientPayloadBuilder
来构建消息的有效载荷,然后使用client.publishMessage
方法来发布消息到指定的主题。
5.断开连接:
当你不再需要与MQTT服务器通信时,记得断开连接以释放资源:
client.disconnect();
完整代码:
class XMqttClient {static final XMqttClient _instance = XMqttClient._();static XMqttClient get instance => _instance;static const host = '139.196.xx.xx';//替换成你自己的主机static const port = 1883;//端口号static const user = 'admin';//用户static const pwd = 'public';//密码List<String> topics = [];MqttClient? client;XMqttClient._() {_initMqtt();}_initMqtt() async {//clientld 确保唯一性,否则如果两台机器的clientld 相同 则会连上立刻断开连接!!!String clientId = '${DateTime.now().millisecondsSinceEpoch}asc';client = await connect(clientId);}Future<MqttServerClient> connect(String cid) async {print('mqtt connect host = $host cid = $cid ');MqttServerClient client =MqttServerClient.withPort(host, cid, port);client.logging(on: true);client.onConnected = onConnected;client.onDisconnected = onDisconnected;client.onUnsubscribed = onUnsubscribed;client.onSubscribed = onSubscribed;client.onSubscribeFail = onSubscribeFail;client.pongCallback = pong;final connMessage = MqttConnectMessage().authenticateAs(user, pwd).keepAliveFor(60)// 保持连接时间,单位为秒.withWillTopic('willtopic').withWillMessage('Will message').startClean()// 清理会话.withWillQos(MqttQos.atLeastOnce);client.connectionMessage = connMessage;try {await client.connect();} catch (e) {print('Exception: $e');client.disconnect();}//用于监听已订阅主题的消息到达。client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;final String pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);// 解码包含中文字符的字符串final String decodedString = utf8.decode(pt.codeUnits);LogI('Received message: $decodedString from topic: ${c[0].topic}');});return client;}///订阅一个主题_subscribe(String topic) {client?.subscribe(topic, MqttQos.atLeastOnce);}///订阅多个主题topicSubscribe(List<String> topics) async {this.topics.addAll(topics);if (client?.connectionStatus?.state == MqttConnectionState.connected) {topics.forEach((topic) {_subscribe(topic);});} else {//未连接成功 每隔3s重新订阅Future.delayed(const Duration(seconds: 3), () {topicSubscribe(topics);});}}///取消订阅_unsubscribe() {client?.unsubscribe('topic/test');}///断开连接_disconnect() {client?.disconnect();}// 连接成功void onConnected() {print('连接成功');}// 连接断开void onDisconnected() {print('连接断开');}// 订阅主题成功void onSubscribed(String topic) {print('订阅主题成功: $topic');}// 订阅主题失败void onSubscribeFail(String topic) {print('订阅主题失败 $topic');}// 成功取消订阅void onUnsubscribed(String? topic) {print('成功取消订阅: $topic');}// 收到 PING 响应void pong() {print('收到 PING 响应 Ping response client callback invoked');}}