使用rust写一个Web服务器——多线程版本

文章目录

    • 模拟慢请求
    • 多线程Web服务器实现
      • 为每个请求单独生成一个线程
      • 限制创建线程的数量
      • ThreadPool的初始化
      • ThreadPool的存储
      • ThreadPool的设计
    • 关闭和资源清理
      • 为ThreadPool实现Drop
      • 停止工作线程
      • 测试

仓库地址: 1037827920/web-server: 使用rust编写的简单web服务器 (github.com)

模拟慢请求

一个单线程版本的web服务器只能一次处理一个请求,可是如果一个请求持续的时间太长,就会导致其他请求有可能饥饿,下面使用sleep方式让每次请求持续5s,模拟真实的慢请求:

use std::{fs,io::{prelude::*, BufReader},net::{TcpListener, TcpStream},thread,time::Duration,
};fn main() {// 监听本地8080端口let listener = TcpListener::bind("localhost:8080").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();// 处理连接handle_connection(stream);}
}fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&mut stream);// 使用next而不是lines,因为我们只需要读取第一行,判断具体的request方法let request_line = buf_reader.lines().next().unwrap().unwrap();// match方法不会像之前的方法那样自动做引用或解引用,因此我们需要显式调用let (status_line, filename) = match &request_line[..] {"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), // 请求 / 资源"GET /sleep HTTP/1.1" => { // 请求 /sleep 资源thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "hello.html")}_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),};// 读取文件内容let contents = fs::read_to_string(filename).unwrap();let length = contents.len();// 格式化HTTP Responselet response =format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");// 将response写入streamstream.write_all(response.as_bytes()).unwrap();
}

运行代码后访问localhost:8080/sleep,然后紧接着继续运行localhost:8080,会发现后者的请求必须等待前者完成后才能被处理,下面使用线程池改善吞吐量

多线程Web服务器实现

线程池: 包含一组已经生成的线程,它们时刻等待着接收并处理新的任务,当程序接收到新任务时,它会将线程池中的一个线程指派给该任务,在该线程忙着处理时,新来的任务交给池中剩余的线程进行处理,最终,当执行任务的线程处理完后,它会被重新放入到线程池中,准备处理新任务。注意: 需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。

因此,需要对线程池进行一定的架构设计,首先是设定最大线程数的上限,其次是维护一个请求队列。池中的线程去队列中依次弹出请求并处理。

为每个请求单独生成一个线程

修改main函数,每次处理一个任务就创建一个新的线程并执行任务

use std::{fs,io::{prelude::*, BufReader},net::{TcpListener, TcpStream},thread,time::Duration,
};fn main() {let listener = TcpListener::bind("localhost:8080").unwrap();for stream in listener.incoming() {let stream = stream.unwrap();thread::spawn(|| {handle_connection(stream);});}
}fn handle_connection(mut stream: TcpStream) {let buf_reader = BufReader::new(&mut stream);// 使用next而不是lines,因为我们只需要读取第一行,判断具体的request方法let request_line = buf_reader.lines().next().unwrap().unwrap();// match方法不会像之前的方法那样自动做引用或解引用,因此我们需要显式调用let (status_line, filename) = match &request_line[..] {"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), // 请求 / 资源"GET /sleep HTTP/1.1" => { // 请求 /sleep 资源thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK", "hello.html")}_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),};// 读取文件内容let contents = fs::read_to_string(filename).unwrap();let length = contents.len();// 格式化HTTP Responselet response =format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");// 将response写入streamstream.write_all(response.as_bytes()).unwrap();
}

这样简单粗暴就能实现多线程的Web服务器,但是这样不能达到限制线程池中线程数的效果

限制创建线程的数量

利用线程池,继续修改main函数

fn main() {let listener = TcpListener::bind("localhost:8080").unwrap();// 首先创建一个包含4个线程的线程池let pool = ThreadPool::new(4);for stream in listener.incoming() {let stream = stream.unwrap();// 分发执行请求pool.execute(|| {handle_connection(stream)});}
}

可以看出,我们至少要实现ThreadPool这个结构体和execute方法

ThreadPool的初始化

首先要确定使用new还是build来初始化ThreadPool实例,new往往用于简单初始化一个实例,而build往往会完成更加复杂的构建工作,我们并不需要在初始化线程池的同时创建相应的线程,因此new更合适。

在src/lib.rs写入以下代码:

