Python中的多进程、多线程、协程

Python中的多线程、多进程、协程

一、概述

1. 多线程Thread (threading):

  • 优点:同一个进程中可以启动多个线程,充分利用IO时,cpu进行等待的时间
  • 缺点:相对于进程,多线程只能并发执行,不能利用多CPU,相对于协程,多线程的启动数目有限 ,占用内存资源,并且有线程切换的时间开销
  • 使用场景IO密集型计算、同时运行的任务数据要求不多

2. 多进程Process(multiprocessing):

  • 优点:可以利用多核CPU进行并行计算
  • 缺点:占用资源最多,可启动的数目比线程少
  • 使用场景CPU密集型计算

3. 协程Coroutine(asyncio):

  • 优点:内存开销最少、启动协程的数量最多
  • 缺点:支持的库有限制 (request对用的能使用协程的库为:aiohttp)、代码实现复杂
  • 使用场景IO密集型计算、超多任务运行、有现成的库支持的场景

4. Python比C/C++/Java速度慢的原因:

  1. Python是动态类型语言,边解释边执行。
  2. GIL,无法利用多核CPU并发 执行。

5. GIL

GIL名为:全局解释器锁(Global Interpreter Lock 缩写为: GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行,即便在多核心处理器上,使用GIL的解释器也只允许同一时刻执行一个线程。这是为了解决多线程之间数据完整性的状态同步问题。

6. 创建工具代码:

这个代码是一个基本的爬虫代码,创建这个代码的目的时为了下面实现多线程或多进程的时候让代码显的更加简洁,进而能更清晰的观察到多线程或多进程的实现方式。

utils.blog_spider.py

import requests
from bs4 import BeautifulSoupurls = [f'https://www.cnblogs.com/#p{page}'for page in range(1, 50 + 1)
]# 请求url,得到html
def craw(url):r = requests.get(url)# print(url, len(r.text))return r.text# 解析html
def parse(html):soup = BeautifulSoup(html, "html.parser")links = soup.find_all("a", class_="post-item-title")return [(link['href'], link.get_text()) for link in links]if __name__ == '__main__':for restult in parse(craw(urls[2])):print(restult)

二、多线程

1. 创建多线程的一般流程:

  1. 准备一个函数

    def my_func(a, b):doing(a, b)
    
  2. 创建一个线程

    import threading  # 导入线程包
    t = threading.Threading(target=my_func, args=(100, 200))  # 第一个参数为一个函数名,第二个参数时传入函数的参数
    
  3. 启动线程

    t.start()
    
  4. 等待结束

    t.join()
    

代码示例:

# 多线程爬虫
import threading
import timeimport utils.blog_spider as bgdef signal_thread():for url in bg.urls:bg.craw(url)def multi_thread():threads = []for url in bg.urls:threads.append(threading.Thread(target=bg.craw, args=(url,)))  # 添加线程for thread in threads:thread.start()for thread in threads:thread.join()if __name__ == '__main__':start = time.time()signal_thread()end = time.time()print("signal_thread cosst :", end - start, "seconds")start = time.time()multi_thread()end = time.time()print("multi_thread cosst :", end - start, "seconds")

输出结果:

在这里插入图片描述

在这里插入图片描述

2. 多线程之间的数据通信queue.Queue

queue.Queue可以用于多线程之间、线程安全的数据通信。

具体流程如下 :

  1. 导入队列库

    import queue
    
  2. 创建Queue

    q = queue.Queue()
    
  3. 添加元素

    q.put(item)
    
  4. 获取元素

    item = q.get()
    
  5. 查询状态

    # 查看元素的多少
    q.qsize()# 判断是否为空
    q.empty()# 判断是否已满
    q.full()
    

代码示例:

# 多线程数据通信
import queue
import utils.blog_spider as bg
import time
import random
import threadingdef do_craw(url_queue: queue.Queue, html_queue: queue.Queue):while True:url = url_queue.get()html = bg.craw(url)html_queue.put(html)   # 加入队列print(threading.current_thread().name, f"craw{url}", "url_queue.size=", url_queue.qsize())time.sleep(random.randint(1, 2))  # 进行随机休眠def do_parse(html_queue: queue.Queue, fout):while True:html = html_queue.get()  # 从队列中取数据results = bg.parse(html)for result in results:fout.write(str(result) + '\n')print(threading.current_thread().name, "results.size=", len(results), "html_queue.size=", html_queue.qsize())time.sleep(random.randint(1, 2))if __name__ == '__main__':url_queue = queue.Queue()html_queue = queue.Queue()for url in bg.urls:url_queue.put(url)for idx in range(3):t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")t.start()fout = open("data.txt", "w")for idx in range(2):t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")t.start()

在这里插入图片描述

3. 线程安全

由于线程的执行随时会切换,这会造成不可预料的结果,出现线程不安全的情况。

Lock用于解决线程安全问题,对线程进行加锁,这样会使得该线程运行结束之后在切换线程。

用法1:try-finally模式

import threading
lock = threading.Lock()   # 设置线程锁
lock.acquire()  # 获得锁
try:# do something
finally:lock.realse()   # 释放锁

用法2:with模式

import threading
lock = threading.Lock()
with lock:# do something

sleep语句一定会导致当前线程阻塞,会进行线程的切换(加锁则不会进行进程的切换)。

代码示例:

# 线程安全
import threading
import timelock = threading.Lock()class Account:def __init__(self, balance):self.balance = balancedef draw(account, amount):with lock:if account.balance >= amount:time.sleep(0.1)print(threading.current_thread().name, "取钱成功")account.balance -= amountprint(threading.current_thread().name, "余额", account.balance)else:print(threading.current_thread().name, "取钱失败,余额不足")if __name__ == '__main__':account = Account(1000)ta = threading.Thread(name="ta", target=draw, args=(account, 800))tb = threading.Thread(name='tb', target=draw, args=(account, 800))ta.start()tb.start()

在这里插入图片描述

4. 线程池

线程池中有一些线程,新来的任务放在任务队列中,在线程池中的线程空闲的时候会自动处理任务队列里的任务。

使用线程池的好处:

  1. 提升性能
    因为减去了大量新建、终止线程的开销,重用了线程资源;
  2. 使用场景
    适合处理突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
  3. 防御功能
    能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢的问题。
  4. 代码优势
    使用线程池的语法比自己新建线程执行线程更加简洁。

使用方法:

  1. 用法1map函数,注意map结果和入参是顺序对应的

    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor() as pool:results = pool.map(craw, urls)for result in results:print(result)
    
  2. 用法2future模式更强大,注意如果用as_complete方法,这样的线程执行顺序是不固定的,但是相应的效率会更高。(as_completed方法是不管哪个线程先执行完了,都会直接返回,不用按照顺序返回)

    with ThreadPoolExecutor() as pool:futures = [pool.submit(craw, url) for url in urls]for future in futures:print(future.result())for future in as_complated(futures):print(future.result())
    

代码示例:

import concurrent.futures
import utils.blog_spider as bg# 进行爬取html
with concurrent.futures.ThreadPoolExecutor() as pool:htmls = pool.map(bg.craw, bg.urls)  # 加入线程池htmls = list(zip(bg.urls, htmls))for url, html in htmls:print(url, len(html))print("craw over")# 进行解析html
with concurrent.futures.ThreadPoolExecutor() as pool:futures = {}for url, html in htmls:future = pool.submit(bg.parse, html)  # 加入线程池futures[future] = urlfor future, url in futures.items():print(url, future.result())# 顺序不定# for future in concurrent.futures.as_completed(futures):#     url = futures[future]#     print(url, future.result())

三、多进程

CPU密集型计算线程的自动的切换反而变成了负担,多线程甚至减慢了运行速度。multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行。

多进程multiprocessing知识梳理(对比多线程threading

多线程:

  1. 引入模块

    from threading import Thread
    
  2. 新建

    t = Thread(target=func, args=(100, ))
    
  3. 启动

    t.start()
    
  4. 等待结束

    t.join()
    
  5. 数据通信

    import queue
    q = queue.Queue()
    q.put(item)
    item = q.get()
    
  6. 线程安全加锁

    from threading import Lock
    lock = Lock()
    with lock:# do something
    
  7. 池化技术

    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor() as executor:# 方法1results = executor.map(func, [1, 2, 3])# 方法2future = executor.submit(func, 1)result = future.result()
    

多进程:

  1. 引入模块:

    from multiprocessing import Process
    
  2. 新建

    p = process(target=f, args=('bob', ))
    
  3. 启动

    p.start()
    
  4. 等待结束

    p.join()
    
  5. 数据通信

    from multiprocessing import Queue
    q = Queue()
    q.put([42, None, 'hello'])
    item = q.get()
    
  6. 线程安全加锁

    from  multiprocessing import Lock
    lock = Lock()
    with lock:# do something
    
  7. 池化技术

    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as executor:# 方法1:results = executor.map(func, [1, 2, 3])# 方法2:future = executor.submit(func, 1)result = funture.result()
    

代码示例:

import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor  # 导入线程池与进程池
import timePRIMES = [112272535095293] * 100def is_prime(n):'''判断一个数是否是素数:param n::return:'''if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef signal_thread():'''单线程:return:'''for number in PRIMES:is_prime(number)def multi_thread():'''多线程:return:'''with ThreadPoolExecutor() as pool:pool.map(is_prime, PRIMES)def multi_process():'''多进程:return:'''with ProcessPoolExecutor() as pool:pool.map(is_prime, PRIMES)if __name__ == '__main__':start = time.time()signal_thread()end = time.time()print("signal_thread, cost:", end - start, "seconds")start = time.time()multi_thread()end = time.time()print("multi_thread, cost:", end - start, "seconds")start = time.time()multi_process()end = time.time()print("multi_process, cost:", end - start, "seconds")

输出结果:

在这里插入图片描述

四、协程

协程是在单线程内实现并发

  • 核心原理:用一个超级循环(其实就是while Treu)循环
  • 配合IO多路复用原理(IOCPU可以干其他事情)

异步IO库介绍:asyncio

1. 创建协程的一般流程:

  1. 导入库

    import asyncio
    
  2. 获取事件循环

    loop = asyncio.get_enent_loop()
    
  3. 定义协程

    async def myfunc(url):await get_url(url)
    
  4. 创建task任务列表

    task = [loop.create_task(myfunc(url)) for url in urls]
    
  5. 执行爬虫事件列表

    loop.run_until_complete(asyncio.wait(tasks))
    

注意:要用在异步IO编程中,依赖的库必须支持异步IO特性,爬虫引用中:requests不支持异步,需要用aiohttp

代码示例:

import asyncio
import aiohttp
import utils.blog_spider as bg   # 该模块是工具代码中的模块
import timeasync def async_craw(url):print("craw url:", url)async with aiohttp.ClientSession() as session:async with session.get(url) as resp:result = await resp.text()print(f"craw url: {url}, {len(result)}")loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url=url))   # 添加任务for url in bg.urls]start = time.time()
loop.run_until_complete(asyncio.wait(tasks))   # 启动协程
end = time.time()
print("use time seconds :", end - start)

