进程和线程的目的:提高执行效率
IO操作利用极少量CPU
IO密集型(不用CPU):多线程
计算密集型(用CPU):多进程
1、单进程单线程,主进程,主线程
2、自定义线程:
主进程:
主线程
子线程
进程:
优点:同时利用多个cpu,能够同时进行多个操作
缺点:耗费资源(重新开辟内存空间)
线程:
优点:共享内存,执行开销小,IO操作的时候,创造并发操作
缺点:抢占资源,不利于资源的管理和保护
进程不是越多越好,cpu个数=进程个数
线程也不是越多越好,具体案例具体分析,请求上下文切换耗时
计算机中执行任务的最小单元:线程
一.使用线程
1.创建线程,参考threading_demo1
t = threading.Thread(target=func, args=(arg,))
t.start()
2.主线程是否等待子线程,默认False
t.setDaemon(True/False)
3.主线程等待,子线程执行
join()
join(2) # 最多等待2s
4.线程池
python内部没有提供,需要自定义
二.Queue,生产者消费者
Queue:
最大个数
get,等
get_nowait,不等
三.使用进程
1.创建进程(windows环境下此操作必须在main下面执行)
p = multiprocessing.Process(target=func, args=(arg,))
p.start()
2.daemon
False,等待
True,不等待
3.join([timeout ]) 跟线程一样,主线程等待,子线程执行
4.数据共享 参考: Multiprocessing2 和 Multiprocessing3
m = Manager()
dic = m.dict()
5.进程池
p = Pool(5)
p.apply() 每一个任务是排队进行的, apply()是阻塞的 里面有 进程.join()
p.apply_async() 每一个任务都并发执行, 可以设置回调函数, apply_async()是异步非阻塞的 里面有 daemon = True
1.GIL是什么?
GIL全称Global Interpreter Lock,即全局解释器锁。 作用就是,限制多线程同时执行,保证同一时间内只有一个线程在执行。
GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
python 与 python解释器是两个概念,切不可混为一谈,也就是说,GIL只存在于使用C语言编写的解释器CPython中。
通俗地说,就是如果你不用Python官方推荐的CPython解释器,而使用其他语言编写的Python解释器(比如 JPython: 运行在Java上的解释器,
直接把python代码编译成Java字节码执行 ),就不会有GIL问题。然而因为CPython是大部分环境下默认的Python执行环境。
所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
2.GIL有什么作用?
为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据的一致性和状态同步的完整性。
python为了利用多核,开始支持多线程,但线程是非独立的,所以同一进程里线程是数据共享,当各个线程访问数据资源时会出现竞状态,
即数据可能会同时被多个线程占用,造成数据混乱,这就是线程的不安全。而解决多线程之间数据完整性和状态同步最简单的方式就是加锁。
GIL能限制多线程同时执行,保证同一时间内只有一个线程在执行。
3.GIL有什么影响?
GIL无疑就是一把全局排他锁。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。
4.如何避免GIL带来的影响?
方法一:用进程+协程 代替 多线程的方式
在多进程中,由于每个进程都是独立的存在,所以每个进程内的线程都拥有独立的GIL锁,互不影响。
但是,由于进程之间是独立的存在,所以进程间通信就需要通过队列的方式来实现。
方法二:更换解释器
像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。
然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。
首先让我们了解一下并发和并行的概念:什么是并发什么是并行,他们的区别是什么?
举个简单的例子:
你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
你吃饭吃到一半,电话来了,你停了下来接了电话,接完后电话以后继续吃饭,这说明你支持并发。
你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。
创建Thread线程的两种方式:
第一种 直接创建threading.Thread类的对象:
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time# 多线程模块threading 提供了一个比thread模块更高层的API来提供线程的并发性。这些线程并发运行并共享内存。 # 该模块在较低级别thread模块之上构建更高级别的线程接口 # 方法一:直接创建threading.Thread类的对象,初始化时将可调用对象作为参数传入。 # 方法二:通过继承Thread类,重写它的run方法。 def func(name):time.sleep(2)print("{0} hello ! ".format(name))# 方法一 if __name__ == '__main__':# 创建线程t1 = threading.Thread(target=func, args=(111,))# setDaemon() :主线程是否等待子线程# 设置此线程是否被主线程守护回收。默认False不回收,需要在start()方法前调用;# 设为True相当于像主线程中注册守护,主线程结束时会将其一并回收 t1.setDaemon(True)t1.start()# join([timeout]):主线程等待子线程执行完毕【如果设置时间10,则最多等待10s】# 等待至线程中止 或者 抛出异常 或者 超时发生里面的参数是可选的,代表线程运行的最大时间,即如果超过这个时间,# 不管这个此线程有没有执行完毕都会被回收,然后主线程或函数都会接着执行的。print("aaa")t1.join()print("bbb")t2 = threading.Thread(target=func, args=(222,))t2.setDaemon(True)t2.start()# t2.join() t3 = threading.Thread(target=func, args=(333,))t3.setDaemon(True)t3.start()# t3.join()print("main end")
第二种 通过继承Thread类,重写它的run方法。
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import timecount = 0# 方法二:通过继承Thread类,重写它的run方法。 class Counter(threading.Thread):def __init__(self, lock, threadName):"""@summary: 初始化对象。@param lock: 琐对象。@param threadName: 线程名称。"""super(Counter, self).__init__(name=threadName)# 注意:一定要显式的调用父类的初始化函数。self.lock = lockdef run(self):"""@summary: 重写父类run方法,在线程启动后执行该方法内的代码。"""global countself.lock.acquire()for i in range(10000):count = count + 1self.lock.release()lock = threading.Lock() for i in range(5):Counter(lock, "thread-" + str(i)).start() time.sleep(2) # 确保线程都执行完毕 print(count)
事件Event:
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading# Event 用于主线程控制其他线程的执行【RLock()控制单个线程,Event控制多个线程,可类比红绿灯机制】 # 1.Event().wait() 插入在进程中插入一个标记(flag) 默认为 False 当flag为False时 程序会停止运行 进入阻塞状态 # 2.Event().set() 使flag为True 然后程序会停止阻塞 进入运行状态 # 3.Event().clear() 使flag为False 然后程序会停止运行 进入阻塞状态 # 4.Event().is_set() 判断flag 是否为True 是的话 返回True 不是 返回False def do_thing(event, thread_name):print(thread_name, "start")# 开始阻塞,类似红灯,所有车辆(线程)停止进入等待状态 event.wait()print(thread_name, "execute")event_obj = threading.Event()for i in range(10):t = threading.Thread(target=do_thing, args=(event_obj, "thread_" + str(i)))t.start()event_obj.clear() # 让灯变红 inp = input("是否使其他线程进入运行状态:") if inp == "true":event_obj.set() # 让灯变绿 (将 False 设置为 True)
队列Queue:
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import time# 在一个FIFO(First In,First Out)队列中,先加先取。 # 在一个LIFO(Last In First Out)的队列中,最后加的先出来(操作起来跟stack一样)。 # priority队列,有序保存,优先级最低的先出来。# 构造一个FIFO队列,maxsize可以限制队列的大小。如果队列的大小达到了队列的上限,就会加锁,加入就会阻塞,直到队列的内容被消费掉。 # maxsize的值小于等于0,那么队列的尺寸就是无限制的 q = queue.Queue(maxsize=10)# lock = threading.RLock()def producer(name):while True:# Queue.put(self, block=True, timeout=None) 往队列里放数据。如果满了的话,blocking = False 直接报 Full异常。# 如果blocking = True,就是等一会,timeout必须为 0 或正数。# None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。 q.put(name)print("producer当前队列个数:%d" % q.qsize())def customer(name):while True:# Queue.get(self, block=True, timeout=None) 从队列里取数据。如果为空的话,blocking = False 直接报 empty异常。# 如果blocking = True,就是等一会,timeout必须为 0 或正数。# None为一直等下去,0为不等,正数n为等待n秒还不能读取,报empty异常。print(q.get())for i in range(1, 13):t = threading.Thread(target=producer, args=("put_thread_%d" % i,))t.start()for i in range(1, 11):t = threading.Thread(target=customer, args=("get_thread_%d" % i,))t.start()
全局解释器锁GIL:
#!/usr/bin/env python # -*- coding:utf-8 -*-import threading import timeglobal_num = 0 # 获得锁 lock = threading.RLock()# GIL全局解释器锁 只能锁住一个线程,多个线程锁定需要用到 threading 的Event对象 # GIL限制多线程同时执行,保证同一时间内只有一个线程在执行。 def func(thread_name):global global_numlock.acquire() # 增加锁global_num += 1time.sleep(1)print("{0} set global to {1}".format(thread_name, global_num))lock.release() # 释放锁for i in range(10):t = threading.Thread(target=func, args=("thread-" + str(i),))t.start()
简单版本线程池:
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import time# 简单版本线程池 class ThreadPool(object):def __init__(self, max_num=20):# 创建一个最大长度为20的队列self.queue = queue.Queue(max_num)# 往队列添加20个元素(将Thread类传进去)for i in range(max_num):self.queue.put(threading.Thread)def get_thread(self):return self.queue.get()def add_thread(self):self.queue.put(threading.Thread)def func(pool, name):time.sleep(1)print(name, "hello!")# 每个子线程结束后,都给线程池队列增加一个threading.Thread类,队列每增加一个,下面for循环就可以取出来一个,get()取完3个又继续等待 pool.add_thread()# 创建一个大小为3的线程池队列 p = ThreadPool(3) # ret = p.get_thread() # ret = threading.Thread for i in range(100):# 获取Thread类,队列个数最大为3,最多可以循环取3个线程【当3个Thread类取出来后,队列为空,队列的get()方法默认会等待】thread = p.get_thread()# 使用Thread类创建线程并执行,将线程池队列加入到子线程里面去t = thread(target=func, args=(p, str(i)))t.start()# t.join()
复杂版本线程池
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import time import contextlibStopEvent = object()# 高级版线程池 class ThreadPool(object):def __init__(self, max_num):# 队列大小无限制self.q = queue.Queue()# 最多创建的线程数(线程池最大容量)self.max_num = max_num# 任务执行完毕,线程终止标识self.cancel = False# 强行终止所有线程标识self.terminal = False# 真实创建的线程列表self.generate_list = []# 空闲线程数量self.free_list = []def run(self, func, args, callback=None):"""线程池执行一个任务:param func: 任务函数:param args: 任务函数所需的参数:param callback: 任务执行失败或成功后执行的回调函数:return: 如果线程池已经终止,则返回True否则None"""if self.cancel:return# 将任务封装到元组中并加入到队列中w = (func, args, callback,)self.q.put(w)# 如果没有空闲线程,并且已创建的线程数必须小于线程池的大小限制if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()def generate_thread(self):"""创建一个线程"""t = threading.Thread(target=self.call)t.start()def call(self):"""循环去获取任务函数并执行任务函数"""# threading.currentThread 获取当前线程current_thread = threading.currentThread()self.generate_list.append(current_thread)# 从队列中取任务并执行event = self.q.get()# 判断是否是停止标识(是否是元组)while event != StopEvent:# 获取到任务包并解析func, arguments, callback = event# print("args", arguments)# 默认任务执行状态是成功(True)status = Truetry:# 执行任务result = func(*arguments)except Exception as e:result = e # 或者 status = Nonestatus = False# 执行回调函数if callback is not None:try:callback(result, status)except Exception as e:pass# 通过管理上下文优化下面被注释部分的代码 with self.worker_state(self.free_list, current_thread):if self.terminal:event = StopEventelse:event = self.q.get()"""# terminal默认False,不终止线程if self.terminal:event = StopEventelse:# 执行任务后(不管成功与否,将线程设置为空闲)self.free_list.append(current_thread)# 阻塞等待获取下一个任务event = self.q.get()# 获取到新任务后,将线程从空闲状态移除,继续循环self.free_list.remove(current_thread)"""else:# 如果从队列获取到的不是元组,则表示不是任务,从generate_list里面删除线程 self.generate_list.remove(current_thread)# 线程生命周期结束,等待系统回收,死掉了。。。 @contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):"""用来记录线程池中正在等待的线程"""state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread)def close(self):"""执行完所有的任务后,所有的线程终止"""print("xxx", self.q.qsize())self.cancel = True# 获取创建的所有线程数量,加入对应数量的终止标识current_thread_size = len(self.generate_list)while current_thread_size:print("current_thread_size", current_thread_size)self.q.put(StopEvent)current_thread_size -= 1def terminate(self):"""强行终止所有线程, 并清空队列"""# 当任务还有的时候,self.terminal = True 使正在q.get()阻塞中的线程 获取到的下一个 event = StopEvent 来终止线程self.terminal = True# 清空队列 self.q.queue.clear()# 当任务全部执行完后,正在q.get()阻塞中的线程因为无法获取下一个 event = StopEvent 而无法终止,手动增加StopEventmax_num = len(self.generate_list)while max_num:self.q.put(StopEvent)max_num -= 1# 清空队列, empty()方法无法清空队列, 这里存在问题,如果清空太快,线程无法拿到StopEvent# 解决办法1:在加入StopEvent 前,提前清空# 解决办法2:while 的条件判断修改为 self.generate_list,如下注释代码# while self.generate_list:# self.q.put(StopEvent)# print("qsize() ", self.q.qsize())# print(self.q.queue.clear())# print("qsize() ", self.q.qsize())# __init__() pool = ThreadPool(10)# 回调函数 def callback():pass# 任务函数 def action(i):time.sleep(0.1)print("___", i)# 任务数量50个 for i in range(50):# 将任务放在队列中,着手开始处理任务# 1.创建线程(有空闲线程则不再创建,不能高于线程池的限制,根据任务个数判断)# 2.线程去队列中取任务,# 3.执行任务 pool.run(action, (i,), callback)# 变量名在命名时要注意,应避免和python的函数名、关键字冲突,否则报 callable 错误 time.sleep(2) print(len(pool.generate_list), len(pool.free_list)) pool.close() # 当任务全部执行完后,终止所有在q.get()阻塞等待的线程 # pool.terminate() # print(len(pool.generate_list), len(pool.free_list))