pub struct ThreadPool;impl ThreadPool {/// # 函数功能/// 创建一个新的线程池pub fn new(size: usize) -> ThreadPool {assert!(size > 0);ThreadPool}/// # 函数功能/// 执行传入的函数fpub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static{todo!();}
}

在src/main.rs中导入lib.rs的ThreadPool:

use <project_name>::ThreadPool;

ThreadPool的存储

ThreadPool作为一个线程池,肯定是要能够存储线程的对吧,继续修改ThreadPool,添加threads字段,使其能够存储线程

use std::thread::{self, Thread};pub struct ThreadPool {threads: Vec<thread::JoinHandle<()>>,
}impl ThreadPool {/// # 函数功能/// 创建一个新的线程池pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 使用with_capacity可以提前分配好内存空间,比Vec::new的性能好let mut threads = Vec::with_capacity(size);for _ in 0..size {// 创建线程并将其存储在vector中todo!();}ThreadPool { threads }}/// # 函数功能/// 执行传入的函数fpub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static{todo!();}
}

ThreadPool的设计

使用thread::spawn是生成线程的最好方式,但是它会立即执行传入的任务,我们需要的是创建线程和执行任务是要分离的。也就是说,我们可以先创建线程后这个线程就进入loop循环等待,直到有执行任务的信号过来这个线程才会执行任务。

可以考虑创建一个Worker结构体,存放id和对应的线程。作为ThreadPool和任务线程联系的桥梁,通过channel,ThreadPool持有Sender,通过execute方法将任务发送给Worker,而Worker持有Receiver,在loop循环中接收ThreadPool发送过来的任务。

ThreadPool结构体:

use std::{sync::{mpsc, Arc, Mutex},thread,
};pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}impl ThreadPool {/// # 函数功能/// 创建一个新的线程池pub fn new(size: usize) -> ThreadPool {assert!(size > 0);// 获得Sender和Receiverlet (sender, receiver) = mpsc::channel();// receiver会在多线程中移动,因此要保证线程安全,需要使用Arc和Mutex。Arc可以允许多个Worker同时持有Receiver,而Mutex可以确保一次只有一个Worker能从Receiver中获取任务,防止任务被多次执行let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}    /// # 函数功能/// 执行传入的函数fpub fn execute<F>(&self, f: F) whereF: FnOnce() + Send + 'static{let job = Box::new(f);// Sender往通道中发送任务self.sender.send(job).unwrap();}
}

Worker结构体:

// 闭包的大小编译是未知的,使用Box可以在堆上动态分配内存,从而存储闭包
type Job = Box<dyn FnOnce() + Send + 'static>;struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {// Receiver会阻塞直到有任务let job = receiver.lock().unwrap().recv().unwrap();println!("Workder {id} got a job; executing");// 执行任务job();});// 让每个Worker都拥有自己的唯一idWorker { id, thread }}
}

关闭和资源清理

为ThreadPool实现Drop

当线程池被Drop时,需要等待所有的子线程完成它们的工作,然后再退出:

struct Worker {id: usize,// 因为Worker中的thread字段的JoinHandle类型没有实现copy trait,可以修改Worker的thread字段,使用Option,然后通过take可以拿走内部值的所有权,同时留下一个Nonethread: Option<thread::JoinHandle<()>>,
}
impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("Workder {id} got a job; executing");job();});// 让每个Worker都拥有自己的唯一idWorker { id, thread: Some(thread)}}
}impl Drop for ThreadPool {fn drop(&mut self) {for worker in &mut self.workers {println!("Shuting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}

停止工作线程

虽然调用了join,但是目标线程依然不会停止,原因在于它们在无限地loop循环等待,需要channel的drop机制:释放sender后,receiver会收到错误,然后再退出

pub struct ThreadPool {workers: Vec<Worker>,// 增加Option封装,这样可以用take拿走所有权sender: Option<mpsc::Sender<Job>>,
}impl ThreadPool {pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender: Some(sender)}}pub fn execute<F>(&self, f: F) whereF: FnOnce() + Send + 'static{let job = Box::new(f);self.sender.as_ref().unwrap().send(job).unwrap();}
}impl Drop for ThreadPool {fn drop(&mut self) {// 主动调用drop关闭senderdrop(self.sender.take());for worker in &mut self.workers {println!("Shuting down worker {}", worker.id);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}

当sender被关闭后,将关闭对应的channel,所以loop的receiver就会收到一个错误,根据错误再进一步的错误:

impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let message = receiver.lock().unwrap().recv();match message {Ok(job) => {println!("Worker {id} got a job; executing");job();}Err(_) => {println!("Worker {id} disconnected; shutting down.");break;}}});Worker {id,thread: Some(thread),}}
}

