消息的发送与接收

消息的发送与接收

消息的发送与接收不仅仅是在于聊天功能的实现。其实还有很多种情况也算"消息的发送与接收"。而且我们还可以通过多种方法去实现。我们可以基于实际情况来选择。

WebSocket实现

node做后端。找了好多,前端页面总是用到了jQuery,包括底下的java做后端的前端代码等。我们先用最简单的代码来帮助我们吧!

首先,配好package.json。注意依赖要引入ws。运行项目前先要运行一下:npm i

{"name": "chat-server","version": "1.0.0","description": "","main": "client.js","scripts": {"test": "echo \"Error: no test specified\" && exit 1","start": "node server.js"},"dependencies": {"debug": "~2.6.9","ejs": "~2.6.1","express": "~4.16.1","express-session": "^1.17.2","http-errors": "~1.6.3","jsonwebtoken": "^8.5.1","ws": "^8.5.0"},"keywords": [],"author": "","license": "ISC"
}

创建server.js,这是服务端

const  WebSocket = require("ws")
WebSocketServer = WebSocket.WebSocketServer
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', function connection(ws) {ws.on('message', function message(data, isBinary) {wss.clients.forEach(function each(client) {if (client !== ws && client.readyState === WebSocket.OPEN) {client.send(data, { binary: isBinary });}});});ws.send('欢迎加入聊天室');
});

创建client.js,这是客户端

const  WebSocket = require("ws")
var ws = new WebSocket("ws://localhost:8080")
ws.onopen = ()=>{console.log("open")
}
ws.onmessage = (evt)=>{console.log(evt.data)
}

创建test.html,此处用vscode插件live-server打开。

<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><meta http-equiv="X-UA-Compatible" content="ie=edge"><title>Document</title>
</head><body><h1>websockets简单示例</h1><br><div id="message"></div><div><input type="text" id="sendText"><button id="connect" onclick="connect()">建立连接</button><button id="sendData" onclick="sendData()">发送数据</button><button id="closeConnect" onclick="closeConnect()">关闭连接</button></div>
</body>
<script type="text/javascript">let websockets;//创建一个数组对象用于存放当前的连接的状态,以便在页面上实时展示出来当前的状态let statusArr = [{ state: 0, value: '正在连接' },{ state: 1, value: '已建立连接' },{ state: 2, value: '正在关闭连接' },{ state: 3, value: '已关闭连接' },]/***   建立连接**/function connect() {// 1. 创建websockets对象,参数为服务器websockets地址websockets = new WebSocket("ws:127.0.0.1:8080");// 2.监听websocket的状态变化,接收的信息,关闭时的状态//监听连接状态的变化websockets.onopen = (event) => socketChange();//监听接收消息的情况websockets.onmessage = (res) => {document.querySelector("#message").innerHTML += `<p>接收数据: ${res.data}</p>`}//监听关闭时的状态变化websockets.onclose = (event) => socketChange();}/***   socket状态变化**/function socketChange() {let state = websockets.readyState;let val = statusArr.map((item) => {if (item.state == state) {return item.value}});//实时显示状态的变化document.querySelector("#message").innerHTML += `<p>当前的socket连接状态是: ${val}</p>`}/***   发送数据**/function sendData() {//1. 首先获取输入的信息,判断信息是否可以发送let val = document.querySelector("#sendText").value;if (val == "" || val == undefined) {document.querySelector("#message").innerHTML += "<p>发送数据为空,请填写完成后再发送!</p>";return;}websockets.send(val);document.querySelector("#message").innerHTML += `<p>发送数据:${val}</p>`;}/***   关闭连接**/function closeConnect() {websockets.close();}
</script>
</html>

在本文件夹内的控制台输入 node .\server.js node .\client.js 启动服务器端和客户端。

我们打开Document两个窗口,尝试进行消息交流。

在这里插入图片描述

我们发现消息是能正常发送出去并能被正常接收到。

Java使用Socket实现

java做后端。此处使用了Spring-websocket: Spring boot整合websocket实现即时通讯 (gitee.com)的代码。

