实时通讯技术实现

实时通讯技术实现

前言

在CS架构中,经常会有实时通信的需求。客户端和服务端建立连接,服务端实时推送数据给客户端。本文介绍几种常见的实现方式,希望能给读者们一点点参考。

实时通讯的主要实现技术

  • 长轮询(Long Polling)
  • WebSocket
  • 服务器发送事件(Server-Sent Events, SSE)
  • XMPP (Extensible Messaging and Presence Protocol)
  • MQTT (Message Queuing Telemetry Transport)

长轮询

长轮询(Long Polling): 一种网络通信机制,用于实现客户端和服务器之间的实时数据传输。

Http长轮询机制:

原理图
原理图

长轮询工作原理:

  1. client端请求server端,并约定好超时时间;
  2. server端收到请求后,判断数据是否有变化:
    • 有变化:立即返回数据;
    • 没变化:则阻塞http请求,并且将长轮询请求任务放入队列中,然后开启任务调度,调度任务在长连接维持时间到期后,会将长轮询请求移除队列,并返回对应数据。
  3. 如果在挂起的这段时间内,数据有变化,服务器会移除队列中的长轮询请求,并响应数据给客户端。

长轮询优缺点:

优点:
  • 兼容性好
  • 实现简单
  • 即时性
缺点:
  • 服务器hold住连接,占用资源
  • 会有延迟,服务器响应后,客户端要重新发起连接(这段时间内有新消息不能即时触达)

Java 示例代码


@Controller
public class LongPollingController {

    private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();

    @GetMapping("/longpolling")
    @ResponseBody
    public Object longPolling() {
        DeferredResult<String> deferredResult = new DeferredResult<>(30000L, "time out");
        deferredResults.put("key", deferredResult); // 假设每个客户端有一个唯一的key
        return deferredResult;
    }

    @GetMapping("/push")
    public void push() {
        // 模拟异步数据获取
        deferredResults.get("key").setResult("data update"); // 当数据准备好时,触发长轮询
    }
}

DeferredResult 是 Spring MVC 提供的一种用于处理异步请求的机制,它允许在处理请求时延迟产生结果,并且允许在处理请求的不同线程中生成结果。DeferredResult 可以用于异步处理 HTTP 请求,并在处理完成后返回结果给客户端。

WebSocket

WebSocket 是 HTML5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。

WebSocket原理图:

alt

WebSocket特点:

  1. 单一的TCP连接,采用全双工模式通信;
  2. 对代理、防火墙和路由器透明;
  3. 无头部信息、Cookie和身份验证;
  4. 无安全开销;
  5. 通过“ping/pong”帧保持链路激活;
  6. 服务器可以主动传递消息给客户端,不再需要客户端轮询。

示例代码(netty实现websocket通信)

WebSocket服务端启动类:


@Component
@Slf4j
public class WebSocketServer {

    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:58080}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * 启动
     *
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup负责客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup, workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接到达时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 流水线管理通道中的处理程序(Handler),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                //将收到的 HTTP 请求或响应的多个部分合并成一个完整的对象
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                说明:
                1、对应webSocket,它的数据是以帧(frame)的形式传递
                2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
                3、核心功能是将http协议升级为ws协议,保持长连接
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
                ch.pipeline().addLast(new HeartBeatHandler());
                // 自定义的handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);

            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 释放资源
     *
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }

    @PostConstruct
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

业务逻辑处理器:


@Component
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 接收到 WebSocket 文本消息
        System.out.println("Received message: " + msg.text());
        // 响应 WebSocket 文本消息
        ctx.writeAndFlush(new TextWebSocketFrame("Received your message: " + msg.text()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 异常处理
        cause.printStackTrace();
        ctx.close();
    }
}

SSE

SSE(Server-Sent Events,服务器发送事件)是一种用于实现服务器向客户端单向推送数据的技术。它允许服务器端在任何时候发送数据到客户端,而客户端不需要发起请求。SSE 基于 HTTP 协议,使用简单的文本格式进行通信,通常被用于实时更新网页内容、实时通知等场景。

