【Easylive】视频在线人数统计系统实现详解 WebSocket 及其在在线人数统计中的应用

【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版

视频在线人数统计系统实现详解

1. 系统架构概述

您实现的是一个基于Redis的视频在线人数统计系统,主要包含以下组件:

  1. 心跳上报接口:客户端定期调用以维持在线状态
  2. Redis存储结构:使用两种键存储在线信息
  3. 过期监听机制:通过Redis的键过期事件自动减少在线人数
  4. 计数维护逻辑:确保在线人数的准确性

2. 核心实现细节

2.1 数据结构设计

系统使用了两种Redis键:

  1. 用户播放键 (userPlayOnlineKey)
    • 格式:video:play:user:{fileId}:{deviceId}
    • 作用:标记特定设备是否在线
    • 过期时间:8秒

  2. 在线计数键 (playOnlineCountKey)
    • 格式:video:play:online:{fileId}
    • 作用:存储当前视频的在线人数
    • 过期时间:10秒

2.2 心跳上报流程 (reportVideoPlayOnline)

public Integer reportVideoPlayOnline(String fileId, String deviceId) {// 构造Redis键String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);// 新用户上线处理if (!redisUtils.keyExists(userPlayOnlineKey)) {// 设置用户键(8秒过期)redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 增加在线计数(10秒过期)return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();}// 已有用户续期处理redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 返回当前在线人数Integer count = (Integer) redisUtils.get(playOnlineCountKey);return count == null ? 1 : count;
}

工作流程

  1. 客户端每5-7秒调用一次/reportVideoPlayOnline接口
  2. 服务端检查用户键是否存在:
    • 不存在:创建用户键(8秒过期),增加计数键(10秒过期)
    • 存在:续期两个键的过期时间
  3. 返回当前在线人数

2.3 过期监听机制 (RedisKeyExpirationListener)

@Override
public void onMessage(Message message, byte[] pattern) {String key = message.toString();// 只处理用户播放键的过期事件if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {return;}// 从key中提取fileIdInteger userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);// 减少对应视频的在线计数redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));
}

工作流程

  1. Redis在用户键(8秒)过期时发送通知
  2. 监听器收到通知后:
    • 验证是否为用户播放键
    • 从键名中提取视频ID(fileId)
    • 减少对应视频的在线计数

2.4 计数递减逻辑 (decrementPlayOnlineCount)

public void decrementPlayOnlineCount(String key) {redisUtils.decrement(key);
}

作用:简单地减少指定键的计数值

3. 关键设计原理

3.1 双键设计的意义

  1. 用户播放键
    • 作为"心跳"存在的证据
    • 过期时间(8秒)短于计数键(10秒),确保先检测到用户离线

  2. 在线计数键
    • 集中存储当前在线人数
    • 稍长的过期时间防止误删

3.2 时间参数设计

8秒用户键过期:假设客户端每5-7秒上报一次,8秒确保能检测到中断
10秒计数键过期:比用户键多2秒,防止竞态条件
客户端上报频率:建议5-7秒一次,平衡准确性和服务器压力

3.3 容错机制

  1. 计数键续期:每次心跳都会延长计数键的过期时间
  2. 空值处理:当计数键不存在时返回1作为默认值
  3. 精确递减:只在用户键过期时才减少计数,避免重复递减

4. 工作流程图

Client Server Redis Listener 心跳上报(fileId, deviceId) 检查user:{fileId}:{deviceId}存在? 不存在 设置user键(8s) 增加online计数(10s) 存在 续期user键(8s) 续期online键(10s) alt [新用户] [已有用户] 返回当前在线人数 user键过期通知 减少对应online计数 loop [过期监听] Client Server Redis Listener

5. 系统优势

  1. 实时性高:秒级检测用户离线
  2. 性能优异:完全基于Redis内存操作
  3. 扩展性强:轻松支持大量并发用户
  4. 准确可靠:双重验证机制防止误计数
  5. 资源节约:自动清理不活跃用户的记录

6. 潜在优化方向

  1. 批量上报:允许客户端一次上报多个视频的状态
  2. 分布式锁:在极高并发下保证计数准确
  3. 异常处理:增加Redis操作失败的重试机制
  4. 监控指标:添加在线人数变化的监控和报警
  5. 动态过期:根据系统负载动态调整过期时间