在这里插入图片描述

2. 控制协程的并发度

可以使用信号量(semaphore)来控制并发度

实现方式1:

sem = asyncio.Semaphore(10)
# later
async with sem:# work with shared resource

实现方式2:

sem = asyncio.Semaphore(10)
# later
await sem.acquire()
try:# work with shared resource
finally:sem.release()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/10180.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解JavaScript数组

深入理解JavaScript数组&#xff1a;玩转数据结构 前言数组的基础知识什么是数组如何创建一个数组如何访问数组元素 数组的常用操作数组的长度 length更改数组项数组的遍历数组类型的检测数组的添加和删除元素&#xff08;头尾操作 push()、pop()、unshift()、shift()&#xff…

Android App开机启动

清单文件 <?xml version"1.0" encoding"utf-8"?> <manifest xmlns:android"http://schemas.android.com/apk/res/android"xmlns:tools"http://schemas.android.com/tools"><uses-permission android:name"andro…

enable_shared_from_this使用笔记

解决了&#xff1a; 不能通过原指针增加引用次数的问题 &#xff0c;通过weak_ptr实现。 class MyCar:public std::enable_shared_from_this<MyCar> { public:~MyCar() { std::cout << "free ~Mycar()" << std::endl; } };int main() { MyCar* _…

centos7下fastdfs分布式部署