SSE 的工作原理如下:

  1. 客户端向服务器发送一个 HTTP 请求,请求的头部包含 Accept: text/event-stream 表示接受 SSE 格式的响应。
  2. 服务器接收到请求后,保持连接打开,并在连接上周期性地发送消息给客户端。每个消息都以 data: 开头,并以两个换行符 \n\n 结束。
  3. 客户端接收到消息后,将其通过事件监听器处理。
SSE原理图:
alt

代码示例


@Controller
public class SSEController {

    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> sse() {
        return Flux.interval(Duration.ofSeconds(10))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("message")
                        .data("Hello SSE - " + LocalTime.now())
                        .build());
    }
}

Postman 响应示例:

alt

长轮询、WebSocket、SSE 对比

1. 服务器资源消耗:
  • 长轮询:服务器需要为每个客户端请求保持一个开放的连接,直到有数据发送。这导致服务器资源(如内存和连接槽)的消耗,特别是在高并发场景下。
  • WebSocket:建立后,WebSocket 提供了一个持久的、全双工的连接通道。虽然它也占用服务器资源,但由于其连接是持久的,所以不需要频繁地创建和销毁连接,相对于长轮询,这可以减少资源消耗和延迟。
  • SSE:SSE 也保持开放的连接,但只支持单向通信(服务器到客户端)。与长轮询相比,SSE 通过减少连接的建立和销毁次数来优化资源使用,但对于每个客户端,它仍然占用一个连接。
2. 网络延迟和效率:
  • 长轮询:每次请求可能在服务器有数据可发送之前保持打开状态,这可能导致网络延迟。
  • WebSocket:一旦建立,消息可以几乎无延迟地在客户端和服务器之间传输,提高了效率和实时性。
  • SSE:由于连接持续开放,SSE 可以实现低延迟的服务器到客户端消息传输,但不支持客户端到服务器的实时通信。
3. 实现复杂性:
  • 长轮询:相对简单,不需要特殊的协议支持,但服务器端需要逻辑来管理多个持续的请求。
  • WebSocket:需要在客户端和服务器端实现WebSocket协议,比长轮询实现复杂,连接的建立、错误的处理和断开连接时的重连等机制都需要考虑。
  • SSE:客户端实现相对简单,主要的复杂性在于服务器端,需要支持HTTP/1.1的持久连接。

XMPP

XMPP (Extensible Messaging and Presence Protocol) 是一个支持消息传递和状态显示的开放即时通讯协议。它实现了客户端与服务器之间的双向通信,并可以通过扩展以适应多样的即时通讯服务需求。基于 XML (Extensible Markup Language) 和 TCP/IP 协议构建,XMPP 特点包括灵活性、可扩展性和分布式架构。

XMPP设计的网络结构中定义了3类通信实体:

  • 客户端
  • 服务器
  • 网关

XMPP中基本的通信基于传统的CS模式,即客户端通过TCP/IP连接到服务器,然后通过传输XML流进行通信。

XMPP的系统原理图:

网图
网图

MQTT

MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息协议,专为低带宽和不可靠网络环境设计,广泛应用于物联网(IoT)、移动应用等场景。基于发布/订阅模型,它允许设备发布消息到主题,同时允许其他设备订阅这些主题以接收消息。MQTT运行于TCP/IP协议之上,提供了一种简单有效的方式来进行设备间的通信。

MQTT原理图:

alt

MQTT 和 XMPP 都需要额外的服务器来进行消息通信,这些服务器通常被称为 MQTT 代理(broker)和 XMPP 服务器。

总结

MQTT 和 XMPP 需要中间服务器(分别是 MQTT 代理和 XMPP 服务器)来处理消息的路由、传递和存储,增加了部署的复杂性,并需要确保中间件服务的高可用性。

相比之下,SSE(Server-Sent Events)和 WebSocket 提供了更直接的通信方式,允许服务器和客户端之间建立持久的连接。这两种技术直接基于现有的 HTTP/HTTPS 协议,可以利用现有的 Web 服务器架构进行部署,从而减少了额外的中间件需求。

