Java NIO实现高性能HTTP代理

NIO采用多路复用IO模型,相比传统BIO(阻塞IO),通过轮询机制检测注册的Channel是否有事件发生,可以实现一个线程处理客户端的多个连接,极大提升了并发性能。
在5年前,本人出于对HTTP正向代理的好奇新,那时候也在学习JAVA,了解到了NIO,就想用NIO写一个正向代理软件,当时虽然实现了正向代理,但是代码逻辑及其混乱,而且没有经过测试也许有不少的bug
近期因为找工作,又复习起了以往的一些JAVA知识,包括JVM内存模型、GC垃圾回收机制等等,其中也包括NIO。现在回头再看NIO,理解也更深刻了一点。
在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有socket 读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。在Java NIO中,是通过selector.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这种方式会导致用户线程的阻塞。多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket 真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况。
本HTTP代理软件只能代理HTTP和HTTPS协议,分享出来共广大网友参考和学习
1.Bootstrap类
此类用于创建和启动一个HTTP代理服务
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;public class Bootstrap {private final Logger logger = LogManager.getLogger(Bootstrap.class);private AbstractEventLoop serverEventLoop;private int port;public Bootstrap() {port = 8888;serverEventLoop = new ServerEventLoop(this);}public Bootstrap bindPort(int port) {try {this.port = port;this.serverEventLoop.bind(port);} catch (Exception e) {logger.error("open server socket channel error.", e);}return this;}public void start() {serverEventLoop.getSelector().wakeup();logger.info("Proxy server started at port {}.", port);}public AbstractEventLoop getServerEventLoop() {return serverEventLoop;}
}
2.ServerEventLoop
事件循环,单线程处理事件循环。包括客户端的连接和读写请求,目标服务器的连接和读写事件,在同一个事件循环中处理。
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.example.common.HttpRequestParser;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;public class ServerEventLoop extends AbstractEventLoop {private final Logger logger = LogManager.getLogger(ServerEventLoop.class);public ServerEventLoop(Bootstrap bootstrap) {super(bootstrap);}@Overrideprotected void processSelectedKey(SelectionKey key) {if (key.isValid() && key.isAcceptable()) {if (key.attachment() instanceof Acceptor acceptor) {acceptor.accept();}}if (key.isValid() && key.isReadable()) {if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleRead();}}if (key.isValid() && key.isConnectable()) {key.interestOpsAnd(~SelectionKey.OP_CONNECT);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleConnect();}}if (key.isValid() && key.isWritable()) {key.interestOpsAnd(~SelectionKey.OP_WRITE);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleWrite();}}}@Overridepublic void bind(int port) throws Exception {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);SelectionKey key = serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);key.attach(new Acceptor(serverSocketChannel));serverSocketChannel.bind(new InetSocketAddress(port));}class Acceptor {ServerSocketChannel ssc;public Acceptor(ServerSocketChannel ssc) {this.ssc = ssc;}public void accept() {try {SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ, new ClientChannelHandler(socketChannel));logger.info("accept client connection");} catch (IOException e) {logger.error("accept error");}}}abstract class ChannelHandler {Logger logger;SocketChannel channel;ByteBuffer writeBuffer;public ChannelHandler(SocketChannel channel) {this.logger = LogManager.getLogger(this.getClass());this.channel = channel;this.writeBuffer = null;}abstract void handleRead();public void handleWrite() {doWrite();}public abstract void onChannelClose();public ByteBuffer doRead() {ByteBuffer buffer = ByteBuffer.allocate(4096);try {int len = channel.read(buffer);if (len == -1) {logger.info("read end-of-stream, close channel {}", channel);channel.close();onChannelClose();}if (len > 0) {buffer.flip();}} catch (IOException e) {logger.error("read channel error");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}}return buffer;}public void doWrite() {if (writeBuffer != null) {try {while (writeBuffer.hasRemaining()) {channel.write(writeBuffer);}} catch (IOException e) {logger.error("write channel error.");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error");}}writeBuffer = null;}}public void handleConnect() {}}class ClientChannelHandler extends ChannelHandler {HttpRequestParser requestParser;private SelectableChannel proxyChannel;public ClientChannelHandler(SocketChannel sc) {super(sc);this.channel = sc;this.requestParser = new HttpRequestParser();this.proxyChannel = null;}@Overridepublic void handleRead() {if (requestParser.isParsed()) {if (proxyChannel != null) {SelectionKey proxyKey = proxyChannel.keyFor(selector);if (proxyKey != null && proxyKey.isValid() && proxyKey.attachment() instanceof ProxyChannelHandler proxyHandler) {//需要等待ProxyHandler的写入缓存为空后才可读取客户端的数据if (proxyHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && proxyKey.isValid()) {proxyHandler.writeBuffer = buffer;proxyKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}} else {ByteBuffer buffer = doRead();requestParser.putFromByteBuffer(buffer);if (requestParser.isParsed()) {//连接到目标服务器ByteBuffer buf = null;if (requestParser.getMethod().equals(HttpRequestParser.HTTP_METHOD_CONNECT)) {//回写客户端连接成功SelectionKey clientKey = channel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {clientHandler.writeBuffer = ByteBuffer.wrap((requestParser.getProtocol() + " 200 Connection Established\r\n\r\n").getBytes());clientKey.interestOpsOr(SelectionKey.OP_WRITE);}} else {//将缓存的客户端的数据通过代理转发byte[] allBytes = requestParser.getAllBytes();buf = ByteBuffer.wrap(allBytes);}this.proxyChannel = connect(requestParser.getAddress(), buf);}}}@Overridepublic void onChannelClose() {try {if (proxyChannel != null) {proxyChannel.close();}} catch (IOException e) {logger.error("close channel error");}}private SocketChannel connect(String address, ByteBuffer buffer) {String host = address;int port = 80;if (address.contains(":")) {host = address.split(":")[0].trim();port = Integer.parseInt(address.split(":")[1].trim());}SocketAddress target = new InetSocketAddress(host, port);SocketChannel socketChannel = null;SelectionKey proxyKey = null;int step = 0;try {socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);step = 1;ProxyChannelHandler proxyHandler = new ProxyChannelHandler(socketChannel);proxyHandler.setClientChannel(channel);proxyHandler.writeBuffer = buffer;proxyKey = socketChannel.register(selector, SelectionKey.OP_CONNECT, proxyHandler);proxyKey.interestOpsOr(SelectionKey.OP_WRITE);step = 2;socketChannel.connect(target);} catch (IOException e) {logger.error("connect error.");switch (step) {case 2:proxyKey.cancel();case 1:try {socketChannel.close();} catch (IOException ex) {logger.error("close channel error.");}socketChannel = null;break;}}return socketChannel;}}class ProxyChannelHandler extends ChannelHandler {private SelectableChannel clientChannel;public ProxyChannelHandler(SocketChannel sc) {super(sc);clientChannel = null;}@Overridepublic void handleConnect() {try {if (channel.isConnectionPending() && channel.finishConnect()) {SelectionKey proxyKey = channel.keyFor(selector);proxyKey.interestOpsOr(SelectionKey.OP_READ);}} catch (IOException e) {try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}logger.error("finish connection error.");}}@Overridepublic void handleRead() {if (clientChannel != null) {SelectionKey clientKey = clientChannel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {if (clientHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && clientKey.isValid()) {clientHandler.writeBuffer = buffer;clientKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}}@Overridepublic void onChannelClose() {try {if (clientChannel != null) {clientChannel.close();}} catch (IOException e) {logger.error("close channel error");}}public void setClientChannel(SocketChannel client) {this.clientChannel = client;}}
}
3.AbstractEventLoop
事件循环的抽象类
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executors;public abstract class AbstractEventLoop implements Runnable {private final Logger logger = LogManager.getLogger(AbstractEventLoop.class);protected Selector selector;protected Bootstrap bootstrap;public AbstractEventLoop(Bootstrap bootstrap) {this.bootstrap = bootstrap;openSelector();Executors.newSingleThreadExecutor().submit(this);}public void bind(int port) throws Exception {throw new Exception("not support");}@Overridepublic void run() {while (true) {try {if (selector.select() > 0) {processSelectedKeys();}} catch (Exception e) {logger.error("select error.", e);}}}private void processSelectedKeys() {Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();processSelectedKey(key);}}protected abstract void processSelectedKey(SelectionKey key);public Selector openSelector() {try {this.selector = Selector.open();return this.selector;} catch (IOException e) {logger.error("open selector error.", e);}return null;}public Selector getSelector() {return selector;}
}
4.HttpRequestParser
用于解析HTTP请求报文中的请求头,可以获取主机和端口号
package org.example.common;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;public class HttpRequestParser {private final Logger logger = LogManager.getLogger(HttpRequestParser.class);public static final String COLON = ":";public static final String REQUEST_HEADER_HOST_PREFIX = "host:";private UnboundedByteBuffer requestBytes = new UnboundedByteBuffer();private List<String> headers = new ArrayList<>();public static final String HTTP_METHOD_GET = "GET";public static final String HTTP_METHOD_POST = "POST";public static final String HTTP_METHOD_PUT = "PUT";public static final String HTTP_METHOD_DELETE = "DELETE";public static final String HTTP_METHOD_TRACE = "TRACE";public static final String HTTP_METHOD_OPTIONS = "OPTIONS";public static final String HTTP_METHOD_HEAD = "HEAD";public static final String HTTP_METHOD_CONNECT = "CONNECT";private String address;private String protocol;private String method;private boolean parsed = false;private StringBuffer reqHeaderBuffer = new StringBuffer();public void putFromByteBuffer(ByteBuffer buffer) {for (; buffer.hasRemaining(); ) {byte b = buffer.get();requestBytes.addByte(b);reqHeaderBuffer.append((char) b);if (b == '\n' && reqHeaderBuffer.charAt(reqHeaderBuffer.length() - 2) == '\r') {if (reqHeaderBuffer.length() == 2) {parsed = true;logger.debug("Request header line end.");break;}String headerLine = reqHeaderBuffer.substring(0, reqHeaderBuffer.length() - 2);logger.debug("Request header line parsed {}", headerLine);headers.add(headerLine);if (headerLine.startsWith(HTTP_METHOD_GET)|| headerLine.startsWith(HTTP_METHOD_POST)|| headerLine.startsWith(HTTP_METHOD_PUT)|| headerLine.startsWith(HTTP_METHOD_DELETE)|| headerLine.startsWith(HTTP_METHOD_TRACE)|| headerLine.startsWith(HTTP_METHOD_OPTIONS)|| headerLine.startsWith(HTTP_METHOD_HEAD)|| headerLine.startsWith(HTTP_METHOD_CONNECT)) {this.protocol = headerLine.split(" ")[2].trim();this.method = headerLine.split(" ")[0].trim();} else if (headerLine.toLowerCase().startsWith(REQUEST_HEADER_HOST_PREFIX)) {this.address = headerLine.toLowerCase().replace(REQUEST_HEADER_HOST_PREFIX, "").trim();}reqHeaderBuffer.delete(0, reqHeaderBuffer.length());}}}public boolean isParsed() {return parsed;}public String getAddress() {return address;}public String getProtocol() {return protocol;}public String getMethod() {return method;}public byte[] getAllBytes() {return requestBytes.toByteArray();}
}
5.UnboundedByteBuffer
无界的字节缓冲区,每次会以两倍的容量扩容,可以用于追加存入客户端的请求数据,实现粘包
package org.example.common;public class UnboundedByteBuffer {private byte[] bytes;private int size;private int cap;private final int DEFAULT_CAP = 4096;private final int MAX_CAP = 1 << 30;public UnboundedByteBuffer() {this.cap = DEFAULT_CAP;this.bytes = new byte[this.cap];this.size = 0;}public void addBytes(byte[] data) {ensureCapacity(data.length);System.arraycopy(data, 0, bytes, size, data.length);this.size += data.length;}private void ensureCapacity(int scale) {if (scale + this.size > this.cap) {int tmpCap = this.cap;while (scale + this.size > tmpCap) {tmpCap = tmpCap << 1;}if (tmpCap > MAX_CAP) {return;}byte[] newBytes = new byte[tmpCap];System.arraycopy(this.bytes, 0, newBytes, 0, this.size);this.bytes = newBytes;}}public byte[] toByteArray() {byte[] ret = new byte[this.size];System.arraycopy(this.bytes, 0, ret, 0, this.size);return ret;}public void addByte(byte b) {ensureCapacity(1);this.bytes[this.size++] = b;}
}
以上实现是在单个事件循环线程中处理所有事件,一个更好的方案是将客户端的Channel和代理服务器与目标服务器的Channel区分开,分别在两个事件循环中处理。基本实现也和本文中的代码大体一致,两者在理论上应该存在性能差距,实际经过本人测试可以每秒处理客户端的上千个连接。代码传送门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60098.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

栈和队列(Java)

一.栈&#xff08;Stack&#xff09; 1.定义 栈是限定仅在表尾进行插入或删除操作的线性表 一般的表尾称为栈顶 表头称为栈底 栈具有“后进先出”的特点 2.对栈的模拟 栈主要具有以下功能&#xff1a; push(Object item)&#xff1a;将元素item压入栈顶。 pop()&am…

Angular 和 Vue2.0 对比

前言 &#xff1a;“业精于勤&#xff0c;荒于嬉&#xff1b;行成于思&#xff0c;毁于随” 很久没写博客了&#xff0c;大多记录少进一步探查。 Angular 和 Vue2.0 对比&#xff1a; 一.概念 1.1 Angular 框架&#xff1a; 是一款由谷歌开发的开源web前端框架&#xff08;核…

基于Multisim数字电子秒表0-60S电路(含仿真和报告)

【全套资料.zip】数字电子秒表电路Multisim仿真设计数字电子技术 文章目录 功能一、Multisim仿真源文件二、原理文档报告资料下载【Multisim仿真报告讲解视频.zip】 功能 1.秒表最大计时值为60秒&#xff1b; 2. 2位数码管显示&#xff0c;分辨率为1秒&#xff1b; 3.具有清零…

安卓智能指针sp、wp、RefBase浅析

目录 前言一、RefBase1.1 引用计数机制1.2 设计目的1.3 主要方法1.4 如何使用1.5 小结 二、sp和wp2.1 引用计数机制2.2 设计目的2.3 主要方法2.3.1 sp2.3.2 wp 2.4 如何使用2.5 小结 四、参考链接 前言 安卓底层binder中&#xff0c;为什么 IInterface要继承自RefBase &#x…

【论文笔记】Prefix-Tuning: Optimizing Continuous Prompts for Generation

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Prefix-Tuning: Optimizin…

【Web前端】从回调到现代Promise与Async/Await

异步编程是一种让程序能够在等待某些操作完成的同时继续执行其他任务的关键技术&#xff0c;打破了传统编程中顺序执行代码的束缚。这种编程范式允许开发者构建出能够即时响应用户操作、高效处理网络请求和资源加载的应用程序。通过异步编程&#xff0c;JavaScript 能够在执行耗…

【CSS】“flex: 1“有什么用?

flex 属性的组成 flex 属性是一个复合属性&#xff0c;包含以下三个子属性&#xff1a; flex-grow&#xff1a;决定元素在容器中剩余空间的分配比例。默认值为 0&#xff0c;表示元素不会扩展。当设置为正数时&#xff0c;元素会按照设定比例扩展。flex-shrink&#xff1a;决…

计算机课程管理:Spring Boot与工程认证的协同创新

3系统分析 3.1可行性分析 通过对本基于工程教育认证的计算机课程管理平台实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于工程教育认证的计算机课程管理平…

【SpringBoot】18 上传文件到数据库(Thymeleaf + MySQL)

Git仓库 https://gitee.com/Lin_DH/system 介绍 使用 Thymeleaf 写的页面&#xff0c;将&#xff08;txt、jpg、png&#xff09;格式文件上传到 MySQL 数据库中。 依赖 pom.xml <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j --><depende…

Sharding运行模式、元数据、持久化详解

运行模式 单机模式 能够将数据源和规则等元数据信息持久化&#xff0c;但无法将元数据同步至多个Sharding实例&#xff0c;无法在集群环境中相互感知。 通过某一实例更新元数据之后&#xff0c;会导致其他实例由于获取不到最新的元数据而产生不一致的错误。 适用于工程师在本…

挖掘web程序中的OAuth漏洞:利用redirect_uri和state参数接管账户

本文探讨了攻击者如何利用OAuth漏洞&#xff0c;重点是滥用redirect_uri和state参数以接管用户账户。如果redirect_uri参数验证不严&#xff0c;可能会导致未经授权的重定向到恶意服务器&#xff0c;从而使攻击者能够捕获敏感信息。同样&#xff0c;state参数的错误实现可能使O…

Python世界:力扣题解1712,将数组分成三个子数组的方案数,中等

Python世界&#xff1a;力扣题解1712&#xff1a;将数组分成三个子数组的方案数&#xff0c;中等 任务背景思路分析代码实现测试套件本文小结 任务背景 问题来自力扣题目1712. Ways to Split Array Into Three Subarrays&#xff0c;大意如下&#xff1a; A split of an intege…

Java集合基础——针对实习面试

目录 Java集合基础什么是Java集合&#xff1f;说说List,Set,Queue,Map的区别&#xff1f;说说List?说说Set?说说Map&#xff1f;说说Queue?为什么要用集合&#xff1f;如何选用集合&#xff1f; Java集合基础 什么是Java集合&#xff1f; Java集合&#xff08;Java Collect…

基于单片机的客车载客状况自动检测系统(论文+源码)

1系统整体设计 本课题为客车载客状况自动检测系统&#xff0c;在此以STM32单片机为核心控制器&#xff0c;结合压力传感器、红外传感器、蜂鸣器、语音提示模块、继电器、液晶等构成整个客车载客状况自动检测系统&#xff0c;整个系统架构如图2.1所示&#xff0c;在此通过两个红…

渗透测试(socket,namp,scapy)

socket:可以用来实现不同虚拟机或者不同计算机之间的通信。 socket常用函数&#xff1a; sock.bind(host,port) //host可接受client范围&#xff0c;以及连接的端口 sock.listen()//sever开启监听连接 sock.accpet()//返回 sock&#xff0c;addr 用来接受和发送数据 addr…

【mongodb】数据库的安装及连接初始化简明手册

NoSQL(NoSQL Not Only SQL )&#xff0c;意即"不仅仅是SQL"。 在现代的计算系统上每天网络上都会产生庞大的数据量。这些数据有很大一部分是由关系数据库管理系统&#xff08;RDBMS&#xff09;来处理。 通过应用实践证明&#xff0c;关系模型是非常适合于客户服务器…

内网对抗-信息收集篇SPN扫描DC定位角色区域定性服务探针安全防护凭据获取

知识点&#xff1a; 1、信息收集篇-网络架构-出网&角色&服务&成员 2、信息收集篇-安全防护-杀毒&防火墙&流量监控 3、信息收集篇-密码凭据-系统&工具&网站&网络域渗透的信息收集&#xff1a; 在攻防演练中&#xff0c;当完成边界突破后进入内…

OpenWebUI,RAG+外部知识库+AI写文的开源应用

引言 自从去年AI火起来之后&#xff0c;很多人便热衷于寻找适合自用的AI开源项目&#xff0c;把各家大模型API接入到自己的AI程序里&#xff0c;便可以通过AI辅助完成一系列日常任务&#xff0c;比如内容翻译/润色/总结/撰写、格式转换、数据分类、代码分析、角色扮演等等。 …

qt QErrorMessage详解

1、概述 QErrorMessage是Qt框架中用于显示错误消息的一个对话框类。它提供了一个简单的模态对话框&#xff0c;用于向用户显示错误或警告消息。QErrorMessage通常用于应用程序中&#xff0c;当需要向用户报告错误但不希望中断当前操作时。它提供了一个标准的错误消息界面&…

一文了解Android的Doze模式

Android 的 Doze 模式是一项省电功能&#xff0c;主要用于减少设备的功耗&#xff0c;特别是在屏幕关闭且设备长时间未被使用的情况下。Doze 模式在 Android 6.0&#xff08;API Level 23&#xff09;首次引入&#xff0c;并在后续版本中不断改进&#xff0c;以便更智能地管理后…