Python学习_线程Thread学习 GIL锁 队列queue 线程池

进程和线程的目的:提高执行效率

IO操作利用极少量CPU
IO密集型(不用CPU):多线程
计算密集型(用CPU):多进程

1、单进程单线程,主进程,主线程
2、自定义线程:
主进程:
主线程
子线程

进程:
优点:同时利用多个cpu,能够同时进行多个操作
缺点:耗费资源(重新开辟内存空间)

线程:
优点:共享内存,执行开销小,IO操作的时候,创造并发操作
缺点:抢占资源,不利于资源的管理和保护

进程不是越多越好,cpu个数=进程个数
线程也不是越多越好,具体案例具体分析,请求上下文切换耗时

计算机中执行任务的最小单元:线程

一.使用线程
1.创建线程,参考threading_demo1
t = threading.Thread(target=func, args=(arg,))
t.start()
2.主线程是否等待子线程,默认False
t.setDaemon(True/False)
3.主线程等待,子线程执行
join()
join(2) # 最多等待2s
4.线程池
python内部没有提供,需要自定义

二.Queue,生产者消费者
Queue:
最大个数
get,等
get_nowait,不等

三.使用进程
1.创建进程(windows环境下此操作必须在main下面执行)
p = multiprocessing.Process(target=func, args=(arg,))
p.start()
2.daemon
False,等待
True,不等待
3.join([timeout ]) 跟线程一样,主线程等待,子线程执行
4.数据共享 参考: Multiprocessing2 和 Multiprocessing3
m = Manager()
dic = m.dict()
5.进程池
p = Pool(5)
p.apply() 每一个任务是排队进行的, apply()是阻塞的 里面有 进程.join()
p.apply_async() 每一个任务都并发执行, 可以设置回调函数, apply_async()是异步非阻塞的 里面有 daemon = True

1.GIL是什么?
GIL全称Global Interpreter Lock,即全局解释器锁。 作用就是,限制多线程同时执行,保证同一时间内只有一个线程在执行。
GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
python 与 python解释器是两个概念,切不可混为一谈,也就是说,GIL只存在于使用C语言编写的解释器CPython中。
通俗地说,就是如果你不用Python官方推荐的CPython解释器,而使用其他语言编写的Python解释器(比如 JPython: 运行在Java上的解释器,
直接把python代码编译成Java字节码执行 ),就不会有GIL问题。然而因为CPython是大部分环境下默认的Python执行环境。
所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

2.GIL有什么作用?
为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据的一致性和状态同步的完整性。
python为了利用多核,开始支持多线程,但线程是非独立的,所以同一进程里线程是数据共享,当各个线程访问数据资源时会出现竞状态,
即数据可能会同时被多个线程占用,造成数据混乱,这就是线程的不安全。而解决多线程之间数据完整性和状态同步最简单的方式就是加锁。
GIL能限制多线程同时执行,保证同一时间内只有一个线程在执行。

3.GIL有什么影响?
GIL无疑就是一把全局排他锁。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。

4.如何避免GIL带来的影响?
方法一:用进程+协程 代替 多线程的方式
在多进程中,由于每个进程都是独立的存在,所以每个进程内的线程都拥有独立的GIL锁,互不影响。
但是,由于进程之间是独立的存在,所以进程间通信就需要通过队列的方式来实现。

方法二:更换解释器
像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。
然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。


首先让我们了解一下并发和并行的概念:什么是并发什么是并行,他们的区别是什么?
    举个简单的例子:
    你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。    
    你吃饭吃到一半,电话来了,你停了下来接了电话,接完后电话以后继续吃饭,这说明你支持并发。
    你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。

创建Thread线程的两种方式:

第一种 直接创建threading.Thread类的对象:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time# 多线程模块threading 提供了一个比thread模块更高层的API来提供线程的并发性。这些线程并发运行并共享内存。
# 该模块在较低级别thread模块之上构建更高级别的线程接口
# 方法一:直接创建threading.Thread类的对象,初始化时将可调用对象作为参数传入。
# 方法二:通过继承Thread类,重写它的run方法。
def func(name):time.sleep(2)print("{0} hello ! ".format(name))# 方法一
if __name__ == '__main__':# 创建线程t1 = threading.Thread(target=func, args=(111,))# setDaemon() :主线程是否等待子线程# 设置此线程是否被主线程守护回收。默认False不回收,需要在start()方法前调用;# 设为True相当于像主线程中注册守护,主线程结束时会将其一并回收
    t1.setDaemon(True)t1.start()# join([timeout]):主线程等待子线程执行完毕【如果设置时间10,则最多等待10s】# 等待至线程中止 或者 抛出异常 或者 超时发生里面的参数是可选的,代表线程运行的最大时间,即如果超过这个时间,# 不管这个此线程有没有执行完毕都会被回收,然后主线程或函数都会接着执行的。print("aaa")t1.join()print("bbb")t2 = threading.Thread(target=func, args=(222,))t2.setDaemon(True)t2.start()# t2.join()