在复杂性方面,MQTT 和 XMPP 要求开发者具备对相应协议的深入了解,相较于 SSE 和 WebSocket 的简单 API 来说,实现起来会相对复杂,但这些协议也提供了更丰富的灵活性。

总的来说,选择合适的技术实现取决于业务需求和现有架构,各种技术都有其适用的场景。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/770967.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

牛客周赛 Round 38(A,B,C,D,E,F,G)

比赛链接 官方讲解&#xff08;不分P不分段直接两小时怼上来是坏文明 &#xff09; 这场的题很棒&#xff0c;思维有难度&#xff0c;考察的知识点广泛&#xff0c;有深度&#xff0c;很透彻。感觉学到了很多。建议补题。 A 小红的正整数自增 思路&#xff1a; 签到。 可以…

6.5 Batch Normalization

在训练神经网络时&#xff0c;往往需要标准化&#xff08;normalization&#xff09;输入数据&#xff0c;使得网络的训练更加快速和有效。 然而SGD&#xff08;随机梯度下降&#xff09;等学习算法会在训练中不断改变网络的参数&#xff0c;隐藏层的激活值的分布会因此发生变…

VTK对属性参数的设置,以及用vtkFieldData存储属性数据的方法

数据集的属性&#xff08;属性数据&#xff09;是与数据集结构相关联的。而数据集又是建立在点和单元的基础上&#xff0c;所以数据属性很自然地是与点和单元相关联。即每个点或每个单元都有与其对应的数据属性。 数据属性的值称为属性数据。属性数据一般设置为一些有实际意义的…

学生宿舍智能控电柜安装调试技术

学生宿舍智能控电柜安装调试石家庄光大远通电器有限公司宿舍控电限电管理系统是一种用于管理学生宿舍用电的智能系统&#xff0c;主要功能包括: 1.实时监控和控制:该系统能够实时监测和记录宿舍的用电情况&#xff0c;包括电器使用情况、电量消耗等。管理人员可以通过电脑或手机…

探索酷开科技独特魅力|加入酷开会员让观影之旅更畅快|酷开系统

你是否渴望一场震撼心灵的观影之旅&#xff1f;不妨走进酷开系统的世界&#xff0c;徜徉在剧集的海洋&#xff0c;满足无限观影的渴望&#xff01;还在担心剧荒吗&#xff1f;还在为无聊的周末发愁吗酷开系统为你赶走无聊&#xff0c;它拥有海量的影视资源&#xff0c;4大片库、…

nginx详解(持续更新)

nginx定义 nginx安装 nginx目录 程序相关命令 服务相关命令 虚拟主机&#xff08;server&#xff09; 路由匹配&#xff08;location&#xff09; 代理&#xff08;proxy_pass&#xff09; 正向代理 反向代理 负载均衡&#xff08;upstream&#xff09; 负载均衡策略 动静分…

C++ 简单模拟实现 STL 中的 list 与 queue

目录 一&#xff0c;list 1&#xff0c; list 的节点与迭代器 2&#xff0c;list 的数据结构、一些简单的功能、构造函数 3&#xff0c;list 的对元素操作 4&#xff0c;C 11 的一些功能 5&#xff0c;完整代码&#xff1a; 二&#xff0c;queue 一&#xff0c;list std…

开源 OLAP 及其在不同场景下的需求

目录 一、开源 OLAP 综述 二、OLAP场景思考 2.1 面向客户的报表 2.2 面向经营的报表 2.3 末端运营分析 2.4 用户画像 2.5 订单分析 2.6 OLAP技术需求思考 三、开源数据湖/流式数仓解决方案 3.1 离线数仓体系——Lambda架构 3.2 实时数据湖解决方案 3.3 实时分析解决…

Java毕业设计-基于springboot开发的校园台球厅人员与设备管理系统-毕业论文+答辩PPT(附源代码+演示视频)

