一、复习
# 线程# 线程是进程中的执行单位# 线程是CPU调度的最小单位# 线程之间资源共享## 线程的开启和关闭以及切换的时间开销远远小于进程# 线程本身可以在同一时间使用多个CPU # threading# 使用方法类似于multiprocess # python与线程# CPython解释器在解释代码中容易产生数据不安全的问题# GIL 全局解释器 锁的是线程
二、守护线程
例子1:
例子2:
例子3:
原因分析:
# 守护进程随着主进程代码的执行结束而结束 # 守护线程会在主线程结束之后等待其他子线程的结束才结束 # 原因解析: # 1.对主进程来说,运行完毕指的是主进程代码运行完毕 # 2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕# 1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),# 然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, # 2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。# 因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,# 而线程必须保证非守护线程都运行完毕后才能结束。
例子4:进程
例子5:守护线程
三、线程锁
进程 多进程情况下一般不用加锁。除非是操作文件之类的东西
解决上述阻塞的办法:加锁
def func(lock):global nlock.acquire()temp=ntime.sleep(1)n=temp-1lock.release()
n=10
t_lst=[]
lock=Lock()
for i in range(10):t=Thread(target=func,args=(lock,))t.start()t_lst.append(t)
for t in t_lst:t.join()
print(n)
最终运行结果为:0
2、互斥锁-科学家吃面问题-死锁
只有一份面和一个叉子。科学家公平竞争,就会有一个人拿到叉子,一个人拿到面。这就是典型的死锁问题只有一份面和一个叉子。科学家公平竞争,就会有一个人拿到叉子,一个人拿到面。这就是典型的死锁问题
noodle_lock=Lock()
fork_lock=Lock()
def eat1(name):noodle_lock.acquire()print('%s拿到面条了'%name)fork_lock.acquire()print('%s拿到叉子了'%name)print('%s吃面'%name)fork_lock.release()noodle_lock.release()
def eat2(name):fork_lock.acquire()print('%s拿到面条了'%name)time.sleep(1)noodle_lock.acquire()print('%s拿到叉子了'%name)print('%s吃面'%name)noodle_lock.release()fork_lock.release()Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('Egon',)).start()
Thread(target=eat1,args=('bossjin',)).start()
Thread(target=eat2,args=('nezha',)).start()
运行结果:
2、解决互斥锁问题-RLock介绍
4、解锁科学家吃面问题-递归锁
'''5 递归锁''' # 解决死锁问题,一个线程中可以acquire多次
noodle_lock=fork_lock=RLock() # 一个钥匙串上的两把钥匙
def eat1(name):noodle_lock.acquire()print('%s拿到面条了'%name)fork_lock.acquire()print('%s拿到叉子了'%name)print('%s吃面'%name)fork_lock.release()noodle_lock.release()
def eat2(name):fork_lock.acquire()print('%s拿到面条了'%name)time.sleep(1)noodle_lock.acquire()print('%s拿到叉子了'%name)print('%s吃面'%name)noodle_lock.release()fork_lock.release()Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('Egon',)).start()
Thread(target=eat1,args=('bossjin',)).start()
Thread(target=eat2,args=('nezha',)).start()
运行结果:
四、信号量:控制多个线程同时访问同一段代码
from threading import Semaphore,Thread
# 控制有多个线程访问同一代码
import time
def func(sem,a,b):sem.acquire()time.sleep(1)print(a+b)sem.release()
sem=Semaphore(4)
for i in range(10):t=Thread(target=func,args=(sem,i,i+5))t.start()
运行结果:
四个数字,四个数字输出,最后输出剩余2个
五、事件
# 事件被创建的时候:False状态。导致wait()阻塞# True状态 wait 非阻塞# clear 设置状态为False# set 设置状态为True # 数据库 -文件夹 # 文件夹里有好多excel表格# 1.能够更方便的对数据进行增删查改# 2.安全访问的机制 # 连接数据库 # 检测数据库的可连接情况# 起两个线程 # 第一个线程:连接数据库# 等待一个信号 告诉我我们之间的网络是通的# 连接数据库 # 第二个线程:检测与数据库之间的网络是否连通# time.sleep(0,2)# 将事件的状态设置为True
def db_connect(e):count=0while count<3:e.wait(1) # 状态为False时,我只等待0.5s就结束if e.is_set():print('数据库连接成功')breakelse:count+=1print('第%s次连接失败'%count)# else:# raise TimeoutError('连接数据库超时')def check_web(e):time.sleep(random.randint(0,3))e.set()
e=Event()
Thread(target=db_connect,args=(e,)).start()
Thread(target=check_web,args=(e,)).start()
运行结果是随机的
6、条件
# 条件 # acquire release # 一个条件被创建之初 默认有一个False 状态 # False状态会影响wait会一直处于等待状态 # notify(int数据类型)造钥匙 # 想用wait必须先acquire下
condition相当于锁,但是用完不还
def func(con,i):con.acquire()con.wait() # 等钥匙print('在第%s个循环里'%i)con.release()
con=Condition()
for i in range(10):Thread(target=func,args=(con,i)).start()
while True:num=int(input('>>>'))con.acquire()con.notify(num)con.release() # 用完钥匙不会归还
运行结果:
7、定时器:定时开启线程
例子1:
def func():print('时间同步')
Timer(2,func).start() # 等待两秒开启线程,异步 非阻塞
print(1234)
例子2:
def func():print('时间同步')
while True:Timer(2,func).start()time.sleep(2)
运行结果:
每隔两秒输出一个时间同步。。
8、队列
队列的前提:
方法介绍:
队列类型一:
q=queue.Queue() q.get() q.put_nowait() # 队列满后会报错,进行异常处理 q.get() q.get_nowait() # 队列中无数据时会报错,进行异常处理 # 队列:先进先出 # 队列:线程安全的
队列类型2-栈:
q=queue.LifoQueue() # 栈:后进先出
q.put(1)
q.put(2)
print(q.get())
print(q.get())
运行结果:
队列类型3-优先级队列:
q=queue.PriorityQueue() # 优先级队列:先按值排序,再用ASCII排序
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
q.put((1,'e'))
q.put((1,'d')) # 先出
print(q.get())
运行结果:
9、线程池
# 线程池 # submit() 异步提交 # map(func,*iterables,timeout=None) 取代for循环submit操作 # shutdown(wait=True) # 相当于进程池的pool.close()+pool.join()# wait=True,等待池内所有任务执行完毕回收资源后才继续 # wait=False,立即返回,并不会等待池内的任务执行完毕 # 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 # submit和map必须在shutdown之前 # result 取得结果 # add_done_callback() 回调函数
例子1-submit应用:
from concurrent.futures import ThreadPoolExecutor
import time
'''1'''
def func(n):time.sleep(2)print(n)
tpool=ThreadPoolExecutor(max_workers=5) # 一般cpu个数乘5以内个线程
for i in range(10):tpool.submit(func,i)
tpool.shutdown() # 等价于close+join功能
print('主线程') # 异步
运行结果:
5个数5个数输出
例子2:
'''2 线程池'''
#若为进程池,则只需将ThreadPoolExecute替换为ProcessPoolExecute
def func(n):time.sleep(2)print(n)return n*n
tpool=ThreadPoolExecutor(max_workers=5) # 一般cpu个数乘5以内个线程
t_lst=[]
for i in range(8):t=tpool.submit(func,i)t_lst.append(t)
# tpool.shutdown() # 等价于close+join功能
print('主线程') # 异步
for t in t_lst:print('***',t.result()) # 一定按顺序打印
运行结果:
例子3-map
def func(n):time.sleep(2)print(n)return n*n
tpool=ThreadPoolExecutor(max_workers=5) # 一般cpu个数乘5以内个线程
tpool.map(func,range(8)) # 拿不到返回值
运行结果:
5个5个输出
例子4-callback使用
def call_back(m):print('结果是 %s '%m.result())
def func(n):time.sleep(2)print(n)return n*n
tpool=ThreadPoolExecutor(max_workers=5) # 一般cpu个数乘5以内个线程
for i in range(8):tpool.submit(func,i).add_done_callback(call_back)
运行结果:
参考自http://www.cnblogs.com/Eva-J/articles/8306047.html