引入依赖:

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>2.1.6.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId><version>2.1.6.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.1.6.RELEASE</version><exclusions><!-- 去掉springboot默认配置 -->  <exclusion>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-logging</artifactId>  </exclusion>  </exclusions>  </dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.1.6.RELEASE</version><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.1.6.RELEASE</version><exclusions><!-- 去掉springboot默认配置 -->  <exclusion>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-logging</artifactId>  </exclusion>  </exclusions>  </dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.0.1</version></dependency><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper</artifactId><version>4.1.6</version></dependency><dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>3.14</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.20</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.5</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><dependency><groupId>commons-net</groupId><artifactId>commons-net</artifactId><version>3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-log4j2</artifactId><version>2.1.6.RELEASE</version></dependency>

核心方法:

webSocketServer类

package boot.spring.service;import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;import boot.spring.po.Message;@ServerEndpoint("/webSocket/{username}")
@Component
public class WebSocketServer {//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。private static AtomicInteger onlineNum = new AtomicInteger();//concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();//发送消息public void sendMessage(Session session, String message) throws IOException {if(session != null){synchronized (session) {System.out.println("发送数据:" + message);session.getBasicRemote().sendText(message);}}}//给指定用户发送信息public void sendInfo(String userName, String message){Session session = sessionPools.get(userName);try {sendMessage(session, message);}catch (Exception e){e.printStackTrace();}}// 群发消息public void broadcast(String message){for (Session session: sessionPools.values()) {try {sendMessage(session, message);} catch(Exception e){e.printStackTrace();continue;}}}//建立连接成功调用@OnOpenpublic void onOpen(Session session, @PathParam(value = "username") String userName){sessionPools.put(userName, session);addOnlineCount();System.out.println(userName + "加入webSocket!当前人数为" + onlineNum);// 广播上线消息Message msg = new Message();msg.setDate(new Date());msg.setTo("0");msg.setText(userName);broadcast(JSON.toJSONString(msg,true));}//关闭连接时调用@OnClosepublic void onClose(@PathParam(value = "username") String userName){sessionPools.remove(userName);subOnlineCount();System.out.println(userName + "断开webSocket连接!当前人数为" + onlineNum);// 广播下线消息Message msg = new Message();msg.setDate(new Date());msg.setTo("-2");msg.setText(userName);broadcast(JSON.toJSONString(msg,true));}//收到客户端信息后,根据接收人的username把消息推下去或者群发// to=-1群发消息@OnMessagepublic void onMessage(String message) throws IOException{System.out.println("server get" + message);Message msg=JSON.parseObject(message, Message.class);msg.setDate(new Date());if (msg.getTo().equals("-1")) {broadcast(JSON.toJSONString(msg,true));} else {sendInfo(msg.getTo(), JSON.toJSONString(msg,true));}}//错误时调用@OnErrorpublic void onError(Session session, Throwable throwable){System.out.println("发生错误");throwable.printStackTrace();}public static void addOnlineCount(){onlineNum.incrementAndGet();}public static void subOnlineCount() {onlineNum.decrementAndGet();}public static AtomicInteger getOnlineNumber() {return onlineNum;}public static ConcurrentHashMap<String, Session> getSessionPools() {return sessionPools;}
}

controller层

@Controller
public class ChatController {@AutowiredLoginService loginservice;@RequestMapping("/onlineusers")@ResponseBodypublic Set<String> onlineusers(@RequestParam("currentuser") String currentuser) {ConcurrentHashMap<String, Session> map = WebSocketServer.getSessionPools();Set<String> set = map.keySet();Iterator<String> it = set.iterator();Set<String> nameset = new HashSet<String>();while (it.hasNext()) {String entry = it.next();if (!entry.equals(currentuser))nameset.add(entry);}return nameset;}@RequestMapping("getuid")@ResponseBodypublic User getuid(@RequestParam("username") String username) {Long a = loginservice.getUidbyname(username);User u = new User();u.setUid(a);return u;}
}

启动项目,访问:http://localhost:8080/login。此前需要先运行项目里的stuff.sql文件,并配好其数据库密码等,点击登录即可。

在这里插入图片描述

这个项目包含了:即时通信,查看当前在线的其他用户,用户的上线提醒,群发消息等功能。