t3 = threading.Thread(target=func, args=(333,))t3.setDaemon(True)t3.start()# t3.join()print("main end")

 

第二种 通过继承Thread类,重写它的run方法。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import timecount = 0# 方法二:通过继承Thread类,重写它的run方法。
class Counter(threading.Thread):def __init__(self, lock, threadName):"""@summary: 初始化对象。@param lock: 琐对象。@param threadName: 线程名称。"""super(Counter, self).__init__(name=threadName)# 注意:一定要显式的调用父类的初始化函数。self.lock = lockdef run(self):"""@summary: 重写父类run方法,在线程启动后执行该方法内的代码。"""global countself.lock.acquire()for i in range(10000):count = count + 1self.lock.release()lock = threading.Lock()
for i in range(5):Counter(lock, "thread-" + str(i)).start()
time.sleep(2)
# 确保线程都执行完毕
print(count)

 

事件Event:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading# Event 用于主线程控制其他线程的执行【RLock()控制单个线程,Event控制多个线程,可类比红绿灯机制】
# 1.Event().wait() 插入在进程中插入一个标记(flag) 默认为 False 当flag为False时 程序会停止运行 进入阻塞状态
# 2.Event().set() 使flag为True  然后程序会停止阻塞 进入运行状态
# 3.Event().clear() 使flag为False  然后程序会停止运行 进入阻塞状态
# 4.Event().is_set() 判断flag  是否为True  是的话 返回True  不是 返回False
def do_thing(event, thread_name):print(thread_name, "start")# 开始阻塞,类似红灯,所有车辆(线程)停止进入等待状态
    event.wait()print(thread_name, "execute")event_obj = threading.Event()for i in range(10):t = threading.Thread(target=do_thing, args=(event_obj, "thread_" + str(i)))t.start()event_obj.clear() # 让灯变红
inp = input("是否使其他线程进入运行状态:")
if inp == "true":event_obj.set()  # 让灯变绿 (将 False 设置为 True)

 

 

队列Queue:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import time# 在一个FIFO(First In,First Out)队列中,先加先取。
# 在一个LIFO(Last In First Out)的队列中,最后加的先出来(操作起来跟stack一样)。
# priority队列,有序保存,优先级最低的先出来。# 构造一个FIFO队列,maxsize可以限制队列的大小。如果队列的大小达到了队列的上限,就会加锁,加入就会阻塞,直到队列的内容被消费掉。
# maxsize的值小于等于0,那么队列的尺寸就是无限制的
q = queue.Queue(maxsize=10)# lock = threading.RLock()def producer(name):while True:# Queue.put(self, block=True, timeout=None) 往队列里放数据。如果满了的话,blocking = False 直接报 Full异常。# 如果blocking = True,就是等一会,timeout必须为 0 或正数。# None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。
        q.put(name)print("producer当前队列个数:%d" % q.qsize())def customer(name):while True:# Queue.get(self, block=True, timeout=None) 从队列里取数据。如果为空的话,blocking = False 直接报 empty异常。# 如果blocking = True,就是等一会,timeout必须为 0 或正数。# None为一直等下去,0为不等,正数n为等待n秒还不能读取,报empty异常。print(q.get())for i in range(1, 13):t = threading.Thread(target=producer, args=("put_thread_%d" % i,))t.start()for i in range(1, 11):t = threading.Thread(target=customer, args=("get_thread_%d" % i,))t.start()

 

全局解释器锁GIL:

#!/usr/bin/env python
# -*- coding:utf-8 -*-import threading
import timeglobal_num = 0
# 获得锁
lock = threading.RLock()# GIL全局解释器锁 只能锁住一个线程,多个线程锁定需要用到 threading 的Event对象
# GIL限制多线程同时执行,保证同一时间内只有一个线程在执行。
def func(thread_name):global global_numlock.acquire()  # 增加锁global_num += 1time.sleep(1)print("{0} set global to {1}".format(thread_name, global_num))lock.release()  # 释放锁for i in range(10):t = threading.Thread(target=func, args=("thread-" + str(i),))t.start()

 

 

