netty使用http和webSocket

1:pom.xml配置

		<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.73.Final</version></dependency>

2:Netty作为HTTP服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;public class HttpServer {private final int port;public HttpServer(int port) {this.port = port;}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 添加 HTTP 编解码器和自定义的ChannelHandlerp.addLast(new HttpServerCodec());p.addLast(new HttpObjectAggregator(1024 * 1024)); // 设置最大聚合大小为1MBp.addLast(new LargeJsonHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 绑定端口,开始接受进来的连接ChannelFuture f = b.bind(port).sync();// 等待服务器 socket 关闭f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new HttpServer(8080).start();}
}class LargeJsonHandler extends SimpleChannelInboundHandler<FullHttpRequest> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {if (HttpUtil.is100ContinueExpected(request)) {send100Continue(ctx);}ByteBuf content = request.content();String jsonStr = content.toString(CharsetUtil.UTF_8);// 在这里对 JSON 数据进行处理System.out.println(jsonStr);// 发送响应FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");response.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());ctx.writeAndFlush(response);}private static void send100Continue(ChannelHandlerContext ctx) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {// 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。//ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

注意:如果发送的JSO数据如果大于1M,是会分包发送的,每次发送都会执行channelReadComplete方法,所以不可以关闭通道,发送完数据才执行channelRead0方法

3:Netty作为webSocket服务器

package com.example.slave.netty.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;/*** @Description:* @Author: xu* @Data: 2024-2024/1/4-11* @Version: V1.0*/
public class CustomWebSocket {private final int port;public CustomWebSocket(int port) {this.port = port;}public void start() throws Exception {//设置用于连接的boss组, 可在构造器中定义使用的线程数  监听端口接收客户端连接,一个端口一个线程,然后转给worker组//boss组用于监听客户端连接请求,有连接传入时就生成连接channel传给worker,等worker 接收请求 io多路复用,EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//定义使用的通道 可以选择是NIO或者是OIO 代表了worker在处理socket channel时的不同情况。oio只能1对1, nio则没有1对1对关系//当netty要处理长连接时最好使用NIO,不然如果要保证效率 需要创建大量的线程,和io多路复用一致.channel(NioServerSocketChannel.class)//.channel(OioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();// 添加 HTTP 编解码器和自定义的ChannelHandlerp.addLast(new HttpServerCodec());p.addLast(new HttpObjectAggregator(1024 * 1024)); // 设置最大聚合大小为1MBp.addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));p.addLast(new MyWebSocketHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 绑定端口,开始接受进来的连接ChannelFuture f = b.bind(port).sync();// 等待服务器 socket 关闭f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}}
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {public static ChannelGroup channelGroup;static {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}//客户端与服务器建立连接的时候触发,@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("与客户端建立连接,通道开启!");//添加到channelGroup通道组channelGroup.add(ctx.channel());}//客户端与服务器关闭连接的时候触发,@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("与客户端断开连接,通道关闭!");channelGroup.remove(ctx.channel());}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {// 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 关闭发生异常的连接ctx.close();}//服务器接受客户端的数据信息,@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){System.out.println("服务器收到的数据:" + msg.text());//sendMessage(ctx);sendAllMessage();}//给固定的人发消息private void sendMessage(ChannelHandlerContext ctx) {String message = "你好,"+ctx.channel().localAddress()+" 给固定的人发消息";ctx.channel().writeAndFlush(new TextWebSocketFrame(message));}//发送群消息,此时其他客户端也能收到群消息private void sendAllMessage(){String message = "我是服务器,这里发送的是群消息";channelGroup.writeAndFlush( new TextWebSocketFrame(message));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/597479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring实战】20 Spring Data REST+JPA构建基础的RESTful API

文章目录 1. 基础概念1&#xff09;Spring Data REST2&#xff09;JPA&#xff08;Java Persistence API&#xff09; 2. 添加依赖3. 创建JPA实体4. 创建JPA Repository5. 启用Spring Data REST6. 启动服务7. 测试8. 总结 Spring Data REST 是 Spring Framework 生态系统中的一…

加密算法和身份认证

前瞻概念 在了解加密和解密的过程前&#xff0c;我们先了解一些基础概念 明文&#xff1a;加密前的消息叫 “明文” &#xff08;plain text&#xff09;密文: 加密后的文本叫 “密文” (cipher text)密钥: 只有掌握特殊“钥匙”的人&#xff0c;才能对加密的文本进行解密,这里…

【深度学习:(Contrastive Learning) 对比学习】深入浅出讲解对比学习

对比学习允许模型从未标记的数据中提取有意义的表示。通过利用相似性和不相似性&#xff0c;对比学习使模型能够在潜在空间中将相似的实例紧密地映射在一起&#xff0c;同时将那些不同的实例分开。这种方法已被证明在计算机视觉、自然语言处理 &#xff08;NLP&#xff09; 和强…

每天刷两道题——第六天

1.1字母异位词分组 给你一个字符串数组&#xff0c;将字母异位词组合在一起。可以按任意顺序返回结果列表。字母异位词指的是由重新排列源单词的所有字母得到的一个新单词。 输入: strs [“eat”, “tea”, “tan”, “ate”, “nat”, “bat”] 输出: [[“bat”],[“nat”,…

STM32和ESP8266的WiFi模块控制与数据传输

基于STM32和ESP8266 WiFi模块的控制与数据传输是一种常见的嵌入式系统应用。在这种应用中&#xff0c;STM32作为主控制器负责控制和与外部传感器交互&#xff0c;而ESP8266 WiFi模块则用于实现无线通信和数据传输。本文将介绍如何在STM32上控制ESP8266模块&#xff0c;建立WiFi…

如何定义封装全局组件

这里以封装的svg组件为例 在src文件夹目录下创建一个index.ts文件&#xff1a;用于注册components文件夹内部全部全局组件&#xff01;&#xff01;&#xff01; import SvgIcon from ./SvgIcon/index.vue; import type { App, Component } from vue; const components: { [na…

3D 纹理的综合指南

在线工具推荐&#xff1a;3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 我们经常看到超现实主义的视频游戏和动画电影角色出现在屏幕上。他们皮肤上的…

【Redux】自己动手实现redux和react-redux

1. React提供context的作用 在class组件的世界里&#xff0c;如果后代组件共享某些状态&#xff0c;比如主题色、语言键&#xff0c;则需要将这些状态提升到根组件&#xff0c;以props的方式从根组件向后代组件一层一层传递&#xff0c;这样则需要在每层写props.someData&#…

Java Arrays.copyOfRange的用法

Arrays.copyOfRange的使用方法&#xff1a; 将一个数组拷贝至另一个数组中 参数&#xff1a; original&#xff1a;第一个参数为要拷贝的数组对象 from&#xff1a;第二个参数为拷贝的开始位置&#xff08;包含&#xff09; to&#xff1a;第三个参数为拷贝的结束位置&#x…

django websocket

目录 核心代码 consumers.py from channels.generic.websocket import WebsocketConsumer from channels.exceptions import StopConsumer import datetime import time from asgiref.sync import async_to_sync class ChatConsumer(WebsocketConsumer):def websocket_conne…

ssm基于vue框架和elementui组件的手机官网论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本手机官网就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&#x…

Java 面向对象的三大特征之继承和多态

3、继承 extends 3.1什么是继承&#xff0c;有什么用&#xff1f; 继承&#xff1a;在现实世界当中也是存在的&#xff0c;例如&#xff1a;家里有矿&#xff0c;不用很努力也可以继承。 继承的作用&#xff1a; 基本作用&#xff1a;子类继承父类&#xff0c;代码可以得到复用…

java基础之Java8新特性-Lambda

目录 什么是Lambda表达式 Lambda表达式规范 基本语法 参数列表 函数体 注意事项 如何定义函数接口 1.保证接口中只能有一个抽象方法 2.使用FunctionalInterface注解标记该接口为函数接口 使用Lambda调用无参函数 使用Lambda调用有参函数 使用Lambda的精简写法 使用…

Linux第3步_安装Ubuntu操作系统

创建好虚拟机后&#xff0c;就可以安装Ubuntu操作系统了。 1、双击“VMware Workstation Pro”&#xff0c;得到下面的界面。 2、点击“编辑虚拟机设置”&#xff0c;见下图&#xff1a; 3、等几秒钟&#xff0c;得到下面的界面&#xff1a; 4、点击“CD/DVD”&#xff0c;得到…

vscode无识别已有的maven java项目(visual studio code not recognizing java project)

文章目录 事情经过尝试疑惑问题解决结论 事情经过 未安装任何Java Extension Pack使用 Maven 的 archetype:generate 命令来创建一个新的项目使用vscode打开了该目录然后安装Java Extension Pack等java插件配置了vscode settings.json中的 java.configuration.runtimes和 java…

Ubuntu envs setting

1. change the chmod of folders sudo chown -R $USER:$USER /home/anaconda3 2. torch.cuda.is_available()返回false change conda installation to pip. zai qi ta huan jing pei zhi dou mei wen ti de qing kuang xia , zai shi shi zhe ge fang fa. # CUDA 11.7 con…

Python-1-字符串类型及方法

众所周知&#xff0c;Python面向对象&#xff0c;功能强大 | ू•ૅω•́)ᵎᵎᵎ

mybatis-flex与springBoot整合

mybatis-flex基本使用 1.测试sql2.导入坐标3.框架搭建1.pojo层2.mapper层3.service层4.controller层5.启动类6.配置类7.EmpMapper.xml 4.启动测试 本片文章在springBoot3&#xff0c;jdk21下测试通过 注意官方网站为&#xff1a;https://mybatis-flex.com/ 请点击&#xff1a;直…

使用STM32和ESP8266构建智能家居网络

本文将介绍如何使用STM32微控制器和ESP8266 WiFi模块构建一个智能家居网络。我们将讨论智能家居网络的整体设计思路、硬件连接和软件开发。通过本文的指导和示例代码&#xff0c;读者将能够搭建一个智能家居系统&#xff0c;实现远程控制和数据监测。 一、智能家居网络的整体设…

C++ Optins接口封装设置自动重连

当数据库突然断开的时候&#xff0c;API将接收不到MySQL的连接&#xff0c;在代码里面写关于自动重连可以实现数据库断开之后重连。 1、首先在LXMysql.h创建Option函数 //mysql参数的设定 设置自动重连 在connect之前调用/*int STDCALL mysql_options(MYSQL *mysql, enum mys…