需要先在159及120服务器上安装fastdfs服务 可参考&#xff1a;centos7部署FastDFS服务_centos fastdfs 增加到服务中-CSDN博客 1、整体架构&#xff0c;使用3个服务器节点&#xff0c;其中两台为跟踪器节点及存储节点&#xff0c;一台服务器搭建nginx做统一入口进行负载均衡 …

K8S安装并搭建集群

1. 先给每台机器安装docker环境 卸载旧的docker yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine 配置docker的yum库 yum install -y yum-utilsyum-config-manager --a…

JavaEE之线程(3)_线程的开始、中断、等待、休眠线程、线程的状态

前言 在本栏的上一节&#xff08;https://blog.csdn.net/2301_80653026/article/details/138500558&#xff09;&#xff0c;我们重点讲解了五种不同的创建线程的方式&#xff0c;我们还介绍了Tread类的常见构造方法和常见属性&#xff0c;在这一节中我们将会继续介绍Tread类。…

简单的Python HTML 输出

1、问题背景 一名初学者在尝试将 Python 脚本输出到网页上时遇到了一些问题。他当前使用 Python 和 HTML 进行开发&#xff0c;并且遇到了以下问题&#xff1a; 担心自己的代码过于复杂&#xff0c;尤其是 WebOutput() 函数。希望通过 JavaScript 使用 HTML 模板文件更新数据。…