简单版本线程池:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import time# 简单版本线程池
class ThreadPool(object):def __init__(self, max_num=20):# 创建一个最大长度为20的队列self.queue = queue.Queue(max_num)# 往队列添加20个元素(将Thread类传进去)for i in range(max_num):self.queue.put(threading.Thread)def get_thread(self):return self.queue.get()def add_thread(self):self.queue.put(threading.Thread)def func(pool, name):time.sleep(1)print(name, "hello!")# 每个子线程结束后,都给线程池队列增加一个threading.Thread类,队列每增加一个,下面for循环就可以取出来一个,get()取完3个又继续等待
    pool.add_thread()# 创建一个大小为3的线程池队列
p = ThreadPool(3)
# ret = p.get_thread()
# ret = threading.Thread
for i in range(100):# 获取Thread类,队列个数最大为3,最多可以循环取3个线程【当3个Thread类取出来后,队列为空,队列的get()方法默认会等待】thread = p.get_thread()# 使用Thread类创建线程并执行,将线程池队列加入到子线程里面去t = thread(target=func, args=(p, str(i)))t.start()# t.join()

 

复杂版本线程池

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import time
import contextlibStopEvent = object()# 高级版线程池
class ThreadPool(object):def __init__(self, max_num):# 队列大小无限制self.q = queue.Queue()# 最多创建的线程数(线程池最大容量)self.max_num = max_num# 任务执行完毕,线程终止标识self.cancel = False# 强行终止所有线程标识self.terminal = False# 真实创建的线程列表self.generate_list = []# 空闲线程数量self.free_list = []def run(self, func, args, callback=None):"""线程池执行一个任务:param func: 任务函数:param args: 任务函数所需的参数:param callback: 任务执行失败或成功后执行的回调函数:return: 如果线程池已经终止,则返回True否则None"""if self.cancel:return# 将任务封装到元组中并加入到队列中w = (func, args, callback,)self.q.put(w)# 如果没有空闲线程,并且已创建的线程数必须小于线程池的大小限制if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()def generate_thread(self):"""创建一个线程"""t = threading.Thread(target=self.call)t.start()def call(self):"""循环去获取任务函数并执行任务函数"""# threading.currentThread 获取当前线程current_thread = threading.currentThread()self.generate_list.append(current_thread)# 从队列中取任务并执行event = self.q.get()# 判断是否是停止标识(是否是元组)while event != StopEvent:# 获取到任务包并解析func, arguments, callback = event# print("args", arguments)# 默认任务执行状态是成功(True)status = Truetry:# 执行任务result = func(*arguments)except Exception as e:result = e  # 或者 status = Nonestatus = False# 执行回调函数if callback is not None:try:callback(result, status)except Exception as e:pass# 通过管理上下文优化下面被注释部分的代码
            with self.worker_state(self.free_list, current_thread):if self.terminal:event = StopEventelse:event = self.q.get()"""# terminal默认False,不终止线程if self.terminal:event = StopEventelse:# 执行任务后(不管成功与否,将线程设置为空闲)self.free_list.append(current_thread)# 阻塞等待获取下一个任务event = self.q.get()# 获取到新任务后,将线程从空闲状态移除,继续循环self.free_list.remove(current_thread)"""else:# 如果从队列获取到的不是元组,则表示不是任务,从generate_list里面删除线程
            self.generate_list.remove(current_thread)# 线程生命周期结束,等待系统回收,死掉了。。。
@contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):"""用来记录线程池中正在等待的线程"""state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread)def close(self):"""执行完所有的任务后,所有的线程终止"""print("xxx", self.q.qsize())self.cancel = True# 获取创建的所有线程数量,加入对应数量的终止标识current_thread_size = len(self.generate_list)while current_thread_size:print("current_thread_size", current_thread_size)self.q.put(StopEvent)current_thread_size -= 1def terminate(self):"""强行终止所有线程, 并清空队列"""# 当任务还有的时候,self.terminal = True 使正在q.get()阻塞中的线程 获取到的下一个 event = StopEvent 来终止线程self.terminal = True# 清空队列
        self.q.queue.clear()# 当任务全部执行完后,正在q.get()阻塞中的线程因为无法获取下一个 event = StopEvent 而无法终止,手动增加StopEventmax_num = len(self.generate_list)while max_num:self.q.put(StopEvent)max_num -= 1# 清空队列, empty()方法无法清空队列, 这里存在问题,如果清空太快,线程无法拿到StopEvent# 解决办法1:在加入StopEvent 前,提前清空# 解决办法2:while 的条件判断修改为 self.generate_list,如下注释代码# while self.generate_list:#     self.q.put(StopEvent)# print("qsize() ", self.q.qsize())# print(self.q.queue.clear())# print("qsize() ", self.q.qsize())# __init__()