消息中间件实现

消息中间件市面上常见的四种:ActiveMQ 、RabbitMQ 、 RocketMQ、Kafka等。

几种常见MQ的对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

综合看来,我们选择比较流行的rabbitmq来帮助我们。

部署

在安装rabbitmq之前需要先安装erlang,并配置好其环境变量。(跟java环境变量配置一样)

安装rabbitmq部署并启动服务:安装rabbitmq(解压版教程)_rabbitmq-server 压缩包版本如何使用-CSDN博客

启动有可能会报毒,我们选择允许。

访问:RabbitMQ Management,显示出rabbitmq的管理界面就算部署成功!

默认账号密码都是guest。

RabbitMQ的简要概述

RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

RabbitMQ使用流程总结

基本消息队列的消息发送流程:

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 定义consumer的消费行为handleDelivery()

  5. 利用channel将消费者与队列绑定

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息
  <dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

创建2个模块,publisher和consumer,完善一下项目的启动类,yaml配置文件等。每个模块写一个测试类:

PublisherTest:

public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("localhost");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

ConsumerTest:

public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("localhost");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

先启动publisher:

在这里插入图片描述

再看看rabbitmq的控制台:
在这里插入图片描述

再启动consumer
在这里插入图片描述

此时再看看rabbitmq控制台:

在这里插入图片描述

消息已经被成功消费了!

rabbitmq实现即时通信

具体可以参考:RabbitMQ实现即时通讯_rabbitmq 聊天-CSDN博客

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。

接下来就是启用RabbitMQ的MQTT插件了,默认是不启用的,使用命令开启即可;

需要进到目录\rabbitmq_server-3.9.13\sbin里,执行如下命令:

.\rabbitmq-plugins.bat enable rabbitmq_mqtt

在这里插入图片描述

MQTTX

我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTX这个客户端工具。下载地址:MQTTX: Your All-in-one MQTT Client Toolbox

点击新建连接按钮或者左边的加号来创建一个MQTT客户端;

接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可。

再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;

在这里插入图片描述

发布者向主题中发布消息,订阅者可以实时接收到。

在这里插入图片描述

前端直接实现即时通讯

既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了。

WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<div><label>目标Topic:<input id="targetTopicInput" type="text"></label><br><label>发送消息:<input id="messageInput" type="text"></label><br><button onclick="sendMessage()">发送</button><button onclick="clearMessage()">清空</button><div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>//RabbitMQ的web-mqtt连接地址const url = 'ws://localhost:15675/ws';//获取订阅的topicconst topic = getQueryString("topic");//连接到消息队列let client = mqtt.connect(url);client.on('connect', function () {//连接成功后订阅topicclient.subscribe(topic, function (err) {if (!err) {showMessage("订阅topic:" + topic + "成功!");}});});//获取订阅topic中的消息client.on('message', function (topic, message) {showMessage("收到消息:" + message.toString());});//发送消息function sendMessage() {let targetTopic = document.getElementById("targetTopicInput").value;let message = document.getElementById("messageInput").value;//向目标topic中发送消息client.publish(targetTopic, message);showMessage("发送消息给" + targetTopic + "的消息:" + message);}//从URL中获取参数function getQueryString(name) {let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");let r = window.location.search.substr(1).match(reg);if (r != null) {return decodeURIComponent(r[2]);}return null;}//在消息列表中展示消息function showMessage(message) {let messageDiv = document.getElementById("messageDiv");let messageEle = document.createElement("div");messageEle.innerText = message;messageDiv.appendChild(messageEle);}//清空消息列表function clearMessage() {let messageDiv = document.getElementById("messageDiv");messageDiv.innerHTML = "";}
</script>
</html>

之后在界面输入对应的topic地址,发送消息,发现是可以实现的!

在SpringBoot中使用

没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了

此处项目源码地址:https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt

首先,项目需要引入mqtt依赖

<!--Spring集成MQTT-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>

在application.yml添加配置:

rabbitmq:mqtt:url: tcp://localhost:1883username: guestpassword: guestdefaultTopic: testTopic

编写一个Java配置类从配置文件中读取配置便于使用;