Go语言高级特性

目录 1. 并发编程 1.1 Goroutine轻量级线程 1.2 Channel通信机制 1.3 WaitGroup等待组 1.4 Mutex互斥锁 2. 垃圾回收机制 2.1 内存管理介绍 2.2 垃圾回收原理 2.3 性能调优策略 2.4 常见问题及解决方案 3. 接口与反射 3.1 接口定义与实现 3.2 空接口与类型断言 3…

【PG数据库】PostgreSQL 日志归档详细操作流程

1.1 日志归档的目的 pg数据库日志归档是将PostgreSQL数据库的日志文件进行归档的过程。 归档的主要目的是为了保留历史数据&#xff0c;确保数据的一致性和完整性&#xff0c;同时为数据恢复提供必要的支持。 pg数据库日志归档的目的包括&#xff1a; 1.数据恢复&#xff1…

Tomcat中服务启动失败,如何查看启动失败日志?

1. 查看 localhost.log 这个日志文件通常包含有关特定 web 应用的详细错误信息。运行以下命令查看 localhost.log 中的错误&#xff1a; sudo tail -n 100 /opt/tomcat/latest/logs/localhost.YYYY-MM-DD.log请替换 YYYY-MM-DD 为当前日期&#xff0c;或选择最近的日志文件日…

【打工日常】云原生之搭建一款轻量级的开源Markdown笔记工具

