Rust学习笔记
Rust编程语言入门教程课程笔记
参考教材: The Rust Programming Language (by Steve Klabnik and Carol Nichols, with contributions from the Rust Community)
Lecture 20: Final Project: Building a Multithreaded Web Server
src/main.rs
use std::fs;
use std::{io::{prelude::*, BufReader},net::{TcpListener, TcpStream},
};
use std::thread;
use std::time::Duration;
use hello::ThreadPool;fn main() {// bind to a portlet listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4);// set a limit on the number of threads in the pool// listen for incoming connectionsfor stream in listener.incoming().take(2) {// only accept two connectionslet stream = stream.unwrap();// println!("Connection established!");// handle each connection// handle_connection(stream);// spawn a new thread for each connection// thread::spawn(|| {// handle_connection(stream);//suffer from ddos attack// });pool.execute(|| {handle_connection(stream);});}println!("Shutting down.");
}fn handle_connection(mut stream: TcpStream) {// let buf_reader = BufReader::new(&mut stream);// let http_request: Vec<_> = buf_reader// .lines()// .map(|result| result.unwrap())// .take_while(|line| !line.is_empty())// .collect();// println!("Request: {:#?}", http_request);let mut buffer = [0; 1024];stream.read(&mut buffer).unwrap();let get = b"GET / HTTP/1.1\r\n";let sleep = b"GET /sleep HTTP/1.1\r\n";//Reading a Request// Request Format// Method Request-URI HTTP-Version CRLF// headers CRLF// message-body// Example// Request: [// "GET / HTTP/1.1",// "Host: 127.0.0.1:7878",// "Connection: keep-alive",// "sec-ch-ua: \"Google Chrome\";v=\"119\", \"Chromium\";v=\"119\", \"Not?A_Brand\";v=\"24\"",// "sec-ch-ua-mobile: ?0",// "sec-ch-ua-platform: \"macOS\"",// "Upgrade-Insecure-Requests: 1",// "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",// "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",// "Sec-Fetch-Site: none",// "Sec-Fetch-Mode: navigate",// "Sec-Fetch-User: ?1",// "Sec-Fetch-Dest: document",// "Accept-Encoding: gzip, deflate, br",// "Accept-Language: zh-CN,zh;q=0.9,en;q=0.8",// ] //Writing a Response// Response Format// HTTP-Version Status-Code Reason-Phrase CRLF// headers CRLF// message-body//let response = "HTTP/1.1 200 OK\r\n\r\n";// let contents = fs::read_to_string("hello.html").unwrap();// let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);// write the response to the stream// stream.write(response.as_bytes()).unwrap();// stream.flush().unwrap();// if buffer.starts_with(get) {// let contents = fs::read_to_string("hello.html").unwrap();// let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);// stream.write(response.as_bytes()).unwrap();// stream.flush().unwrap();// } else {// // other request// let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";// let contents = fs::read_to_string("404.html").unwrap();// let response = format!("{}{}", status_line, contents);// stream.write(response.as_bytes()).unwrap();// stream.flush().unwrap();// }// Refactoringlet (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else if buffer.starts_with(sleep) {// simulate a slow request// simulate a slow requeststd::thread::sleep(std::time::Duration::from_secs(5));("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else {("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")};let contents = fs::read_to_string(filename).unwrap();let response = format!("{}{}", status_line, contents);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();
}
src/lib.rs
use std::thread;
use std::sync::{mpsc, Arc, Mutex};enum Message {NewJob(Job),Terminate,
}struct Worker {id: usize,// thread: thread::JoinHandle<()>,thread: Option<thread::JoinHandle<()>>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {// let thread = thread::spawn(|| {});// let thread = thread::spawn(|| {// receiver;// });// let thread = thread::spawn(move || loop {// while let Ok(_) = receiver.lock().unwrap().recv().unwrap(){// println!("Worker {} got a job; executing.", id);// job.call_box();// }// });let thread = thread::spawn(move || loop {let message = receiver.lock().unwrap().recv().unwrap();match message {Message::NewJob(job) => {println!("Worker {} got a job; executing.", id);job.call_box();},Message::Terminate => {println!("Worker {} was told to terminate.", id);break;},}});Worker { id, thread: Some(thread) }}// fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {// let thread = thread::spawn(|| {// // loop {// // receiver.lock().unwrap().recv().unwrap();// // }// loop {// let job = receiver.lock().unwrap().recv().unwrap();// println!("Worker {} got a job; executing.", id);// job.call_box();// }// });// Worker { id, thread }// }
}pub struct ThreadPool {// threads: Vec<thread::JoinHandle<()>>,workers: Vec<Worker>,// sender: mpsc::Sender<Job>,sender: mpsc::Sender<Message>,
}impl ThreadPool {/// Create a new ThreadPool/// /// The size is the number of threads in the pool/// /// # Panics/// /// The `new` function will panic if the size is zero// // pub fn new(size: usize) -> ThreadPool {// assert!(size > 0);// let (sender, receiver) = mpsc::channel();// let mut workers = Vec::with_capacity(size);// for id in 0..size {// workers.push(Worker::new(id, receiver));// }// ThreadPool { workers, sender }// }pub fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);//Box<dyn FnOnce() + Send + 'static>// self.sender.send(job).unwrap();self.sender.send(Message::NewJob(job)).unwrap();}// the execute method should be similar with thread::spawn// pub fn spawn<F, T>(f: F) -> JoinHandle<T>// where// F: FnOnce() -> T + Send + 'static,// T: Send + 'static,// {// Builder::new().spawn(f).expect("failed to spawn thread")// }}impl Drop for ThreadPool {fn drop(&mut self) {println!("Sending terminate message to all workers.");// for worker in &mut self.workers {// println!("Shutting down worker {}", worker.id);// worker.thread.join().unwrap();// }// for worker in &mut self.workers {// println!("Shutting down worker {}", worker.id);// // worker.thread.join().unwrap();// if let Ok(_) = worker.thread.join() {// println!("Worker {} shut down successfully.", worker.id);// } else {// println!("Worker {} failed to shut down.", worker.id);// }// }for _ in &mut self.workers {self.sender.send(Message::Terminate).unwrap();}println!("Shutting down all workers.");for worker in &mut self.workers {// println!("Shutting down worker {}", worker.id);// worker.thread.join().unwrap();if let Some(thread) = worker.thread.take() {if let Ok(_) = thread.join() {println!("Worker {} shut down successfully.", worker.id);} else {println!("Worker {} failed to shut down.", worker.id);}}}}
}// struct Job;
type Job = Box<dyn FnBox + Send + 'static>;trait FnBox {fn call_box(self: Box<Self>);
}impl <F: FnOnce()> FnBox for F {fn call_box(self: Box<F>) {(*self)()}
}
hello.html
<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>Hello World</title>
</head><body><h1>Hello World</h1><p><?phpecho "Hello World";?></p>
</body></html>
404.html
<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><title>404</title>
</head><body><h1>Oops!</h1><p>Page not found</p>
</body></html>