文章目录 前言一、毕设成果演示&#xff08;源代码在文末&#xff09;二、毕设摘要展示1、开发说明2、需求分析3、系统功能结构 三、系统实现展示1、系统功能模块2、管理员功能模块3、用户功能模块 四、毕设内容和源代码获取总结 Java毕业设计-基于springboot开发的校园台球厅人…

Linux-1.常见指令以及权限理解

目录 本节目标 使用 XShell 远程登录 Linux 关于 Linux 桌面 下载安装 XShell 查看 Linux 主机 ip 使用 XShell 登陆主机 XShell 下的复制粘贴 Linux下基本指令 登录Linux服务器 新建多用户 全屏 1.快速认识5~6个命令 2.详细谈论课件的所有指令 01. ls 指令 02…

初识redis(一)

前言 引用的是这本书的原话 Redis[1]是一种基于键值对&#xff08;key-value&#xff09;的NoSQL数据库&#xff0c;与很多键值对数据库不同的是&#xff0c;Redis中的值可以是由string&#xff08;字符串&#xff09;、hash&#xff08;哈希&#xff09;、list&#xff08;列…

33.HarmonyOS App(JAVA)鸿蒙系统app数据库增删改查

33.HarmonyOS App(JAVA)鸿蒙系统app数据库增删改查 关系数据库 关系对象数据库&#xff08;ORM&#xff09; 应用偏好数据库 分布式数据库 关系型数据库&#xff08;Relational Database&#xff0c;RDB&#xff09;是一种基于关系模型来管理数据的数据库。HarmonyOS关系型…

Pandas与Jupyter Notebook的完美结合【第153篇—数据分析】

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 利用Python进行数据分析&#xff1a;Pandas与Jupyter Notebook的完美结合 在数据科学和分析…

职场口才提升之道

职场口才提升之道 在职场中&#xff0c;口才的重要性不言而喻。无论是与同事沟通协作&#xff0c;还是向上级汇报工作&#xff0c;亦或是与客户洽谈业务&#xff0c;都需要具备良好的口才能力。一个出色的职场人&#xff0c;除了拥有扎实的专业技能外&#xff0c;还应具备出色…

web自动化测试系列-selenium的安装和运行(一)

目录 web自动化系列之如何安装selenium 1.web自动化中的三大亮点技术 2.web自动化能解决什么问题 &#xff1f; 3.为什么是selenium ? 4.selenium特点 5.selenium安装 6.下载浏览器及驱动 7.测试代码 web自动化系列之如何安装selenium web自动化 &#xff0c;一个老生…

【C++】vector介绍

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 1. 前言2. vector的介绍3. Member functions3.1 (constructor)3.2 (destructor) 4. Capacity4.1 resize4.2 reserve4.3 shrink_to_fit 5. vector 增删查改5.1 push_back5.2 insert5.3 pop_back5.4 find5.5 erase 1. 前…

【C++】模板与泛型编程

文章目录 1. 泛型编程2. 函数模板2.1 函数模板概念2.2 函数模板格式2.3 函数模板的原理2.4 函数模板的实例化2.5 模板参数的匹配原则 3. 类模板3.1 类模板的定义格式3.2 类模板的实例化 4. 非类型模板参数5. 模板的特化5.1 概念5.2 函数模板特化5.3 全特化5.4 偏特化5.5 类模板…

Docker 搭建Redis集群

目录 1. 3主3从架构说明 2. 3主3从Redis集群配置 2.1关闭防火墙启动docker后台服务 2.2 新建6个docker容器实例 2.3 进去任意一台redis容器&#xff0c;为6台机器构建集群关系 2.4 进去6381&#xff0c;查看集群状态 3. 主从容错切换迁移 3.1 数据读写存储 3.1.1 查看…

【JDBC编程】基于MySql的Java应用程序中访问数据库与交互数据的技术

꒰˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN …

java switch用法

满足那个条件&#xff0c;就从那个入口进入&#xff0c;没有break就继续&#xff08;是这样设计的&#xff0c;需要自己加break;&#xff09;&#xff0c;一般都是要加break的。 switch (表达式) 表达式只能是【整型、char、String.】 import java.util.Scanner;public class…