一、线程
1、概念
-
线程
在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”叫做线程
是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程
线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务,每一个线程都共享一个进程的资源
线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间
-
多线程
是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具有这种能力的系统包括对称多处理机、多核心处理器以及芯片级多处理(Chip-level multithreading)或同时多线程(Simultaneous multithreading)处理器。
-
主线程:
任何进程都会有一个默认的主线程 如果主线程死掉 子线也程也死掉 所以 子线程依赖于主线程
-
GIL
其他语言,CPU 是多核是支持多个线程同时执行。但在 Python 中,无论是单核还是多核,同时只能由一个线程在执行。其根源是 GIL 的存在。
GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个。拿不到通行证的线程,就不允许进入 CPU 执行。
并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
-
模块
_thread模块:低级模块
threading模块:高级模块,对_thread进行了封装
2、使用_thread 模块 去创建线程
-
导入模块
import _thread
-
开启线程
_thread.start_new_thread(函数名,参数)
-
注意:
-
参数必须为元组类型
-
如果主线程执行完毕 子线程就会死掉
-
如果线程不需要传参数的时候 也必须传递一个空元组占位
-
-
实例
import win32api import _thread #引入线程的模块 比较老的模块 新的 threading def run(i):win32api.MessageBox(0,"您的{}号大宝贝上线了".format(i),"来自凤姐以及陆源凯的问候",2) for i in range(5):_thread.start_new_thread(run,(i,)) #发起多个线程 传参的情况 参数为元组# _thread.start_new_thread(run,()) #发起多个线程 不传参 页需要俩个参数 第二个为空元组 print('会先执行我') #如果主线程 不死 那么 所有的次线程 就都会正常执行 while True:pass
提高效率
import _thread import time def run():for i in range(10):print(i,'------------')time.sleep(1) """ for i in range(5): #50秒run() """ for i in range(5):_thread.start_new_thread(run,()) #发起五个线程去执行 时间大大缩短 for i in range(10): #循环10秒 计算 线程执行完毕所需要的时间 类似与一个劫停time.sleep(1) print('xxxx')
3、threading创建线程
-
导入模块
import threading
-
threading创建线程的方式
myThread = threading.Thread(target=函数名[,args=(参数,),name="你指定的线程名称"])
参数
-
target:指定线程执行的函数
-
name:指定当前线程的名称
-
args:传递个子线程的参数 ,(元组形式)
-
-
开启线程
myThread.start()
-
线程等待
myThread.join()
-
返回当前线程对象
-
threading.current_thread()
-
threading.currentThread()
-
-
获取当前线程的名称
-
threading.current_thread().name
-
threading.currentThread().getName()
-
-
设置线程名
setName()
Thread(target=fun).setName('name')
-
返回主线程对象
threading.main_thread()
-
获取当前活着的所有线程总数,包括主线程main
threading.active_count() 或 threading.activeCount()
-
判断线程是不是活的,即线程是否已经结束
-
Thread.is_alive()
-
Thread.isAlive()
-
-
线程守护
设置子线程是否随主线程一起结束
有一个布尔值的参数,默认为False,该方法设置子线程是否随主线程一起结束 True一起结束
Thread.setDaemon(True)
还有个要特别注意的:必须在start() 方法调用之前设置
if __name__ == '__main__':t = Thread(target=fun, args=(1,))t.setDaemon(True)t.start()print('over')
-
获取当前所有的线程名称
threading.enumerate() # 返回当前包含所有线程的列表
4、启动线程实现多任务
import time import threading def run1():# 获取线程名字print("启动%s子线程……"%(threading.current_thread().name))for i in range(5):print("lucky is a good man")time.sleep(1) def run2(name, word):print("启动%s子线程……" % (threading.current_thread().name))for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1) if __name__ == "__main__":t1 = time.clock()# 主进程中默认有一个线程,称为主线程(父线程)# 主线程一般作为调度而存在,不具体实现业务逻辑 # 创建子线程# name参数可以设置线程的名称,如果不设置按顺序设置为Thread-nth1 = threading.Thread(target=run1, name="th1")th2 = threading.Thread(target=run2, args=("lucky", "nice")) #启动th1.start()th2.start() #等待子线程结束th1.join()th2.join() t2 = time.clock()print("耗时:%.2f"%(t2-t1))
5、线程间共享数据
概述
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在每个进程中,互不影响。而多线程中,所有变量都由所有线程共享。所以,任何一个变量都可以被任意一个线程修改,因此,线程之间共享数
据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。
import time import threading money = 0 def run1():global moneymoney = 1print("run1-----------", money)print("启动%s子线程……"%(threading.current_thread().name))for i in range(5):print("lucky is a good man")time.sleep(1) def run2(name, word):print("run2-----------", money)print("启动%s子线程……" % (threading.current_thread().name))for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1) if __name__ == "__main__":t1 = time.clock() th1 = threading.Thread(target=run1, name="th1")th2 = threading.Thread(target=run2, args=("lucky", "nice")) th1.start()th2.start()th1.join()th2.join() t2 = time.clock()print("耗时:%.2f"%(t2-t1))print("main-----------", money)
6、Lock线程锁(多线程内存错乱问题)
-
概述
Lock锁是线程模块中的一个类,有两个主要方法:acquire()和release() 当调用acquire()方法时,它锁定锁的执行并阻塞锁的执行,直到其他线程调用release()方法将其设置为解锁状态。锁帮助我们有效地访问程序中的共享资源,以防止数据损坏,它遵循互斥,因为一次只能有一个线程访问特定的资源。
-
作用
避免线程冲突
-
锁:确保了这段代码只能由一个线程从头到尾的完整执行阻止了多线程的并发执行,包含锁的某段代码实际上只能以单线程模式执行,所以效率大大的降低了 由于可以存在多个锁,不同线程持有不同的锁,并试图获取其他的锁, 可能造成死锁,导致多个线程只能挂起,只能靠操作系统强行终止
-
注意:
-
当前线程锁定以后 后面的线程会等待(线程等待/线程阻塞)
-
需要release 解锁以后才正常
-
不能重复锁定
-
-
内存错乱实例
import threading import time i = 1 def fun1():global itime.sleep(3)for x in range(1000000):i += xi -= xprint('fun1-----', i) def fun2():global itime.sleep(3)for x in range(1000000):i += xi -= xprint('fun2----', i)
t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print('mian----',i)
问题:两个线程对同一数据同时进行读写,可能造成数据值的不对,我们必须保证一个线程在修改money时其他的线程一定不能修改,线程锁解决数据混乱问题 + 线程锁Lock使用方法 ```pythonimport threading# 创建一个锁lock = threading.Lock()lock.acquire() #进行锁定 锁定成功返回Truelock.release() #进行解锁
-
Lock锁的使用:
import threading#创建一个lock对象 lock = threading.Lock()#初始化共享资源 abce = 0def sumOne():global abce#锁定共享资源lock.acquire()abce = abce + 1#释放共享资源lock.release()def sumTwo():global abce#锁定共享资源lock.acquire()abce = abce + 2#释放共享资源lock.release()#调用函数sumOne() sumTwo() print(abce)
在上面的程序中,lock是一个锁对象,全局变量abce是一个共享资源,sumOne()和sumTwo()函数扮作两个线程,在sumOne()函数中共享资源abce首先被锁定,然后增加了1,然后abce被释放。sumTwo()函数执行类似操作。 两个函数sumOne()和sumTwo()不能同时访问共享资源abce,一次只能一个访问共享资源。
-
解决资源混乱
import threading Lock = threading.Lock() i = 1 def fun1():global iif Lock.acquire(): # 判断是否上锁 锁定成功for x in range(1000000):i += xi -= xLock.release()print('fun1-----', i) def fun2():global iif Lock.acquire(): # 判断是否上锁 锁定成功for x in range(1000000):i += xi -= xLock.release()print('fun2----', i)
t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print('mian----',i)
+ 线程锁的简写(不需要手动解锁)
with lock: 代码段
实例(将上面上锁的代码段更改为) ```pythondef run2():for i in range(1000000):#简写,功能与上面一致with lock:...
结果一样
7、Timer定时执行
-
概述
Timer是Thread的子类,可以指定时间间隔后在执行某个操作
-
使用
import threading def go():print("走我了") # t = threading.Timer(秒数,函数名) t = threading.Timer(3,go) t.start() print('我是主线程的代码')
8、线程池ThreadPoolExecutor
-
模块
concurrent.futures
-
导入 Executor[ɪɡˈzekjətər]
from concurrent.futures import ThreadPoolExecutor
-
方法
-
submit(fun[, args]) 传入放入线程池的函数以及传参
-
map(fun[, iterable_args]) 统一管理
区别:
-
submit与map参数不同 submit每次都需要提交一个目标函数和对应参数 map只需要提交一次目标函数 目标函数的参数 放在一个可迭代对象(列表、字典...)里就可以
-
-
使用
from concurrent.futures import ThreadPoolExecutor import time # import threadpool #线程池 统一管理 线程 def go(str):print("hello",str)time.sleep(2)
name_list = ["lucky","卢yuan凯","姚青","刘佳俊","何必喆"] pool = ThreadPoolExecutor(5) #控制线程的并发数
+ 线程池运行的方式 + 方式一 ```python# 逐一传参扔进线程池for i in name_list:pool.submit(go, i)
简写 ```python all_task = [pool.submit(go, i) for i in name_list] ``` + 方式二 ```python # 统一放入进程池使用 pool.map(go, name_list) # 多个参数 # pool.map(go, name_list1, name_list2...) ``` **map(fn, *iterables, timeout=None)** fn: 第一个参数 fn 是需要线程执行的函数; iterables:第二个参数接受一个可迭代对象; timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,但由于 map 是返回线程执行的结果,如果 timeout小于线程执行时间会抛异常 TimeoutError。 **注意:**使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。
-
获取返回值
-
方式一
import random from concurrent.futures import ThreadPoolExecutor, as_completed import time # import threadpool #线程池 统一管理 线程 def go(str):print("hello", str)time.sleep(random.randint(1, 4))return str
-
name_list = ["lucky","卢yuan凯","姚青","刘佳俊","何必喆"] pool = ThreadPoolExecutor(5) #控制线程的并发数 all_task = [pool.submit(go, i) for i in name_list] # 统一放入进程池使用 for future in as_completed(all_task):print("finish the task")obj_data = future.result()print("obj_data is ", obj_data) ``` **as_completed** 当子线程中的任务执行完后,使用 result() 获取返回结果 该方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。 当有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有任务结束,同时,先完成的任务会先返回给主线程
-
方式二
for result in pool.map(go, name_list):print("task:{}".format(result))
-
wait 等待线程执行完毕 在继续向下执行
from concurrent.futures import ThreadPoolExecutor, wait import time # 参数times用来模拟下载的时间 def down_video(times):time.sleep(times)print("down video {}s finished".format(times))return times
executor = ThreadPoolExecutor(max_workers=2)
通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞
task1 = executor.submit(down_video, (3)) task2 = executor.submit(down_video, (1))
done方法用于判定某个任务是否完成
print("任务1是否已经完成:", task1.done())
time.sleep(4)
print(wait([task1, task2])) print('wait') print("任务1是否已经完成:", task1.done()) print("任务1是否已经完成:", task2.done())
result方法可以获取task的执行结果
print(task1.result())
+ **线程池与线程对比** 线程池是在程序运行开始,创建好的n个线程,并且这n个线程挂起等待任务的到来。而多线程是在任务到来得时候进行创建,然后执行任务。线程池中的线程执行完之后不会回收线程,会继续将线程放在等待队列中;多线程程序在每次任务完成之后会回收该线程。由于线程池中线程是创建好的,所以在效率上相对于多线程会高很多。线程池也在高并发的情况下有着较好的性能;不容易挂掉。多线程在创建线程数较多的情况下,很容易挂掉。 ### 9、队列模块queue + 导入队列模块 import queue + 概述 queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递 + 基本FIFO队列 queue.Queue(maxsize=0) FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,**maxsize是个整数**,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果**maxsize小于或者等于0**,队列大小没有限制。 举个栗子: ```pythonimport queue q = queue.Queue() for i in range(5):q.put(i) while not q.empty():print q.get()
-
一些常用方法
-
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
-
join()
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
-
put(item[, block[, timeout]])
将item放入队列中。
-
如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
-
如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
-
如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常
-
其非阻塞版本为
put_nowait
等同于put(item, False)
-
-
get([block[, timeout]])
从队列中移除并返回一个数据。block跟timeout参数同
put
方法其非阻塞方法为
get_nowait()
相当与get(False)
-
empty()
如果队列为空,返回True,反之返回False
-
10、案例
中国历年电影票房 | 中国票房 | 中国电影票房排行榜
我们抓取从1994年到2021年的电影票房.
import requests from lxml import etree from concurrent.futures import ThreadPoolExecutor def get_page_source(url):resp = requests.get(url)resp.encoding = 'utf-8'return resp.text def parse_html(html):try:tree = etree.HTML(html)trs = tree.xpath("//table/tbody/tr")[1:]result = []for tr in trs:year = tr.xpath("./td[2]//text()")year = year[0] if year else ""name = tr.xpath("./td[3]//text()")name = name[0] if name else ""money = tr.xpath("./td[4]//text()")money = money[0] if money else ""d = (year, name, money)if any(d):result.append(d)return resultexcept Exception as e:print(e) # 调bug专用 def download_one(url, f):page_source = get_page_source(url)data = parse_html(page_source)for item in data:f.write(",".join(item))f.write("\n") def main():f = open("movie.csv", mode="w", encoding='utf-8')lst = [str(i) for i in range(1994, 2022)]with ThreadPoolExecutor(10) as t:# 方案一# for year in lst:# url = f"http://www.boxofficecn.com/boxoffice{year}"# # download_one(url, f)# t.submit(download_one, url, f) # 方案二t.map(download_one, (f"http://www.boxofficecn.com/boxoffice{year}" for year in lst), (f for i in range(len(lst)))) if __name__ == '__main__':main()
二、进程VS线程
-
多任务的实现原理
首先,要实现多任务,通常我们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,因此,多任务环境下,通常是一个Master,多个Worker。
如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。
如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。
-
多进程
主进程就是Master,其他进程就是Worker
-
优点
稳定性高:多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。
-
缺点
创建进程的代价大:在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大
操作系统能同时运行的进程数也是有限的:在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题
-
-
多线程
主线程就是Master,其他线程就是Worker
-
优点
多线程模式通常比多进程快一点,但是也快不到哪去
在Windows下,多线程的效率比多进程要高
-
缺点
任何一个线程挂掉都可能直接造成整个进程崩溃:所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程
-
-
计算密集型 vs IO密集型
-
计算密集型(多进程适合计算密集型任务)
要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数
-
IO密集型 (线程适合IO密集型任务)
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用
-
-
GIL
多线程存在GIL锁,同一时刻只能有一条线程执行;在多进程中,每一个进程都有独立的GIL,不会发生GIL冲突;但在这个例子中,爬虫属于IO密集型,多进程适用于CPU计算密集型,所以用时较长,速度慢于多线程并发。
高效编程
一、多任务原理
-
概念
现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操作系统
-
什么叫多任务?
就是操作系统可以同时运行多个任务
-
单核CPU实现多任务原理
操作系统轮流让各个任务交替执行,QQ执行2us(微秒),切换到微信,在执行2us,再切换到陌陌,执行2us……。表面是看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就像所有任务都在同时执行一样
-
多核CPU实现多任务原理
真正的秉性执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行
-
并发与并行
-
并发
CPU调度执行速度太快了,看上去一起执行,任务数多于CPU核心数
-
并行
真正一起执行,任务数小于等于CPU核心数
-
并发是逻辑上的同时发生,并行更多是侧重于物理上的同时发生。
-
-
实现多任务的方式
-
多进程模式
启动多个进程,每个进程虽然只有一个线程,但是多个进程可以一起执行多个任务
-
多线程模式
启动一个进程,在一个进程的内部启动多个线程,这样多个线程也可以一起执行多个任务
-
多进程+多线程
启动多个进程,每个进程再启动多个线程
-
协程
-
多进程+协程
-
二、进程
1、概念
-
什么是进程?
是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
-
对于操作系统
一个任务就是一个进程。比方说打开浏览器就是启动一个浏览器的进程,在打开一个记事本就启动一个记事本进程,如果打开两个记事本就启动两个记事本进程
2、使用进程
-
单进程现象
需要等待代码执行完后再执行下一段代码
import time def run1():while 1:print("lucky is a good man")time.sleep(1) def run2():while 1:print("lucky is a nice man")time.sleep(1) if __name__ == "__main__":run1()# 不会执行run2()函数,只有上面的run1()结束才能执行run2()run2()
-
启动进程实现多任务
-
multiprocessing模块
跨平台的多进程模块,提供了一个Process类用来示例化一个进程对象
-
Process类
作用:创建进程(子进程)
-
__name__
这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候
__name__
不是__main__
,就不会递归运行了。参数 说明 target 指定进程执行的任务 args 给进程函数传递的参数,是一个元组 注意:此时进程被创建,但是不会启动进程执行
-
启动进程实现多任务
from multiprocessing import Process
创建子进程
P = Process(target=run,args=("nice",),name='当前进程名称')
-
target指定 子进程运行的函数
-
args 指定传递的参数 , 是元组类型
-
启动进程:Process对象.start()
获取进程信息
-
os.getpid() 获取当前进程id号
-
os.getppid() 获取当前进程的父进程id号
-
multiprocessing.current_process().name 获取当前进程名称
父子进程的先后顺序
-
默认 父进程的结束不能影响子进程 让父进程等待子进程结束再执行父进程
-
p.join() 阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。
-
全局变量在过个进程中不能共享
注意: 在子线程中修改全局变量时对父进程中的全局变量没有影响
-
-
示例代码
import time from multiprocessing import Process def run1(name):while 1:print("%s is a good man"%name)time.sleep(1) def run2():while 1:print("lucky is a nice man")time.sleep(1) if __name__ == "__main__":# 程序启动时的进程称为主进程(父进程)# 创建进程并启动p = Process(target=run1, args=("lucky",))p.start() # 主进程执行run2()函数run2()
-
-
主进程负责调度
主进程主要做的是调度相关的工作,一般不负责具体业务逻辑
import time from multiprocessing import Process def run1():for i in range(7):print("lucky is a good man")time.sleep(1) def run2(name, word):for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1) if __name__ == "__main__":t1 = time.time() # 创建两个进程分别执行run1、run2p1 = Process(target=run1)p2 = Process(target=run2, args=("lucky", "cool")) # 启动两个进程p1.start()p2.start() # 查看耗时t2 = time.time()print("耗时:%.2f"%(t2-t1))
-
父子进程的先后顺序
主进程的结束不能影响子进程,所以可以等待子进程的结束再结束主进程,等待子进程结束,才能继续运行主进程
p.join() 阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。
import time from multiprocessing import Process def run1():for i in range(7):print("lucky is a good man")time.sleep(1) def run2(name, word):for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1) if __name__ == "__main__":t1 = time.time() p1 = Process(target=run1)p2 = Process(target=run2, args=("lucky", "cool")) p1.start()p2.start() # 主进程的结束不能影响子进程,所以可以等待子进程的结束再结束主进程# 等待子进程结束,才能继续运行主进程p1.join()p2.join() t2 = time.time()print("耗时:%.2f"%(t2-t1))
3、全局变量在多个子进程中不能共享
原因:
在创建子进程时对全局变量做了一个备份,父进程中num变量与子线程中的num不是一个变量
from multiprocessing import Process #全局变量在进程中 不能共享 num = 10 def run():print("我是子进程的开始")global numnum+=1print(num)print("我是子进程的结束") if __name__=="__main__":p = Process(target=run)p.start()p.join() print(num)
尝试列表是否能共享
from multiprocessing import Process #全局变量在进程中 不能共享 mylist = [] def run():print("我是子进程的开始")global mylistmylist.append(1)mylist.append(2)mylist.append(3)print("我是子进程的结束") if __name__=="__main__":p = Process(target=run)p.start()p.join() print(mylist)
4、启动大量子进程
-
获取CPU核心数
print('CPU number:' + str(multiprocessing.cpu_count()))
-
导入
from multiprocesssing import Pool
-
开启并发数
pp = Pool([参数]) #开启并发数 默认是你的核心数
-
创建子进程,并放入进程池管理
apply_async为非阻塞模式(并发执行)
pp.apply_async(run,args=(i,)) #args参数 可以为元组 或者是列表[]
-
关闭进程池
pp.close()关闭进程池
-
join()
在调用join之前必须先调用close,调用close之后就不能再继续添加新的进程了
pp.join()
进程池对象调用join,会等待进程池中所有的子进程结束完毕再去执行父进程
-
实例
# Pool类:进程池类 from multiprocessing import Pool import time import random import multiprocessing def run(index):print('CPU number:' + str(multiprocessing.cpu_count()))print("子进程 %d 启动"%(index))t1 = time.time()time.sleep(random.random()* 5+2)t2 = time.time()print("子进程 %d 结束,耗时:%.2f" % (index, t2-t1)) if __name__ == "__main__":print("启动主进程……") # 创建进程池对象# 由于pool的默认值为CPU的核心数,假设有4核心,至少需要5个子进程才能看到效果# Pool()中的值表示可以同时执行进程的数量pool = Pool(2)for i in range(1, 7):# 创建子进程,并将子进程放到进程池中统一管理pool.apply_async(run, args=(i,)) # 等待子进程结束# 关闭进程池:在关闭后就不能再向进程池中添加进程了# 进程池对象在调用join之前必须先关闭进程池pool.close()#pool对象调用join,主进程会等待进程池中的所有子进程结束才会继续执行主进程pool.join() print("结束主进程……")
get方法:获取进程的返回值
from multiprocessing import Lock, Pool import time def function(index):print('Start process: ', index)time.sleep(2)print('End process', index)return index
if name == 'main': pool = Pool(processes=3) for i in range(4): result = pool.apply_async(function, (i,)) print(result.get()) #获取每个 子进程的返回值 print("Started processes") pool.close() pool.join() print("Subprocess done.")
注意:这样来获取每个进程的返回值 那么就会变成单进程 ### 5、map方法 + 概述 如果你现在有一堆数据要处理,每一项都需要经过一个方法来处理,那么map非常适合 比如现在你有一个数组,包含了所有的URL,而现在已经有了一个方法用来抓取每个URL内容并解析,那么可以直接在map的第一个参数传入方法名,第二个参数传入URL数组。 + 概述
```pythonfrom multiprocessing import Poolimport requestsfrom requests.exceptions import ConnectionErrordef scrape(url):try:print(requests.get(url))except ConnectionError:print('Error Occured ', url)finally:print('URL', url, ' Scraped')if __name__ == '__main__':pool = Pool(processes=3)urls = ['https://www.baidu.com','http://www.meituan.com/','http://blog.csdn.net/','http://xxxyxxx.net']pool.map(scrape, urls)
在这里初始化一个Pool,指定进程数为3,如果不指定,那么会自动根据CPU内核来分配进程数。
然后有一个链接列表,map函数可以遍历每个URL,然后对其分别执行scrape方法。
6、单进程与多进程复制文件对比
-
单进程复制文件
import time def copy_file(path, toPath):with open(path, "rb") as fp1:with open(toPath, "wb") as fp2:while 1:info = fp1.read(1024)if not info:breakelse:fp2.write(info)fp2.flush() if __name__ == "__main__":t1 = time.time() for i in range(1, 5):path = r"/Users/lucky/Desktop/file/%d.mp4"%itoPath = r"/Users/lucky/Desktop/file2/%d.mp4"%icopy_file(path, toPath) t2 = time.time()print("单进程耗时:%.2f"%(t2-t1))
-
多进程复制文件
import time from multiprocessing import Pool import os def copy_file(path, toPath):with open(path, "rb") as fp1:with open(toPath, "wb") as fp2:while 1:info = fp1.read(1024)if not info:breakelse:fp2.write(info)fp2.flush() if __name__ == "__main__":t1 = time.time()path = r"/Users/xialigang/Desktop/视频"dstPath = r"/Users/xialigang/Desktop/1视频"fileList = os.listdir(path)pool = Pool() for i in fileList:newPath1 = os.path.join(path, i)newPath2 = os.path.join(dstPath, i)pool.apply_async(copy_file, args=(newPath1, newPath2)) pool.close()pool.join() t2 = time.time()print("耗时:%.2f"%(t2-t1))
7、进程间通信
-
队列共享
-
导入
from multiprocessing import Queue
-
使用
que = Queue() #创建队列
que.put(数据) #压入数据
que.get() #获取数据
-
队列常用函数
Queue.empty() 如果队列为空,返回True, 反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 阻塞式写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
-
特点:先进先出
-
注意:
get方法有两个参数,blocked和timeout,意思为阻塞和超时时间。默认blocked是true,即阻塞式。
当一个队列为空的时候如果再用get取则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用get_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有取到队列元素,那就抛出Queue.Empty异常。
当一个队列为满的时候如果再用put放则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用put_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有放进去元素,那就抛出Queue.Full异常。
另外队列中常用的方法
-
队列的大小
Queue.qsize() 返回队列的大小 ,不过在 Mac OS 上没法运行。
实例
import multiprocessing queque = multiprocessing.Queue() #创建 队列 #如果在子进程 和主进程 之间 都压入了数据 那么在主进程 和 子进程 获取的就是 对方的数据 def fun(myque):# print(id(myque)) #获取当前的队列的存储地址 依然是拷贝了一份myque.put(['a','b','c']) #在子进程里面压入数据# print("子进程获取",myque.get())#获取队列里面的值 if __name__=='__main__':# print(id(queque))queque.put([1,2,3,4,5]) #将列表压入队列 如果主进程也压入了数据 那么在主进程取的就是在主进程压入的数据 而不是子进程的p = multiprocessing.Process(target=fun,args=(queque,))p.start()p.join()print("主进程获取",queque.get())#在主进程进行获取print("主进程获取",queque.get())#在主进程进行获取# print("主进程获取",queque.get(block=True, timeout=1))#在主进程进行获取
-
-
字典共享
-
导入
import multiprocess
-
概述
Manager是一个进程间高级通信的方法 支持Python的字典和列表的数据类型
-
创建字典
myDict = multiprocess.Manager().dict()
实例
import multiprocessing
-
def fun(mydict): # print(mylist) mydict['x'] = 'x' mydict['y'] = 'y' mydict['z'] = 'z'
if name=='main': # Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。 mydict = multiprocessing.Manager().dict() p = multiprocessing.Process(target=fun,args=(mydict,)) p.start() p.join() print(mydict)
- 列表共享 + 导入 import multiprocess + 创建列表 myDict = multiprocess.Manager().list() 实例(字典与列表共享) ```pythonimport multiprocessing def fun(List):# print(mylist)List.append('x')List.append('y')List.append('z') if __name__=='__main__':# Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。List = multiprocessing.Manager().list()p = multiprocessing.Process(target=fun,args=(List,))p.start()p.join()print(List)
-
注意
进程名.terminate() 强行终止子进程
-
deamon
在这里介绍一个属性,叫做deamon。每个进程程都可以单独设置它的属性,如果设置为True,当父进程结束后,子进程会自动被终止。
进程.daemon = True
设置在start()方法之前
import multiprocessing import time def fun():time.sleep(100) if __name__=='__main__':p = multiprocessing.Process(target=fun)p.daemon = Truep.start()print('over')
-
进程名.terminate() 强行终止子进程
import multiprocessing import time def fun():time.sleep(100) if __name__=='__main__':p = multiprocessing.Process(target=fun)p.start()p.terminate()p.join()print('over')
8、进程实现生产者消费者
生产者消费者模型描述:
生产者是指生产数据的任务,消费者是指消费数据的任务。
当生产者的生产能力远大于消费者的消费能力,生产者就需要等消费者消费完才能继续生产新的数据,同理,如果消费者的消费能力远大于生产者的生产能力,消费者就需要等生产者生产完数据才能继续消费,这种等待会造成效率的低下,为了解决这种问题就引入了生产者消费者模型。
生产者/消费者问题可以描述为:两个或者更多的进程(线程)共享同一个缓冲区,其中一个或多个进程(线程)作为“生产者”会不断地向缓冲区中添加数据,另一个或者多个进程(线程)作为“消费者”从缓冲区中取走数据。
-
代码
from multiprocessing import Process from multiprocessing import Queue import time def product(q):print("启动生产子进程……")for data in ["good", "nice", "cool", "handsome"]:time.sleep(2)print("生产出:%s"%data)# 将生产的数据写入队列q.put(data)print("结束生产子进程……") def t(q):print("启动消费子进程……")while 1:print("等待生产者生产数据")# 获取生产者生产的数据,如果队列中没有数据会阻塞,等待队列中有数据再获取value = q.get()print("消费者消费了%s数据"%(value))print("结束消费子进程……") if __name__ == "__main__":q = Queue() p1 = Process(target=product, args=(q,))p2 = Process(target=customer, args=(q,)) p1.start()p2.start() p1.join()# p2子进程里面是死循环,无法等待它的结束# p2.join()# 强制结束子进程p2.terminate() print("主进程结束")
9、案例(抓取斗图)
from multiprocessing import Process,Queue
from concurrent.futures import ThreadPoolExecutor
from lxml import etree
import time
import requests
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
}
def get_img_src(url, q):"""进程1: 负责提取页面中所有的img的下载地址将图片的下载地址通过队列. 传输给另一个进程进行下载"""
resp = requests.get(url, headers=headers)tree = etree.HTML(resp.text)srcs = tree.xpath("//li[@class='list-group-item']//img[@referrerpolicy='no-referrer']/@data-original")for src in srcs:q.put(src.strip())resp.close()
def download_img(q):"""进程2: 将图片的下载地址从队列中提取出来. 进行下载."""with ThreadPoolExecutor(20) as t:while 1:try:s = q.get(timeout=20)t.submit(donwload_one, s)except Exception as e:print(e)break
def donwload_one(s):# 单纯的下载功能resp = requests.get(s, headers=headers)file_name = s.split("/")[-1]# 请提前创建好img文件夹with open(f"img/{file_name}", mode="wb") as f:f.write(resp.content)print("一张图片下载完毕", file_name)resp.close()
if __name__ == '__main__':t1 = time.time()q = Queue() # 两个进程必须使用同一个队列. 否则数据传输不了p_list = []for i in range(1, 11):url = f"https://www.pkdoutu.com/photo/list/?page={i}"p = Process(target=get_img_src, args=(url, q))p_list.append(p)for p in p_list:p.start()p2 = Process(target=download_img, args=(q,))p2.start()for p in p_list:p.join()p2.join()print((time.time()-t1)/60)
# 0.49572664896647134