这个实现很好地平衡了准确性、实时性和性能要求,是一个非常典型的在线人数统计解决方案。

WebSocket 及其在在线人数统计中的应用

WebSocket 基础介绍

什么是 WebSocket?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

与传统 HTTP 轮询的区别

特性WebSocketHTTP 轮询
连接方式持久化连接每次请求新建连接
通信方向全双工半双工
实时性毫秒级依赖轮询间隔(通常秒级)
服务器推送支持不支持
资源消耗连接初期开销大,后期开销小每次请求都有完整HTTP开销
适用场景高实时性应用实时性要求不高的应用

基于 WebSocket 的在线人数统计实现

系统架构设计

客户端A ──┐├─── WebSocket 服务器 ─── Redis 集群
客户端B ──┘           ││数据库(持久化)

核心实现代码

1. WebSocket 服务端实现 (Spring Boot)
@ServerEndpoint("/online/{videoId}")
@Component
public class VideoOnlineEndpoint {private static ConcurrentMap<String, Set<Session>> videoSessions = new ConcurrentHashMap<>();private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {VideoOnlineEndpoint.redisTemplate = redisTemplate;}@OnOpenpublic void onOpen(Session session, @PathParam("videoId") String videoId) {// 添加会话到视频组videoSessions.computeIfAbsent(videoId, k -> ConcurrentHashMap.newKeySet()).add(session);// 更新Redis计数String redisKey = "video:online:" + videoId;redisTemplate.opsForValue().increment(redisKey);redisTemplate.expire(redisKey, 10, TimeUnit.MINUTES);// 广播更新后的在线人数broadcastOnlineCount(videoId);}@OnClosepublic void onClose(Session session, @PathParam("videoId") String videoId) {// 从视频组移除会话Set<Session> sessions = videoSessions.get(videoId);if (sessions != null) {sessions.remove(session);// 更新Redis计数String redisKey = "video:online:" + videoId;redisTemplate.opsForValue().decrement(redisKey);// 广播更新后的在线人数broadcastOnlineCount(videoId);}}@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}private void broadcastOnlineCount(String videoId) {String count = redisTemplate.opsForValue().get("video:online:" + videoId);String message = "ONLINE_COUNT:" + (count != null ? count : "0");Set<Session> sessions = videoSessions.get(videoId);if (sessions != null) {sessions.forEach(session -> {try {session.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}});}}
}
2. 客户端实现 (JavaScript)
const videoId = '12345'; // 当前观看的视频ID
const socket = new WebSocket(`wss://yourdomain.com/online/${videoId}`);// 连接建立时
socket.onopen = function(e) {console.log("WebSocket连接已建立");
};// 接收消息
socket.onmessage = function(event) {if(event.data.startsWith("ONLINE_COUNT:")) {const count = event.data.split(":")[1];updateOnlineCountDisplay(count);}
};// 连接关闭时
socket.onclose = function(event) {if (event.wasClean) {console.log(`连接正常关闭,code=${event.code} reason=${event.reason}`);} else {console.log('连接异常断开');// 尝试重新连接setTimeout(() => connectWebSocket(), 5000);}
};// 错误处理
socket.onerror = function(error) {console.log(`WebSocket错误: ${error.message}`);
};function updateOnlineCountDisplay(count) {document.getElementById('online-count').innerText = count;
}
3. 心跳机制实现
// 客户端心跳
setInterval(() => {if(socket.readyState === WebSocket.OPEN) {socket.send("HEARTBEAT");}
}, 30000); // 30秒发送一次心跳// 服务端心跳检测 (Java)
@ServerEndpoint配置中添加:
@OnMessage
public void onMessage(Session session, String message) {if("HEARTBEAT".equals(message)) {session.getAsyncRemote().sendText("HEARTBEAT_ACK");}
}

方案优势分析

  1. 实时性极佳
    • 在线人数变化可实时推送到所有客户端
    • 无轮询延迟,通常达到毫秒级更新

  2. 精确计数
    • 基于实际连接状态计数
    • 避免Redis过期时间的估算误差

  3. 扩展功能容易
    • 可轻松扩展实现弹幕、实时评论等功能
    • 支持复杂的互动场景

  4. 减少无效请求
    • 相比HTTP轮询减少90%以上的请求量
    • 显著降低服务器压力

潜在挑战与解决方案

1. 连接保持问题

问题:移动网络不稳定导致频繁断开

解决方案
• 实现自动重连机制
• 使用心跳包检测连接状态
• 设置合理的超时时间

2. 大规模并发问题

问题:单视频热点导致连接数激增

解决方案
• 使用WebSocket集群
• 引入负载均衡(如Nginx)
• 实现连接分片策略

3. 状态同步问题

问题:集群环境下状态同步

解决方案
• 使用Redis Pub/Sub同步各节点状态
• 采用一致性哈希分配连接
• 实现分布式会话管理

性能优化建议

  1. 协议优化
    • 启用WebSocket压缩扩展
    • 使用二进制协议替代文本协议

  2. 资源控制
    • 实现连接数限制
    • 设置单个IP连接限制

  3. 监控体系
    • 建立连接数监控
    • 实现异常连接报警

  4. 优雅降级
    • WebSocket不可用时自动降级为长轮询
    • 提供兼容性方案

与传统方案的对比

指标WebSocket方案Redis键过期方案
实时性毫秒级秒级(依赖过期时间)
精确度100%准确有1-2秒延迟
实现复杂度较高较低
服务器负载连接初期高,维持期低持续中等负载
扩展性容易扩展其他实时功能仅限于计数
客户端兼容性需现代浏览器支持所有环境兼容
移动端表现可能因网络切换断开不受影响

适用场景建议

推荐使用WebSocket方案当:
• 需要实时显示精确在线人数
• 已经使用或计划使用WebSocket实现其他功能(如弹幕、聊天)
• 客户端环境可控(如自己的APP或现代浏览器)
• 有足够资源维护WebSocket基础设施

推荐保持Redis方案当:
• 实时性要求不是极高(秒级可接受)
• 需要支持老旧客户端
• 系统规模较小,希望简单维护
• 主要关注计数而非实时交互

混合方案设计

结合两种方案优势的折中实现:

// WebSocket连接时更新精确计数
@OnOpen
public void onOpen(Session session, @PathParam("videoId") String videoId) {// 更新内存中的精确计数incrementLocalCount(videoId);// 每10秒同步到Redis一次if(needSyncToRedis(videoId)) {redisTemplate.opsForValue().set("video:online:" + videoId, getLocalCount(videoId).toString());}
}// 对外提供查询接口
@GetMapping("/online/{videoId}")
public int getOnlineCount(@PathVariable String videoId) {// 优先返回本地精确计数Integer localCount = getLocalCount(videoId);if(localCount != null) {return localCount;}// 回退到Redis计数String count = redisTemplate.opsForValue().get("video:online:" + videoId);return count != null ? Integer.parseInt(count) : 0;
}

这种混合方案:
• 对WebSocket客户端提供精确计数
• 对非WebSocket客户端提供近似的Redis计数
• 平衡了精确性和兼容性

查看在线观看人数

通过轮询上报心跳,在服务端记录设备有没有不停地上报心跳,如果没有上报心跳,通过 Redis 的 key 的失效,会有一个通知没有再上报心跳,就会把在线人数 -1。

Redis在线人数统计实现详解

以下是带有详细注释的代码实现,解释了基于Redis的在线人数统计系统的工作原理:

/*** 客户端上报心跳接口* @param fileId 视频文件ID* @param deviceId 设备唯一标识* @return 当前在线人数*/
@RequestMapping("/reportVideoPlayOnline")
public ResponseVO reportVideoPlayOnline(@NotEmpty String fileId, @NotEmpty String deviceId){// 调用Redis组件处理心跳上报,并返回成功响应return getSuccessResponseVO(redisComponent.reportVideoPlayOnline(fileId, deviceId));
}/*** 处理视频在线人数统计的核心方法* @param fileId 视频文件ID* @param deviceId 设备唯一标识* @return 当前在线人数*/
public Integer reportVideoPlayOnline(String fileId, String deviceId){// 构建Redis键:用户级别的键,用于标记特定设备是否在线String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);// 构建Redis键:视频级别的键,用于存储当前视频的总在线人数String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);// 检查是否是新的观看用户(该设备首次上报或已过期)if (!redisUtils.keyExists(userPlayOnlineKey)) {// 设置用户键,8秒后过期(如果8秒内没有下次心跳,则认为离线)redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 增加视频的总在线人数计数,并设置10秒过期时间return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();}// 以下是已有用户的处理逻辑:// 续期视频的总在线人数键(10秒)redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);// 续期用户级别的键(8秒)redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 获取当前在线人数(防止并发问题导致的计数不准确)Integer count = (Integer) redisUtils.get(playOnlineCountKey);// 如果获取不到计数(极端情况),默认返回1return count == null ? 1 : count;
}/*** 减少在线人数计数* @param key 需要减少计数的Redis键*/
public void decrementPlayOnlineCount(String key) {// 对指定键的值进行原子递减redisUtils.decrement(key);
}/*** Redis键过期监听器,用于处理用户离线情况*/
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {@Resourceprivate RedisComponent redisComponent;public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** 处理Redis键过期事件* @param message 过期消息* @param pattern 模式*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 获取过期的键名String key = message.toString();// 只处理用户级别的在线状态键过期事件if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {return;}// 从键名中提取视频ID// 计算用户键前缀的长度Integer userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();// 截取视频ID(假设ID长度为20)String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);// 减少对应视频的在线人数计数redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));}
}

系统工作流程详解

  1. 心跳上报机制
    • 客户端每隔5-7秒调用/reportVideoPlayOnline接口上报心跳
    • 服务端通过Redis记录设备最后一次活跃时间

  2. 双键设计原理
    用户键(userPlayOnlineKey)
    ◦ 格式:video:play:user:{fileId}:{deviceId}
    ◦ 作用:标记特定设备是否在线
    ◦ 过期时间:8秒(如果8秒内没有心跳则认为离线)
    计数键(playOnlineCountKey)
    ◦ 格式:video:play:online:{fileId}
    ◦ 作用:存储当前视频的总在线人数
    ◦ 过期时间:10秒(比用户键稍长,防止竞态条件)

  3. 新用户上线处理

    if (!redisUtils.keyExists(userPlayOnlineKey)) {redisUtils.setex(userPlayOnlineKey, fileId, 8);return redisUtils.incrementex(playOnlineCountKey, 10);
    }
    

    • 当用户键不存在时,创建用户键并增加总计数

  4. 已有用户续期处理

    redisUtils.expire(playOnlineCountKey, 10);
    redisUtils.expire(userPlayOnlineKey, 8);
    

    • 续期两个键的过期时间,保持活跃状态

  5. 离线检测机制
    • 当用户键8秒过期时,触发RedisKeyExpirationListener
    • 监听器从键名提取videoId,减少对应视频的在线计数

  6. 容错处理

    Integer count = (Integer) redisUtils.get(playOnlineCountKey);
    return count == null ? 1 : count;
    

    • 防止极端情况下计数键丢失,返回默认值1

设计优势分析

  1. 精确计数:基于实际心跳而非估算,结果准确
  2. 自动清理:通过Redis过期机制自动清理不活跃用户
  3. 低延迟:键过期通知机制实现秒级离线检测
  4. 高性能:完全基于内存操作,无数据库IO
  5. 可扩展:Redis集群支持横向扩展

关键参数说明

参数说明
用户键过期时间8秒客户端应每5-7秒上报一次心跳
计数键过期时间10秒比用户键稍长,防止竞态条件
视频ID长度20需与业务系统保持一致

这个实现方案在保证准确性的同时,具有优秀的性能和可扩展性,非常适合中小规模的实时在线人数统计场景。

自看

通过Redis计数器来给视频的在线观看人数进行增加和减少,也就是通过心跳来不停上报当前用户是否正在观看,当浏览器关闭时,该用户就不会再持续上报心跳,此时该用户的Redis Key则会失效,Redis Key失效的时候会发送消息通知,根据这个消息通知得知失效,再去减少在线观看人数。

Netty与视频在线人数统计的结合

Netty基础介绍

Netty是一个异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端程序。它基于Java NIO(Non-blocking I/O)构建,主要特点包括:

  1. 高性能:支持百万级并发连接
  2. 低延迟:非阻塞I/O模型减少等待时间
  3. 高扩展性:模块化设计,可灵活扩展
  4. 协议支持:内置HTTP、WebSocket、TCP/UDP等协议支持

为什么考虑用Netty实现在线人数统计?

当前基于HTTP轮询+Redis的实现存在以下可优化点:
HTTP开销大:每次轮询都需要完整的HTTP请求/响应头
实时性有限:依赖轮询间隔(通常秒级)
服务器压力:高并发时大量无效轮询请求

Netty可以解决这些问题,提供真正的实时通信能力

基于Netty的在线人数统计设计

系统架构

客户端App/Web ──▶ Netty服务器集群 ──▶ Redis集群││ (WebSocket/TCP长连接)▼
用户行为数据(心跳、上下线)

核心组件实现

1. Netty服务器初始化
public class VideoOnlineServer {public void start(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 心跳检测(15秒无读写则关闭连接)pipeline.addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));// 自定义协议解码/编码pipeline.addLast("decoder", new OnlineMessageDecoder());pipeline.addLast("encoder", new OnlineMessageEncoder());// 业务逻辑处理器pipeline.addLast("handler", new OnlineMessageHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
2. 消息处理器实现
public class OnlineMessageHandler extends SimpleChannelInboundHandler<OnlineMessage> {// 视频ID到Channel组的映射private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, OnlineMessage msg) {switch (msg.getType()) {case CONNECT: // 连接初始化handleConnect(ctx, msg.getVideoId(), msg.getDeviceId());break;case HEARTBEAT: // 心跳handleHeartbeat(ctx, msg.getVideoId(), msg.getDeviceId());break;case DISCONNECT: // 主动断开handleDisconnect(ctx, msg.getVideoId(), msg.getDeviceId());break;}}private void handleConnect(ChannelHandlerContext ctx, String videoId, String deviceId) {// 加入视频频道组ChannelGroup group = videoGroups.computeIfAbsent(videoId, k -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));group.add(ctx.channel());// 更新Redis计数long count = RedisUtils.increment("video:online:" + videoId);// 广播新在线人数broadcastCount(videoId, count);}private void handleHeartbeat(ChannelHandlerContext ctx, String videoId, String deviceId) {// 更新设备最后活跃时间(Redis)RedisUtils.setex("device:active:" + videoId + ":" + deviceId, "1", 15); // 15秒过期// 可选择性返回当前人数ctx.writeAndFlush(new OnlineMessage(HEARTBEAT_ACK, getOnlineCount(videoId)));}
}
3. 客户端断连处理
@Override
public void channelInactive(ChannelHandlerContext ctx) {// 从所有视频组中移除该ChannelvideoGroups.values().forEach(group -> group.remove(ctx.channel()));// 更新Redis计数(需要维护设备到视频ID的映射)String deviceId = getDeviceId(ctx.channel());String videoId = getVideoId(ctx.channel());long count = RedisUtils.decrement("video:online:" + videoId);// 广播新人数broadcastCount(videoId, count);
}@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {// 处理空闲连接if (evt instanceof IdleStateEvent) {ctx.close(); // 关闭超时未心跳的连接}
}

与传统方案的对比

特性Netty实现方案HTTP轮询+Redis方案
实时性毫秒级依赖轮询间隔(通常秒级)
协议开销仅心跳数据(几十字节)完整HTTP头(通常几百字节)
服务器压力长连接维护,无重复握手每次轮询都新建连接
并发能力单机支持10万+连接受限于HTTP服务器性能
实现复杂度较高简单
移动网络适应性需处理频繁重连天然适应

关键设计考虑

  1. 连接管理
    • 使用ChannelGroup管理同视频的用户连接
    IdleStateHandler自动检测空闲连接

  2. 状态同步
    • Redis存储全局计数,避免Netty单点问题
    • 定期同步内存与Redis的数据

  3. 消息协议设计

    message OnlineMessage {enum Type {CONNECT = 0;HEARTBEAT = 1;DISCONNECT = 2;}Type type = 1;string videoId = 2;string deviceId = 3;int64 count = 4; // 用于服务端返回当前人数
    }
    
  4. 弹性设计
    • 客户端实现自动重连
    • 服务端优雅降级机制

性能优化技巧

  1. 对象池化:重用消息对象减少GC
  2. 零拷贝:使用CompositeByteBuf合并小数据包
  3. 事件循环:业务逻辑放入单独线程池
  4. 批量操作:合并Redis操作减少网络往返

适用场景建议

推荐使用Netty当:
• 需要真正的实时互动(如直播弹幕)
• 预期有超高并发(万级同时在线)
• 已经需要维护长连接(如游戏、IM)

保持当前方案当:
• 实时性要求不高
• 开发资源有限
• 客户端环境复杂(如需要支持老旧浏览器)

Netty方案虽然实现复杂度较高,但能为视频平台提供更实时、更高效的在线人数统计能力,并为未来扩展实时互动功能奠定基础。

Netty与WebSocket的关系及在实时统计中的应用

Netty和WebSocket是不同层次的技术,但它们可以紧密结合来构建高性能的实时通信系统。以下是它们的核心关系和在视频在线人数统计中的应用分析:

1. Netty与WebSocket的基础关系

维度NettyWebSocket二者关系
定位网络应用框架通信协议Netty是实现WebSocket协议的底层框架之一
层级传输层/应用层框架应用层协议Netty提供了对WebSocket协议的支持
功能处理TCP/UDP连接、编解码、并发等提供全双工通信能力Netty帮助高效实现WebSocket的通信能力
典型使用可作为WebSocket服务器的基础实现运行在Netty等框架之上开发者通过Netty API构建WebSocket服务

2. 技术栈组合原理

[WebSocket客户端] ←WebSocket协议→ [Netty WebSocket服务端] ←TCP→ [操作系统网络栈]
  1. 协议支持
    • Netty内置WebSocketServerProtocolHandler等组件
    • 自动处理WebSocket握手、帧编解码等底层细节

  2. 性能优势
    • Netty的Reactor线程模型优化WebSocket连接管理
    • 零拷贝技术提升WebSocket数据传输效率

  3. 扩展能力
    • 在WebSocket之上可添加自定义协议
    • 方便集成SSL/TLS等安全层

3. 在视频在线统计中的联合实现

基于Netty的WebSocket服务端示例

public class VideoWebSocketServer {public void start(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// HTTP编解码器(用于WebSocket握手)pipeline.addLast(new HttpServerCodec());// 聚合HTTP请求pipeline.addLast(new HttpObjectAggregator(65536));// WebSocket协议处理器pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 自定义业务处理器pipeline.addLast(new OnlineStatsHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}

在线统计业务处理器

public class OnlineStatsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 视频频道组映射private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {// 解析JSON消息:{"action":"heartbeat","videoId":"123"}JsonObject json = parseJson(msg.text());String videoId = json.getString("videoId");ChannelGroup group = videoGroups.computeIfAbsent(videoId, k -> new DefaultChannelGroup(ctx.executor()));switch (json.getString("action")) {case "join":group.add(ctx.channel());broadcastCount(videoId, group.size());break;case "heartbeat":// 更新Redis活跃记录redis.incr("active:" + videoId + ":" + ctx.channel().id());break;}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {// 从所有组中移除并更新计数videoGroups.values().forEach(group -> {if (group.remove(ctx.channel())) {broadcastCount(getVideoId(ctx), group.size());}});}
}

4. 与传统HTTP轮询方案的对比

特性Netty+WebSocketHTTP轮询
连接方式1个持久连接频繁新建连接
头部开销握手后无冗余头每次请求都带完整HTTP头
实时性毫秒级依赖轮询间隔(通常秒级)
服务器压力连接数×心跳频率请求数×轮询频率
移动网络适应需处理网络切换天然适应
实现复杂度较高简单

5. 典型消息流程

  1. 连接建立

    客户端 → HTTP Upgrade请求 → Netty(完成WebSocket握手) → 建立持久连接
    
  2. 心跳维持

    // 客户端每10秒发送
    {"action":"heartbeat","videoId":"123","timestamp":1620000000}// 服务端响应
    {"type":"ack","online":1524}
    
  3. 人数推送

    // 服务端主动推送
    {"type":"stats","videoId":"123","online":1525,"change":1}
    

6. 性能优化关键点

  1. 连接管理
    • 使用ChannelGroup管理视频房间的订阅者
    • 配置合理的IdleStateHandler检测死连接

  2. 序列化优化

    // 使用二进制协议代替JSON
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    
  3. 集群扩展

    // 使用Redis Pub/Sub同步各节点状态
    redis.subscribe("video:123", (channel, message) -> {broadcastToLocalClients(message);
    });
    
  4. 监控指标
    • 跟踪每个视频频道的连接数
    • 监控消息吞吐量和延迟

Netty与WebSocket的结合为实时统计提供了高并发、低延迟的解决方案,特别适合需要精确到毫秒级的在线人数统计场景,同时为未来扩展实时弹幕、即时消息等功能奠定了基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/75819.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 高级命令与常见操作:文本处理、系统管理与网络调试

下面是一份针对已经熟悉 Linux 基础命令的用户所整理的「高级命令与常见操作」笔记&#xff0c;涵盖文本处理、系统管理、网络调试与其他常用的进阶技巧。请你审核下面笔记&#xff0c;检查是否有过时的内容&#xff0c;如有请进行替换&#xff0c;确保其符合现代化需求&#x…

使用MFC ActiveX开发KingScada控件(OCX)

最近有个需求&#xff0c;要在KingScada上面开发一个控件。 原来是用的WinCC&#xff0c;WinCC本身是支持调用.net控件&#xff0c;就是winform控件的&#xff0c;winform控件开发简单&#xff0c;相对功能也更丰富。奈何WinCC不是国产的。 话说KingScada&#xff0c;国产组态软…

QScrollArea 内部滚动条 QSS 样式失效问题及解决方案

在使用 Qt 进行 UI 开发时,我们经常希望通过 QSS(Qt Style Sheets)自定义控件的外观,比如为 QScrollArea 的内部滚动条设置特定的样式。然而,有开发者遇到了这样的问题:在 UI 设计器中预览 QSS 显示效果正常,但程序运行时却显示为系统默认样式。经过反复测试和调试,最终…

使用OpenSceneGraph生成3D数据格式文件

OpenSceneGraph (OSG) 提供了多种方式来生成和导出3D数据格式文件。以下是详细的生成方法和示例代码&#xff1a; 一、基本文件生成方法 1. 使用osgDB::writeNodeFile函数 这是最直接的生成方式&#xff0c;支持多种格式&#xff1a; #include <osgDB/WriteFile>osg:…

JMeter接口性能测试从入门到精通

前言&#xff1a; 本文主要介绍了如何利用jmter进行接口的性能测试 1.在测试计划中添加线程组 1.1.线程组界面中元素含义 如果点击循环次数为永远&#xff1a; 2.添加HTTP取样器 2.1.填写登录接口的各个参数 2.2.在线程组下面增加查看结果树 请求成功的情况&#xff1a; 请求…

C++抽卡模拟器

近日在学校无聊&#xff0c;写了个抽卡模拟器供大家娱乐。 代码实现以下功能&#xff1a;抽卡界面&#xff0c;抽卡判定、动画播放、存档。 1.抽卡界面及判定 技术有限&#xff0c;不可能做的和原神一样精致。代码如下&#xff08;注&#xff1a;这不是完整代码&#xff0c;…

详解相机的内参和外参,以及内外参的标定方法

1 四个坐标系 要想深入搞清楚相机的内参和外参含义&#xff0c; 首先得清楚以下4个坐标系的定义&#xff1a; 世界坐标系&#xff1a; 名字看着很唬人&#xff0c; 其实没什么大不了的&#xff0c; 这个就是你自己定义的某一个坐标系。 比如&#xff0c; 你把房间的某一个点定…

学透Spring Boot — 011. 一篇文章学会Spring Test

系列文章目录 这是学透Spring Boot的第11篇文章。更多系列文章请关注 CSDN postnull 用户的专栏 文章目录 系列文章目录Spring Test的依赖Spring Test的核心功能SpringBootTest 加载Spring上下文依赖注入有问题时Spring配置有问题时 WebMvcTest 测试Web层&#xff08;Controll…

Mysql 数据库编程技术01

一、数据库基础 1.1 认识数据库 为什么学习数据库 瞬时数据&#xff1a;比如内存中的数据&#xff0c;是不能永久保存的。持久化数据&#xff1a;比如持久化至数据库中或者文档中&#xff0c;能够长久保存。 数据库是“按照数据结构来组织、存储和管理数据的仓库”。是一个长…

新一代AI架构实践:数字大脑AI+智能调度MCP+领域执行APP的黄金金字塔体系

新一代AI架构实践&#xff1a;数字大脑智能调度领域执行的黄金金字塔体系 一、架构本质的三层穿透性认知 1.1 核心范式转变&#xff08;CPS理论升级&#xff09; 传统算法架构&#xff1a;数据驱动 → 特征工程 → 模型训练 → 业务应用 新一代AI架构&#xff1a;物理规律建…

macOS可视化桌面配置docker加速器

macOS可视化桌面配置docker加速器 在镜像settings->docker Engine改为国内镜像修改为国内镜像重启docker(可视化界面启动或者使用命令行)使用命令重启可视化界面重启 在镜像settings->docker Engine改为国内镜像 修改为国内镜像 {"registry-mirrors": ["…

Nginx 基础使用(2025)

一、Nginx目录结构 [rootlocalhost ~]# tree /usr/local/nginx /usr/local/nginx ├── client_body_temp # POST 大文件暂存目录 ├── conf # Nginx所有配置文件的目录 │ ├── fastcgi.conf # fastcgi相…

用spring-webmvc包实现AI(Deepseek)事件流(SSE)推送

前后端&#xff1a; Spring Boot Angular spring-webmvc-5.2.2包 代码片段如下&#xff1a; 控制层&#xff1a; GetMapping(value "/realtime/page/ai/sse", produces MediaType.TEXT_EVENT_STREAM_VALUE)ApiOperation(value "获取告警记录进行AI分析…

基于Python的招聘推荐数据可视化分析系统

【Python】基于Python的招聘推荐数据可视化分析系统&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 &#x1f680;&#x1f31f; 基于Python的招聘推荐数据可视化分析系统&#xff01;&#x1…

使用注解开发springMVC

引言 在学习过第一个springMVC项目建造过后&#xff0c;让我们直接进入真实开发中所必需的注解开发&#xff0c; 是何等的简洁高效&#xff01;&#xff01; 注&#xff1a;由于Maven可能存在资源过滤的问题&#xff0c;在maven依赖中加入 <build><resources>&l…

linux专题3-----禁止SSH的密码登录

要在linux系统中禁止密码登录&#xff0c;您可以通过修改 SSH 配置来实现。请按照以下步骤操作(此处以 Ubuntu为例)&#xff1a; 1、SSH 登录到您的服务器&#xff08;或直接在命令行模式下&#xff09;。 2、备份 SSH 配置文件&#xff1a; 在终端中运行以下命令以备份现有的…

基于LangChain和通义(Tongyi)实现NL2SQL的智能检索(无需训练)

在数据驱动的时代,如何高效地从数据库中获取信息成为了一个重要的挑战。自然语言到SQL(NL2SQL)技术提供了一种便捷的解决方案,使用户能够用自然语言查询数据库,而无需深入了解SQL语法。本文将探讨如何利用LangChain和通义(Tongyi)实现NL2SQL的智能检索,具体步骤如下: …

深度学习处理文本(10)

保存自定义层 在编写自定义层时&#xff0c;一定要实现get_config()方法&#xff1a;这样我们可以利用config字典将该层重新实例化&#xff0c;这对保存和加载模型很有用。该方法返回一个Python字典&#xff0c;其中包含用于创建该层的构造函数的参数值。所有Keras层都可以被序…

机器视觉3D中激光偏镜的优点

机器视觉的3D应用中,激光偏镜(如偏振片、波片、偏振分束器等)通过其独特的偏振控制能力,显著提升了系统的测量精度、抗干扰能力和适应性。以下是其核心优点: 1. 提升3D成像精度 抑制环境光干扰:偏振片可滤除非偏振的环境杂光(如日光、室内照明),仅保留激光偏振信号,大…

线程同步的学习与应用

1.多线程并发 1).多线程并发引例 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <pthread.h>int wg0; void *fun(void *arg) {for(int i0;i<1000;i){wg;printf("wg%d\n",wg);} } i…