一、flatnotes介绍 1.flatnotes简介 一个自托管的&#xff0c;无数据库的笔记网络应用程序&#xff0c;利用平面文件夹的markdown文件进行存储。 2.项目特点 A.干净简单的用户界面B.响应式移动端界面C.原生/所见即所得Markdown编辑模式D.高级搜索功能E.笔记“标签”功能F.…

Java入门基础学习笔记12——变量详解

变量详解&#xff1a; 变量里的数据在计算机中的存储原理。 二进制&#xff1a; 只有0和1&#xff0c; 按照逢2进1的方式表示数据。 十进制转二进制的算法&#xff1a; 除二取余法。 6是110 13是1101 计算机中表示数据的最小单元&#xff1a;一个字节&#xff08;byte&…

【mysql篇】执行delete删除大量数据后,磁盘未清空,为什么?

目录 迁移脚本删除数据以及备份数据 解决方法OPTIMIZE TABLE二进制日志按月生成数据 最近某个项目虽说用户量不大&#xff0c;但是&#xff0c;单表的数据量越来越大&#xff0c;mysql一般单表超过千万级别后&#xff0c;性能直线下降&#xff0c;所以利用shardingphere按月做了…

用python进行接口测试(详细教程)

前言 其实我觉得接口测试很简单&#xff0c;比一般的功能测试还简单&#xff0c;现在找工作好多公司都要求有接口测试经验&#xff0c;也有好多人问我什么是接口测试&#xff0c;本着不懂也要装懂的态度&#xff0c;我会说&#xff1a;所谓接口测试就是通过测试不同情况下的入…

mac内存不足怎么清理?有哪些免费的软件工具?

当你的mac电脑使用一段时间之后&#xff0c;你可能就会发现&#xff0c;原本非常流畅的运行开始出现卡顿的现象&#xff0c;此时正是mac内存不足的外在表现。可mac内存不足怎么清理呢&#xff0c;别急&#xff0c;清理内存的方式方法有很多&#xff0c;小编将结合实际情况给大家…

免费实用在线AI工具集合

免费在线工具 https://orcc.online/ 在线录屏 https://orcc.online/recorder pdf在线免费转word文档 https://orcc.online/pdf 时间戳转换 https://orcc.online/timestamp Base64 编码解码 https://orcc.online/base64 URL 编码解码 https://orcc.online/url Hash(MD5/SHA…

网络安全专业岗位详解+自学学习路线图

很多网安专业同学一到毕业就开始迷茫&#xff0c;不知道自己能去做哪些行业&#xff1f;其实网络安全岗位还是蛮多的&#xff0c;下面我会介绍一些网络安全岗位&#xff0c;大家可以根据自身能力与喜好决定放哪个方向发展。 渗透测试/Web安全工程师 主要是模拟黑客攻击&#…

苹果电脑怎么安装crossover 如何在Mac系统中安装CrossOver CrossOver Mac软件安装说明

很多Mac的新用户在使用电脑的过程中&#xff0c;常常会遇到很多应用软件不兼容的情况。加上自己以前一直都是用Windows系统&#xff0c;总觉得Mac系统用得很难上手。 其实&#xff0c;用户可以在Mac上安装CrossOver&#xff0c;它支持用户在Mac上运行Windows软件&#xff0c;例…

Win10鼠标右键新增软件快速打开项

1、cmd 运行 regedit 2、找到该位置的shell文件夹 3、在shell文件夹下创建需要添加的软件名的文件夹&#xff0c;并修改相关信息 4、新建子文件夹command&#xff0c;并修改相关信息 5、效果

centos7下vim命令笔记-查找字符

一.查找字符 在 Vim 中&#xff0c;你可以使用正则表达式来查找字符。以下是一些基本的查找命令&#xff1a; 正向查找&#xff1a;在命令模式下&#xff0c;输入 / 后跟你想要查找的字符或者表达式&#xff0c;然后按回车。例如&#xff0c;查找字符 a&#xff0c;你可以输入…