一.多进程
当计算机运行程序时,就会创建包含代码和状态的进程。这些进程会通过计算机的一个或多个CPU执行。不过,同一时刻每个CPU只会执行一个进程,然后不同进程间快速切换,给我们一种错觉,感觉好像多个程序在同时进行。例如:有一个大型工厂,该工厂负责生产电脑,工厂有很多的车间用来生产不同的电脑部件。每个车间又有很多工人互相合作共享资源来生产某个电脑部件。这里的工厂相当于一个爬虫工程,每个车间相当于一个进程,每个工人就相当于线程。线程是CPU调度的基本单元。
也就是进程间是独立的,这表现在内存空间,上下文环境;而线程运行在进程空间内.也就是同一进程产生的线程共享同一内存空间.
需要注意的是单核CPU系统中,真正的并发是不可能的.
1.顺序执行
2.多进程并发 注意除了时间的加速意外也要看看函数返回值的写法,带有多进程的map,是返回一个列表
import requests
import re
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
def spyder(url):# res = []res = {'init:':'hello'}print('hahah:{}'.format(url))time.sleep(1)# res.append(url)res.update({'entr:'+url:url})return resdef use_process():urls = ["https://www.qiushibaike.com/text/page/{}/".format(str(i)) for i in range(0, 4)]start_1 = time.time()#获取函数返回结果res1 = []for url in urls:res_ = spyder(url)res1.append(res_)end_1 = time.time()print("单进程:", end_1 - start_1)print('res1:', res1)# 获取函数返回结果# 进程池start_2 = time.time()pool = Pool(processes=2)res2 = pool.map(spyder, urls)pool.close()pool.join()print('res2:', res2)end_2 = time.time()print("2进程:", end_2 - start_2)# 获取函数返回结果# 进程池start_3 = time.time()pool = Pool(processes=4)res3 = pool.map(spyder, urls)pool.close()pool.join()print('res2:', res3)end_3 = time.time()print("4进程:", end_3 - start_3)
if __name__ == "__main__":use_process()
二.多线程
实际上由于GIL(全局解释器锁)的限制,哪个线程想要执行代码就需要去申请锁,否则只能等着,所以这个锁阻碍了真正的多线程并发,这是解释器cpython的锅,一般不推荐用多线程,而是用多进程multiprocess来绕过GIL.
2.1 thread多线程
import time
import _thread
from threading import Thread
# 使用线程锁,防止线程死锁
mutex = _thread.allocate_lock()
def test(d_num):d_num.append(89)print("test: %s"% str(d_num))
def test1(d_num):print("test1: %s"% str(d_num))
def main():d_num = [100, 58]t1 = Thread(target=test, args=(d_num,))t2 = Thread(target=test1, args=(d_num,))t1.start()time.sleep(1)t2.start()time.sleep(1)if __name__ == '__main__':main()
2.2 多线程队列版
import time
import _thread
from threading import Thread
import queue
# 使用线程锁,防止线程死锁
mutex = _thread.allocate_lock()
frame_queue = queue.Queue()
def test(d_num):print("test: %s" % str(d_num))for i in range(d_num):frame_queue.put(i)def test1():while 1:if frame_queue.empty() != True:# 从队列中取出图片value = frame_queue.get()print('==value:', value)time.sleep(1)else:break
def main():d_num = 10t1 = Thread(target=test, args=(d_num,))t1.start()t2 = Thread(target=test1)t2.start()if __name__ == '__main__':main()
2.3 注意传参与多进程的区别,线程池
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_supportdef func(a, b):return a + bdef main():a_args = [1, 2, 3]second_arg = 1with Pool() as pool:L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])print('L:', L)M = pool.starmap(func, zip(a_args, repeat(second_arg)))print('M:', M)N = pool.map(partial(func, b=second_arg), a_args)print('N:', N)
main()
import requests
import re
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
def spyder(url):# res = []res = {'init:':'hello'}print('hahah:{}'.format(url))time.sleep(1)# res.append(url)res.update({'entr:'+url:url})return resdef use_process():urls = ["https://www.qiushibaike.com/text/page/{}/".format(str(i)) for i in range(0, 4)]start_1 = time.time()#获取函数返回结果res1 = []for url in urls:res_ = spyder(url)res1.append(res_)end_1 = time.time()print("单进程:", end_1 - start_1)print('res1:', res1)# 获取函数返回结果# 进程池start_2 = time.time()pool = Pool(processes=2)res2 = pool.map(spyder, urls)pool.close()pool.join()print('res2:', res2)end_2 = time.time()print("2进程:", end_2 - start_2)# 获取函数返回结果# 进程池start_3 = time.time()pool = Pool(processes=4)res3 = pool.map(spyder, urls)pool.close()pool.join()print('res2:', res3)end_3 = time.time()print("4进程:", end_3 - start_3)def use_threadpool():urls = [["https://www.qiushibaike.com/text/page/{}/".format(str(i))] for i in range(0, 4)]print('urls:', urls)# 线程池start = time.time()pool = ThreadPool(processes=4)res = pool.starmap(spyder, urls)pool.close()pool.join()end = time.time()print('res:', res)print("4线程:", end - start)
if __name__ == "__main__":# use_process()use_threadpool()
实际应用将图片路径和名字传入,用zip方式打包传参
import osimport cv2
import time
import itertools
from multiprocessing.dummy import Pool as ThreadPoolSIZE = (75,75)
SAVE_DIRECTORY='thumbs'
def save_img(filename,save_path):save_path+= filename.split('/')[-1]im = cv2.imread(filename)im=cv2.resize(im,SIZE)cv2.imwrite(save_path,im)if __name__ == '__main__':path='./data/testlabel'print(path)output_path='./data/thumbs/'if not os.path.exists(output_path):os.mkdir(output_path)print(output_path)imgs_list_path=[os.path.join(path,i) for i in os.listdir(path)]print(len(imgs_list_path))start_time=time.time()pool = ThreadPool(processes=8)print(list(zip(imgs_list_path,[output_path]*len(imgs_list_path))))pool.starmap(save_img,zip(imgs_list_path,[output_path]*len(imgs_list_path)))pool.close()pool.join()end_time=time.time()print('use time=',end_time-start_time)
三.共享变量
import numpy as np
from multiprocessing import shared_memory
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
import time
import glob
import os
import shutilsmm = SharedMemoryManager()
smm.start()
screen_img = np.zeros((480, 640, 3), dtype=np.uint8)
share_screen_img = smm.SharedMemory(screen_img.nbytes)def change(name):a = np.array([0, 0, 0, 0, 0, 0])existing_shm = shared_memory.SharedMemory(name=name)b = np.ndarray((6, ), dtype=np.int64, buffer=existing_shm.buf)b[:] = a[:]print("changed")def show(name):while True:existing_shm = shared_memory.SharedMemory(name=name)show_img = np.ndarray((480, 640, 3),dtype=np.uint8,buffer=existing_shm.buf)print('show_img:', show_img.sum())time.sleep(0.30)if __name__ == '__main__':tmp = np.ndarray(screen_img.shape,dtype=screen_img.dtype,buffer=share_screen_img.buf)print('init tmp:', tmp.sum())# p1 = Process(target=change, args=(sm.name, ))p2 = Process(target=show, args=(share_screen_img.name, ))p2.start()time.sleep(2)tmp[:] = np.ones((480, 640, 3), dtype=np.uint8)# p1.join()# print('final b:', b)p2.join()