测试

为了验证代码的正确性,修改main:

fn main() {let listener = TcpListener::bind("localhost:8080").unwrap();let pool = ThreadPool::new(4);for stream in listener.incoming().take(2) {let stream = stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!("Shutting down.");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/54266.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年云南省职业院校技能大赛赛程规章(大数据赛项)

大家期待已久的职业院校技能大赛在各个省份已经陆续出新文件了&#xff0c;近日云南省的赛程规章也是出来了&#xff0c;我相信不仅是对云南&#xff0c;对其他省份也很有参考价值&#xff0c;小编为大家精简整理了一下文件的内容。 一、竞赛目标 为适应大数据产业对高素质技术…

【Spring Security】基于SpringBoot3.3.4版本②如何配置免鉴权Path

基于Spring Boot 3.3.4,详细说明Spring Security 6.3.3的使用 摘要本地开发环境说明SecurityFilterChain介绍application.ymlWen3SecurityProperties.java修改DemoWen3Security修改SecurityFilterChainIgnoredPathController.javaIgnoredPathController2.java启动工程测试测试…

《OpenCV 计算机视觉》—— Harris角点检测、SIFT特征检测

文章目录 一、Harris 角点检测1.基本思想2.检测步骤3.OpenCV实现 二、SIFT特征检测1. SIFT特征检测的基本原理2. SIFT特征检测的特点3. OpenCV 实现 一、Harris 角点检测 OpenCV中的Harris角点检测是一种基于图像灰度值变化的角点提取算法&#xff0c;它通过计算每个像素点的响…

[Unity Demo]从零开始制作空洞骑士Hollow Knight第十二集:制作完整地图和地图细节设置以及制作相机系统的跟随玩家和视角锁定功能

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、制作完整的地图和地图细节设置 1.制作地图前的设置2.制作地图前期该做的事3.制作地图之堆叠素材4.制作地图后期该做的事5.制作地图之修复意想不到的Bug二、…

C++仿函数的介绍以及priority_queue的介绍和模拟实现

目录 1.仿函数 1.1仿函数的介绍 1.2自定义类型使用仿函数 1.3自定义支持比较大小&#xff0c;但是比较的逻辑不是自己想要的逻辑 2.优先级队列priority_queue 2.1priority_queue的介绍 2.2priority_queue的使用 2.3priority_queue的模拟实现 1.仿函数 1.1仿函数的介绍…

微信小程序实战教程:如何使用map组件实现地图功能

在微信小程序中&#xff0c;map组件是一个非常实用的功能&#xff0c;它可以帮助我们快速实现地图展示、定位、标注等操作。本文将详细介绍如何在微信小程序中使用map组件&#xff0c;带你轻松掌握地图开发技能。 一、map组件概述 map组件是微信小程序官方提供的一个地图组件…

【C语言】指针篇 | 万字笔记

写在前面 在学习C语言过程&#xff0c;总有一个要点难点离不开&#xff0c;那就是大名鼎鼎的C语言指针&#xff0c;也是应为有指针的存在&#xff0c;使得C语言一直长盛不衰。因此不才把指针所学的所有功力都转换成这个笔记。希望对您有帮助&#x1f970;&#x1f970; 学习指…

彩虹易支付最新版源码及安装教程(修复BUG+新增加订单投诉功能)

该系统也没版本号&#xff0c;此版本目前是比较新的版本&#xff0c;增加了订单投诉功能&#xff0c;和一个好看的二次元模板。 此版本是全开源版&#xff0c;无一处加密文件,系统默认是安装后是打不开的&#xff0c; 本站特别修复了BUG文件&#xff0c;在PHP7.4环境下也没问…

【网络安全 | JAVA代码审计】基础安全问题和解决方法初探

未经许可,不得转载。 文章目录 SQL注入XSS文件上传XXE路径遍历SQL注入 漏洞代码: public String jdbc_sqli_vul(@RequestParam("username") String username) {StringBuilder result =

Java的学习(语法相关)

字符串存储的问题 char 和字符串都是字符的集合&#xff0c;它们之间的确有相似性&#xff0c;但在 Java 中它们有着不同的存储机制和处理方式。让我从 char 和 String 的本质区别入手来解释。 1. char 和 String 的区别 char 是基本类型&#xff1a;char 是 Java 中的基本数据…

微信小程序技术框架选型

“近期在对团队的微信小程序进行技术框架选型&#xff0c;故对目前主流的微信小程序技术框架进行了一些分析和比较&#xff0c;包括各框架的维护团队、社区链接、GitHub star数、优缺点对比等方面&#xff0c;为团队提供技术框架选型参考” 一、引言 随着移动互联网的快速发展…

【C++】多态(下)

个人主页~ 多态&#xff08;上&#xff09;~ 多态 四、多态的原理1、虚表的存储位置2、多态的原理3、动态绑定和静态绑定 五、单继承和多继承关系的虚函数表1、单继承中的虚函数表2、多继承中的虚函数表 六、多态中的一些小tips 四、多态的原理 1、虚表的存储位置 class A {…

CORE MVC 过滤器 (筛选器)

MVC FrameWork MVCFramework MVC Core 过滤器 分 同步、异步 1、 授权筛选器 IAuthorizationFilter&#xff0c;IAsyncAuthorizationFilter 管道中运行的第一类筛选器&#xff0c;用来确定发出请求的用户是否有权限发出当前请求 2、资源筛选器 IResourceFilter &#xff0c;…

elementPlus的tree组件点击后有白色背景

在使用elementPlus的tree组件时&#xff0c;需要对它进行样式的重写&#xff0c;下面是相关代码 <script setup> import { ref } from vue const data [{label: Level one 1,children: [{label: Level two 1-1,children: [{label: Level three 1-1-1}]}]},{label: Leve…

SFTP 是什么?如何在 Linux 终端上访问 SFTP

SFTP服务器是 Secure File Transfer Protocol&#xff08;安全文件传输协议&#xff09;的简称&#xff0c;它是一种基于SSH&#xff08;Secure Shell&#xff09;协议的文件传输服务器&#xff0c;专门用于在网络上安全地传输文件。SFTP服务器通过使用SSH进行数据加密和身份验…

[C++][第三方库][gtest]详细讲解

目录 1.介绍2.安装3.使用1.头文件包含2.框架初始化接口3.调用测试样例4.TEST宏5.断言宏6.示例 1.介绍 gtest是一个跨平台的C单元测试框架&#xff0c;由Google公司发布gtest是为了在不同平台上为编写C单元测试而生成的&#xff0c;它提供了丰富的断言、致命和非致命判断、参数…

微软准备了 Windows 11 24H2 ISO “OOBE/BypassNRO“命令依然可用

Windows 11 24H2 可能在未来几周内开始推出。 微软已经要求 OEM 遵循新的指南准备好 Windows 11 24H2 就绪的驱动程序&#xff0c;并且现在已经开始准备媒体文件 (.ISO)。 OEM ISO 的链接已在微软服务器上发布。 一个标有"X23-81971_26100.1742.240906-0331.ge_release_sv…

市场中的新兴力量与未来发展

在当前瞬息万变的全球金融市场中&#xff0c;期货交易以其高杠杆与灵活性&#xff0c;吸引了越来越多的投资者参与其中。大粤期货作为中国期货行业的新兴力量&#xff0c;凭借其创新的交易平台、广泛的产品线及专业的风险管理服务&#xff0c;迅速在市场中崭露头角。本文将介绍…

Vue3项目开发——新闻发布管理系统(九)(完结篇)

文章目录 十一、用户信息管理1、用户基本资料管理1.1 页面设计1.2 封装接口,更新信息2、更换头像2.1 静态结构2.2 选择图片预览2.3 上传头像3、重置密码3.1 页面设计3.2 封装接口,更新密码十二、项目打包十三、系统全部源码下载十一、用户信息管理 用户信息管理包括功能:基…

第四届机器人、自动化与智能控制国际会议(ICRAIC 2024)征稿

第四届机器人、自动化与智能控制国际会议&#xff08;ICRAIC 2024&#xff09;由湖南第一师范学院主办&#xff0c;南京师范大学、山东女子学院、爱迩思出版社&#xff08;ELSP&#xff09;协办。 大会将专注于机器人、数字化、自动化、人工智能等技术的开发和融合&#xff0c…