pool = ThreadPool(10)# 回调函数
def callback():pass# 任务函数
def action(i):time.sleep(0.1)print("___", i)# 任务数量50个
for i in range(50):# 将任务放在队列中,着手开始处理任务# 1.创建线程(有空闲线程则不再创建,不能高于线程池的限制,根据任务个数判断)# 2.线程去队列中取任务,# 3.执行任务
    pool.run(action, (i,), callback)# 变量名在命名时要注意,应避免和python的函数名、关键字冲突,否则报 callable 错误
time.sleep(2)
print(len(pool.generate_list), len(pool.free_list))
pool.close()  # 当任务全部执行完后,终止所有在q.get()阻塞等待的线程
# pool.terminate()
# print(len(pool.generate_list), len(pool.free_list))

 

转载于:https://www.cnblogs.com/xiaojiulin/p/10572956.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363387.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html5表单与PHP交互

1、示例代码 前端&#xff1a; <!DOCTYPE html><html><head><meta charset"utf-8"> <title>html5表单与PHP交互</title></head><body><form action"http://localhost/jh.php" method"post"…

文件加密 1

一、 1、NTFS分区才能使用EFS加密&#xff1b; 2、我的电脑&#xff0d;&#xff0d;工具&#xff0d;&#xff0d;文件夹选项&#xff0d;&#xff0d;查看&#xff0d;&#xff0d;取消简单文件共享&#xff1b; 3、右键点击要加密的文件或文件夹&#xff0d;&#xff0d;属性…

Java中抽象类和接口之间的区别

一些受欢迎的访谈问题是“抽象类和接口之间有什么区别”&#xff0c;“什么时候使用抽象类以及什么时候使用接口”。 因此&#xff0c;在本文中&#xff0c;我们将讨论这个主题。 在探讨它们之间的差异之前&#xff0c;让我们先介绍一下它们。 抽象类 创建抽象类以捕获子类的…

【DP】【期望】$P1850$换教室

【DP】【期望】\(P1850\)换教室 链接 题目描述 有 \(2n\) 节课程安排在$ n$ 个时间段上。在第 \(i\)&#xff08;\(1 \leq i \leq n\)&#xff09;个时间段上&#xff0c;两节内容相同的课程同时在不同的地点进行&#xff0c;其中&#xff0c;牛牛预先被安排在教室 \(c_i\)上课…

封装cookie设置和获取的简易方法