/*** MQTT相关配置*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {/*** RabbitMQ连接用户名*/private String username;/*** RabbitMQ连接密码*/private String password;/*** RabbitMQ的MQTT默认topic*/private String defaultTopic;/*** RabbitMQ的MQTT连接地址*/private String url;
}

添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;

/*** MQTT消息订阅者相关配置*/
@Slf4j
@Configuration
public class MqttInboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",mqttConfig.getDefaultTopic());adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());//设置消息质量:0->至多一次;1->至少一次;2->只有一次adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {//处理订阅消息log.info("handleMessage : {}",message.getPayload());}};}
}

注意:messageHandler导包路径:import org.springframework.messaging.MessageHandler;

添加MQTT消息发布者相关配置;

/*** MQTT消息发布者相关配置*/
@Configuration
public class MqttOutboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] { mqttConfig.getUrl()});options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publisherClient", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}

添加MQTT网关,用于向主题中发送消息;

/*** MQTT网关,通过接口将数据传递到集成流*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {/*** 发送消息到默认topic*/void sendToMqtt(String payload);/*** 发送消息到指定topic*/void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);/*** 发送消息到指定topic并设置QOS*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;

/*** MQTT测试接口*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttGateway mqttGateway;@PostMapping("/sendToDefaultTopic")public CommonResult sendToDefaultTopic(String payload) {mqttGateway.sendToMqtt(payload);return CommonResult.success(null);}@PostMapping("/sendToTopic")public CommonResult sendToTopic(String payload, String topic) {mqttGateway.sendToMqtt(payload, topic);return CommonResult.success(null);}
}

别忘声明一个返回类

package com.itcast.mq.model;import lombok.Data;import java.io.Serializable;@Data
public class CommonResult<T> {/*返回体*/private  Integer code;private String msg;private T data;/*成功,且返回体有数据*/public static CommonResult success(Object object) {CommonResult r = new CommonResult();r.setCode(200);r.setMsg("成功");r.setData(object);return r;}//成功,但返回体没数据public static CommonResult success(){return success(null);}//失败返回信息public static CommonResult Err(Integer code,String msg){CommonResult r = new CommonResult();r.setCode(code);r.setMsg(msg);return r;}}

配置一下启动类

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

启动该项目,打开postman输入链接:http://localhost:8080/mqtt//sendToTopic?payload=&topic=。填好输入内容和目标topic的id即可!
在这里插入图片描述

回到mqtt客户端看一下

消息发送成功!

netty实现

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端

超详细Netty入门,看这篇就够了! - 知乎 (zhihu.com)

具体实现可以参考:https://blog.csdn.net/weixin_44814270/article/details/132947704

依赖:

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.1.6.RELEASE</version><exclusions><!-- 去掉springboot默认配置 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.1.6.RELEASE</version><exclusions><!-- 去掉springboot默认配置 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--我这里使用的是jfinal-enjoy模板引擎--><dependency><groupId>com.jfinal</groupId><artifactId>enjoy</artifactId><version>5.1.2</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.65.Final</version> <!-- 使用最新版本 --></dependency></dependencies>

配置NettyChatServer:

public class NettyChatServer {private final int port;private final EventExecutorGroup eventExecutorGroup;private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);public NettyChatServer(int port) {this.port = port;this.eventExecutorGroup = new DefaultEventExecutorGroup(4); // 用于在handler中处理耗时任务}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 字符串编解码器,用于将消息编码成字符串和解码成字符串pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));pipeline.addLast(eventExecutorGroup, new ChatServerHandler(channelGroup));// 添加自定义的聊天处理器//  pipeline.addLast(eventExecutorGroup, new ChatServerHandler(channelGroup));}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("Chat Server started on port " + port);channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public void stop() {// 停止服务器}public static void main(String[] args) {int port = 8888;NettyChatServer chatServer = new NettyChatServer(port);try {chatServer.start();} catch (Exception e) {e.printStackTrace();}}

配置ChatServerHandler

public class ChatServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private final ChannelGroup channelGroup;public ChatServerHandler(ChannelGroup channelGroup) {this.channelGroup = channelGroup;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {// 处理WebSocket消息if (frame instanceof TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;String message = textFrame.text();// 在服务器控制台上输出消息System.out.println("Received message: " + message);// 将消息广播给所有连接的客户端channelGroup.writeAndFlush(new TextWebSocketFrame(message));}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {// 新客户端连接时添加到ChannelGroupchannelGroup.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {// 客户端断开连接时从ChannelGroup中移除channelGroup.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理cause.printStackTrace();ctx.close();}

因为项目用到了,所以我们需要jfinal-enjoy模板引擎文件:

package com.zd.config;import com.jfinal.template.Engine;
import com.jfinal.template.ext.spring.JFinalViewResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class EnjoyConfig {@Bean(name = "jfinalViewResolver")public JFinalViewResolver getJFinalViewResolver() {// 创建用于整合 spring boot 的 ViewResolver 扩展对象JFinalViewResolver jfr = new JFinalViewResolver();// 对 spring boot 进行配置jfr.setSuffix(".html");jfr.setContentType("text/html;charset=UTF-8");jfr.setOrder(0);// 设置在模板中可通过 #(session.value) 访问 session 中的数据jfr.setSessionInView(true);// 获取 engine 对象,对 enjoy 模板引擎进行配置,配置方式与前面章节完全一样Engine engine = JFinalViewResolver.engine;// 热加载配置能对后续配置产生影响,需要放在最前面engine.setDevMode(true);// 使用 ClassPathSourceFactory 从 class path 与 jar 包中加载模板文件engine.setToClassPathSourceFactory();// 在使用 ClassPathSourceFactory 时要使用 setBaseTemplatePath// 设置静态资源路径在 /static 下engine.setBaseTemplatePath("/static/");return jfr;}
}

配置一下controller层:

@Controller
public class ChatController {@RequestMapping("/chat")public String main() {return "main";}
}

配置html、css、js

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>简单聊天室</title><!-- 引入Bootstrap CSS文件 --><link href="./bootstarp/css/bootstrap.min.css" rel="stylesheet"><link href="main.css" rel="stylesheet">
</head>
<body>
<div class="container"><div class="row"><div class="col-md-8 offset-md-2"><div id="chat-container"><div id="chat-header"><h2>简单聊天室</h2></div><div id="chat-box"><!-- 示例聊天消息 --><!--  <div class="message"><div class="avatar">A</div><div class="message-content"><div class="sender-name">User 1</div><div class="message-text">Hello, how are you?</div></div></div>--><!-- 示例聊天消息结束 --></div><div id="message-buttons"><input type="text" id="message-input" placeholder="输入消息..."><input type="file" id="file-input"><button class="btn btn-primary" onclick="sendMessage()">发送文本</button><button class="btn btn-primary" onclick="sendFile()">发送文件</button><button class="btn btn-danger" id="clear-button" onclick="clearChat()">清空聊天</button></div></div></div></div>
</div><!-- 引入jQuery -->
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<!-- 引入Bootstrap JS文件 -->
<script src="./bootstarp/js/bootstrap.min.js"></script>
<script src="main.js"></script>
</body>
</html>
body {background-color: #f2f2f2;font-family: Arial, Helvetica, sans-serif;margin: 0;padding: 0;
}#chat-container {max-width: 600px;margin: 20px auto;background-color: #fff;border-radius: 5px;box-shadow: 0px 0px 10px rgba(0, 0, 0, 0.1);overflow: hidden;
}#chat-header {background-color: #007BFF;color: #fff;padding: 10px;text-align: center;border-top-left-radius: 5px;border-top-right-radius: 5px;
}#chat-box {max-height: 100vh;width: 100%;overflow-y: scroll;height: 600px;
}.message {display: flex;margin: 10px;padding: 10px;border-bottom: 1px solid #ccc;
}.avatar {width: 50px;height: 50px;background-color: #007BFF;color: #fff;border-radius: 50%;text-align: center;line-height: 50px;margin-right: 10px;
}.message-content {flex-grow: 3;
}.sender-name {font-weight: bold;margin-bottom: 5px;
}.message-text {word-wrap: break-word;
}#message-input, #file-input {width: 100%;padding: 10px;border: 1px solid #ccc;
}#message-buttons {padding: 10px;text-align: center;
}button {padding: 10px 20px;background-color: #007BFF;color: #fff;border: none;cursor: pointer;margin-right: 10px;
}button:hover {background-color: #0056b3;
}#clear-button {background-color: #dc3545;
}@media (max-width: 768px) {#chat-container {margin-top: 10px;}#chat-box {max-height: 200px;}
}
// WebSocket连接
const socket = new WebSocket('ws://localhost:8888/websocket'); // 请将 your_netty_server_address 替换为实际的Netty WebSocket服务器地址socket.addEventListener('open', (event) => {console.log('WebSocket连接已建立');
});socket.addEventListener('message', (event) => {// 解析消息console.log(event)const data = JSON.parse(event.data);const chatBox = document.getElementById('chat-box');if (data.type === 'text') {// 接收到文本消息const messageDiv = document.createElement('div');messageDiv.classList.add('message');const avatarDiv = document.createElement('div');avatarDiv.classList.add('avatar');avatarDiv.textContent = data.sender.charAt(0); // 使用发送者的首字母作为头像内容const messageContentDiv = document.createElement('div');messageContentDiv.classList.add('message-content');const senderNameDiv = document.createElement('div');senderNameDiv.classList.add('sender-name');senderNameDiv.textContent = data.sender;const messageTextDiv = document.createElement('div');messageTextDiv.classList.add('message-text');messageTextDiv.textContent = data.message;messageContentDiv.appendChild(senderNameDiv);messageContentDiv.appendChild(messageTextDiv);messageDiv.appendChild(avatarDiv);messageDiv.appendChild(messageContentDiv);chatBox.appendChild(messageDiv);} else if (data.type === 'file') {// 接收到文件消息const fileURL = URL.createObjectURL(data.file);const messageDiv = document.createElement('div');messageDiv.classList.add('message');const avatarDiv = document.createElement('div');avatarDiv.classList.add('avatar');avatarDiv.textContent = data.sender.charAt(0); // 使用发送者的首字母作为头像内容const messageContentDiv = document.createElement('div');messageContentDiv.classList.add('message-content');const senderNameDiv = document.createElement('div');senderNameDiv.classList.add('sender-name');senderNameDiv.textContent = data.sender;const fileLink = document.createElement('a');fileLink.href = fileURL;fileLink.textContent = '下载文件';fileLink.download = data.fileName;messageContentDiv.appendChild(senderNameDiv);messageContentDiv.appendChild(fileLink);messageDiv.appendChild(avatarDiv);messageDiv.appendChild(messageContentDiv);chatBox.appendChild(messageDiv);}// 滚动到最新消息chatBox.scrollTop = chatBox.scrollHeight;
});function sendMessage() {const messageInput = document.getElementById('message-input');const message = messageInput.value.trim();if (message !== '') {// 发送文本消息到服务器const data = {sender: 'YSK',type: 'text',message: message};socket.send(JSON.stringify(data));// 清空输入框messageInput.value = '';}
}function sendFile() {const fileInput = document.getElementById('file-input');const file = fileInput.files[0];if (file) {// 发送文件到服务器const reader = new FileReader();reader.onload = function (event) {const data = {type: 'file',fileName: file.name,file: event.target.result};socket.send(JSON.stringify(data));};reader.readAsArrayBuffer(file);// 清空文件选择框fileInput.value = '';}
}function clearChat() {const chatBox = document.getElementById('chat-box');chatBox.innerHTML = '';
}// 监听Enter键,发送文本消息
const messageInput = document.getElementById('message-input');
messageInput.addEventListener('keyup', function (event) {if (event.key === 'Enter') {sendMessage();}
});

先启动nettyServer 再启动Application,访问:简单聊天室

我们似乎能基本实现聊天。(发送文件需要服务器存储未实现)。但是还是缺了太多东西。这里netty我们还是接触了一点点罢了。

netty更多内容可以参考官方文档Netty: Home或者黑马的教程:https://www.bilibili.com/video/BV1py4y1E7oA

第三方平台实现

环信 - 中国IM即时通讯云服务开创者! (easemob.com)

每个免费用户最多可以注册100个能通讯的用户。

基于项目的实现

简便且功能要求不太多的:websocket

搭建难度高,但是稳定且专业的:netty

哪都沾一点的:rabbitmq

直接上手但是有限制需要花钱的:第三方

参考文档

Spring-websocket: Spring boot整合websocket实现即时通讯 (gitee.com)

RabbitMQ实现即时通讯_rabbitmq 聊天-CSDN博客

https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/623540.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

lvgl简介

LVGL&#xff08;Light and Versatile Graphics Library&#xff09;是一个开源的图形用户界面库&#xff0c;旨在提供轻量级、可移植、灵活和易于使用的图形用户界面解决方案。 它适用于嵌入式系统&#xff0c;可以在不同的操作系统、微控制器和图形加速器上运行。LVGL的核心…

1.go安装及相关配置

目录 概述下载基本命令Go build环境设置 结束 概述 下载 官网速递 选择 1.20.x 一个是因为是次新版本&#xff0c;另一个&#xff0c;mac 系统是 10.13.6 &#xff0c;1.20.x 是最后一个支持此版本的。 环境变量 GOROOT go的安装目录(将go安装到哪里 which go、cat /etc/p…

高并发IO底层原理

1 概述 IO底层原理是隐藏在Java编程知识之下的基础知识&#xff0c;是开发人员必须掌握的基本原理。本文从操作系统的底层原理入手&#xff0c;通过图文的方式为大家深入剖析高并发IO的底层原理&#xff0c;并介绍如何通过设置来让操作系统支持高并发。 2 IO读写的基本原理 为…

文件模块常用api

文件模块常用api 文件夹常用操作 文件夹操作 fs.mkdir fs.rmdir 需要是空目录 题目&#xff1a;递归删除目录* 串行/并行删除文件*

【电商API】DIY网络爬虫收集电商数据

DIY网络爬虫收集电商数据 网络爬虫是最常见和使用最广泛的数据收集方法。DIY网络爬虫确实需要一些编程知识&#xff0c;但整个过程比一开始看起来要简单得多。 当然&#xff0c;爬虫的有效性取决于许多因素&#xff0c;例如目标的难度、网站方的反爬虫措施等。如果将网络抓取用…

机器学习根据金标准标记数据-九五小庞

根据金标准标记数据是一种在机器学习和数据科学中常见的操作&#xff0c;主要用于评估分类模型的性能。其基本步骤如下&#xff1a; 收集数据&#xff1a;首先需要收集相关领域的原始数据&#xff0c;这些数据通常来自不同的来源和渠道。数据清洗和预处理&#xff1a;在这一步…

soft212期末

文章目录 安卓填空题选择题 C# 安卓 Dalvik中得到Dx工具会把部分class文件转换成dex文件。 如果希望在XML布局文件中调用颜色资源&#xff0c;可以使用color调用 Android程序入口的Activity是在AndroidManifest.xml文件中注册的 Android中查看应用程序日志的工具是LogCat Dal…

JavaScript面向对象之实践项目

1、cat项目 &#xff08;1&#xff09;需要修改的原代码 &#xff08;2&#xff09;修改要求 使用括号表示法将name属性的值存储在变量catName中。使用点表示法运行greeting()方法。将color属性值更新为白。重写greeting() 方法&#xff0c;使它的问候语为"孟买猫碳头对…

良心推荐!几款收藏的神级IDEA插件分享

本文已收录至Github&#xff0c;推荐阅读 &#x1f449; Java随想录 微信公众号&#xff1a;Java随想录 文章目录 CodeGlanceGsonFormatPOJO to JsonRainbow BracketsTranslationLombokMaven HelperAlibaba Java Code GuidelinesGenerateAllSetterMybatisXChinese (Simplified…

Tomcat10.X部署老版本axis2 webservice项目不生效

目录 一、使用场景 二、问题描述 三、原因排查 四、解决方案 一、使用场景 原来项目是OpenJDK8tomcat9构建&#xff0c;现在需要升级到OpenJDK17tomcat10的组合。原来的webservice项目打包成aar格式&#xff0c;通过axis2部署在tomcat上。 二、问题描述 在配置好jdk和to…

【软件测试学习笔记1】测试基础

1.软件测试的定义 软件的定义&#xff1a;控制计算机硬件工作的工具 软件的基本组成&#xff1a;页面客户端&#xff0c;代码服务器&#xff0c;数据服务器 软件产生的过程&#xff1a;需求产生&#xff08;产品经理&#xff09;&#xff0c;需求文档&#xff0c;设计效果图…

NEAU_Python程序设计结课作业

1.身份证号合法性判别 【问题描述】我国身份证号码由数字与字母混合组成。早期身份证由15位数字构成。后来考虑到千年虫问题(&#xff08;15位的身份证号码只能为150c年1月1日到9年12月31日出生的人确号)&#xff0c;所以又增加了18位身份证号码编号规则。最后—位(第18位)校验…

用二维码介绍产品详情,扫码查看图文并茂的宣传册

传统的产品宣传方式&#xff0c;往往以产品手册、宣传单等纸质物料为主&#xff0c;更新成本高昂&#xff0c;一旦修改内容&#xff0c;就必须重新印刷&#xff0c;而且不易携带和保存&#xff0c;影响宣传效果和客户体验。 为了避免上述问题&#xff0c;可以在草料上搭建产品…

【树莓派】网线远程连接电脑和树莓派,实现SSH连接

目录 1、硬件连接&#xff1b; 2、电脑端&#xff1a; 3、查找树莓派的IP地址 4、开启树莓派的SSH接口 5、putty 6、命令行 参考文章 通过网线连接笔记本与树莓派 开启SSH和VNC功能 无显示器安装树莓派 实现&#xff1a;打开putty输入树莓派地址使用ssh方式登陆&…

java小游戏——动漫美女拼图

1&#xff1a;继承 1.1 继承概述 首先&#xff0c;我们来说一下&#xff0c;什么是继承&#xff1a; 继承是面向对象三大特征之一(封装&#xff0c;继承和多态) 可以使得子类具有父类的属性和方法&#xff0c;还可以在子类中重新定义&#xff0c;追加属性和方法 也就是说&…

【办公技巧】ppt修改全部字体怎么改?

制作完PPT之后&#xff0c;想要更换ppt中的字体&#xff0c;有没有什么快捷的方法呢&#xff1f;今天分享两个方法&#xff0c;一键修改ppt文件字体。 方法一&#xff1a; 找到功能栏中的编辑选项卡&#xff0c;点击替换 – 替换字体&#xff0c;在里面选择我们想要替换的字体…

【DolphinScheduler】datax读取hive分区表时,空分区、分区无数据任务报错问题解决

问题背景&#xff1a; 最近在使用海豚调度DolphinScheduler的Datax组件时&#xff0c;遇到这么一个问题&#xff1a;之前给客户使用海豚做的离线数仓的分层搭建&#xff0c;一直都运行好好的&#xff0c;过了个元旦&#xff0c;这几天突然在数仓做任务时报错&#xff0c;具体报…

JUC之锁

乐观锁和悲观锁 悲观锁 当一个线程在操作资源的时候&#xff0c;会悲观的任务有其他的线程会来抢占该资源&#xff0c;因此会在操作资源前进行加锁&#xff0c;避免其他线程抢占。 Synchronized关键字和Lock实现类就是悲观锁。 显示的锁定资源后再对资源进行操作。 使用场景&…

生日视频模板-试试这样制作

视频制作已经成为表达情感、记录生活的重要方式。尤其在生日这样的特殊日子&#xff0c;一份个性化的视频祝福不仅能让人感到温馨&#xff0c;还能成为长久珍藏的回忆。那么&#xff0c;如何快速制作出精美的生日模版视频呢&#xff1f;下面就给大家介绍几种可以制作生日模版的…

通信入门系列——离散卷积、连续卷积、卷积性质

本节目录 一、线性系统的激励响应 1、离散δ信号 2、离散卷积 3、连续δ信号 4、连续卷积 二、卷积性质 1、交换律 2、分配律 3、结合律 4、与冲激函数卷积本节内容 一、线性系统的激励响应 输入信号又称为激励&#xff0c;输出信号又称为响应。一个信号输入给一个线性系统的时…