(function() {var tool {expires: "expires", // 过期时间expirespath: "path", // 路径domain: "domain", // 域secure: "secure" // 安全设置 bool};//设置function setCookie(k, v, options) {if (!options) {document.cookie k…

高并发服务器逻辑处理瓶颈,如何解决?

https://mp.weixin.qq.com/s/GHHHvgURdZpNJ1Ec6RHgPg 高并发衡量指标 响应时间&#xff1a;系统对请求做出响应的时间&#xff0c;即一个http请求返回所用的时间&#xff1b;吞吐量&#xff1a;单位时间内处理的请求数量&#xff1b;QPS&#xff08;TPS&#xff09;&#xff1a…

Oracle学习笔记:blank_trimming的含义

blank_trimming 静态初始化参数控制 【字符串的尾随空格】是否自动截断&#xff01;以便【字符类型】的 【列】或【变量】之间在运算时不用考虑尾随空格的长度&#xff01;这样就和sql-92的标准兼容了 例子&#xff1a; DECLARE v_char1 VARCHAR2(2); v_char2 VARCHAR2(…

InterruptedException和中断线程的说明

如果没有将InterruptedException检查为异常&#xff0c;则可能甚至没人会注意到它-这实际上可以防止这些年来的几个错误。 但是由于必须对其进行处理&#xff0c;因此许多人不正确或不加考虑地处理它。 让我们以一个线程的简单示例为例&#xff0c;该线程定期进行一些清理&…

映射网络驱动器会自动断开的解决方法

映射的网络驱动器在一段时间自动断开&#xff0c;是由于服务器服务自动断开连接功能的默认超时期限造成的&#xff0c;我们可以通过以下两种方法来更改断开时间&#xff1a; 方法一&#xff1a;修改注册表编辑相应的键值来增加默认超时期限在注册表中找到下面的注册表项&#x…

一行上自动控制数据长度,并换行

有的在开发中&#xff0c;遇到传来的数据太长&#xff0c;渲染到页面上会超出可视页面&#xff0c;出现横向滚动条&#xff0c;想解决一个办法就是数据到一定程度换行。 div{word-wrap: break-word;word-break: break-all;width:90%; /*可以根据情况调整*/ } 更多专业前端知识…

springboot(十)-监控应用

微服务的特点决定了功能模块的部署是分布式的&#xff0c;大部分功能模块都是运行在不同的机器上&#xff0c;彼此通过服务调用进行交互&#xff0c;前后台的业务流会经过很多个微服务的处理和传递&#xff0c;出现了异常如何快速定位是哪个环节出现了问题&#xff1f; 在这种框…

【概率DP】$P2059$ 卡牌游戏

【概率DP】P2059 卡牌游戏 链接 题目描述 N个人坐成一圈玩游戏。一开始我们把所有玩家按顺时针从1到N编号。首先第一回合是玩家1作为庄家。每个回合庄家都会随机&#xff08;即按相等的概率&#xff09;从卡牌堆里选择一张卡片&#xff0c;假设卡片上的数字为X&#xff0c;则庄…

基于角色的访问控制'的权限管理的数据库的设计实现

RBAC基于角色的访问控制的权限管理系统数据库设计与实现 use [master] go -- 检查数据库 [RBAC]是否存在,如果存在则删除(只测试用,不然会丢数据.) -- Search from the sysdatabase to see that if the [RBAC] database exist. -- If exists then drop it else create it. if…

Thymeleaf –片段和angularjs路由器局部视图

百里香叶许多很酷的功能之一就是能够渲染模板片段–我发现这是与AngularJs一起使用的特别有用的功能。 可以将AngularJS $ routeProvider或AngularUI路由器配置为返回不同“路径”的部分视图&#xff0c;使用百里香叶返回这些部分视图确实效果很好。 考虑一个简单的CRUD流&am…

web3.js_1.x.x--API(一)event/Constant/deploy/options

/* 事件是使用EVM日志内置功能的方便工具&#xff0c;在DAPP的接口中&#xff0c;它可以反过来调用Javascript的监听事件的回调。事件在合约中可被继承。当被调用时&#xff0c;会触发参数存储到交易的日志中&#xff08;一种区块链上的特殊数据结构&#xff09;。 这些日志与合…

1022: 淘金(2017年中南大学研究生复试机试题 )

1022: 淘金 时间限制: 1 Sec 内存限制: 128 MB提交: 205 解决: 75[提交] [状态] [讨论版] [命题人:外部导入]题目描述 在一片n*m的土地上&#xff0c;每一块1*1的区域里都有一定数量的金子。这一天&#xff0c;你到这里来淘金&#xff0c;然而当地人告诉你&#xff0c;如果你…

CSS 定位 四种定位

absolute 生成绝对定位的元素&#xff0c;相对于static定位以外的第一个父元素进行定位。元素的位置通过“left”&#xff0c;“top”&#xff0c;“right”以及“bottom”属性进行定位。fixed 生成固定定位的元素&#xff0c;相对于浏览器窗口进行定位。元素的位置通过“left…

Apache CXF 3.0:CDI 1.1支持可替代Spring

在几周前刚刚发布Apache CXF 3.0时 &#xff0c;该项目又迈出了满足JAX-RS 2.0规范要求的又一个重要步骤&#xff1a;与CDI 1.1集成。 在此博客文章中&#xff0c;我们将看几个有关Apache CXF 3.0和Apache CXF 3.0如何协同工作的示例。 从3.0版开始&#xff0c; Apache CXF包含…

用堆来求中位数

维护一个大根堆和一个小根堆。使得大根堆堆顶&#xff08;最大的元素&#xff09;比小根堆堆顶&#xff08;最小的元素&#xff09;小&#xff0c;且两个堆的元素个数的差小于等于1。这样元素多的那个堆的堆顶就是已读入数的中位数。如果读入偶数个数&#xff0c;则中位数为两个…

JPA 2.1类型转换器–保留枚举的更好方法

可以使用JPA 2.0保留枚举&#xff0c;但是没有很好的方法来实现。 使用Enumerated批注&#xff0c;可以使用EnumType.ORDINAL或EnumType.STRING将枚举值映射到其数据库表示形式。 但是这两种选择都有一些缺点&#xff0c;我们将在本文的第一部分中进行讨论。 在第二部分中&…