16.Python 多进程和多线程

进程是应用程序正在执行的实体,当程序执行时,也就创建了一个主线程。进程在创建和执行需要占用一定的资源,如内存、文件、I/O设备等。

线程是CPU使用的基本单元,由主线程创建,并使用这个进程的资源,因此线程创建成本低,可以实现并发处理,充分利用CPU。

1. 使用进程

程序只是一堆静态的代码,而进程则是程序的运行过程。同一个程序执行两次,就是两个进程。一个进程就是一个正在运行的任务。对于单核CPU来说,同一时间只能处理一个任务,如果要实现多任务并发处理,可以多任务之间轮换执行。

multiprocessing 是多进程管理包,可以编写多进程和多线程。如编写多线程,使用multiprocessing.dummy即可,用法与multiprocessing基本相同。

常用组件和功能:

  1. 管理进程模块
    • Process:用于创建进程模块。
    • Pool:用于创建管理进程池。
    • Queue:用于进程通信,资源共享。
    • Value,Array:用于进程通信,资源共享。
    • Pipe:用于管理通信。
    • Manager:用于资源共享。
  2. 同步子进程模块
    • Condition:条件对象。
    • Event:事件通信。
    • Lock:进程锁。
    • RLock:递归锁。
    • Semaphore:进程信号量。

使用多线程往往是用来处理CPU密集型(如科学计算)的需求,如果是IO密集型(如文件读取、爬虫等),则可以使用多线程去处理。

Process 是 multiprocessing 的子类,也是multiprocessing 的核心模块,用来创建子进程。可以实现多进程的创建、启动、关闭等操作。在multiprocessing中,每一个进程都用一个Process类表示。

multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={})

参数说明如下:

  • group:线程组,目前还没有实现,参数值必须为None。
  • target:表示当前进程启动时要执行的调用对象,一般为可执行的方法或函数。
  • name:进程名称,相当于给当前进程取一个别名。
  • args:表示传递给target函数的位置参数,格式为元组。
  • kwargs:表示传递给target函数的关键字参数,格式为字典。

Process对象包含的实例方法如下:

  • is_alive():判断进程实例是否还在执行。
  • join([timeout]):阻塞进程执行,直到进程终止,或者等待一段时间,具体时间由timeout(可选参数)设置,单位为s。
  • start():启动进程实例。
  • run():如果没有设置target参数,调用start()方法时,将执行对象的run()方法。
  • terminate():不管任务是否完成,立即停止进程。

Process 对象的常用属性:

  • name:进程名称。
  • pid:进程ID,在进程被创造前返回None。
  • exitcode:进程的退出码,如果进程没有结束,那么返回None;如果进程被信号N终结,则返回- N。
  • authkey:进程的认证密钥,为一个字节串。
  • sentinel:当进程结束时变为ready状态,可用于同时等待多个事件,否则用join()更简单些。
  • daemon:将父进程设置为守护进程,当父进程结束时,子进程也结束。
# 新建一个test2.py文件,输入以下内容
from multiprocessing import Process
def foo(i):print('say hi',i)if __name__=='__main__':for i in range(10):p = Process(target=foo,args=(i,))p.start()
# 执行同时输出以下10行数据

在这里插入图片描述

# 新建一个test3.py文件,输入以下内容
import multiprocessing # 导入multiprocessing模块
import time # 导入time模块
def worker(): # 处理任务name = multiprocessing.current_process().name # 获取进程的名称print(name,'Starting')time.sleep(4) # 睡眠4sprint(name,'Exiting')def my_service(): # 处理任务name = multiprocessing.current_process().name # 获取进程的名称print(name,'Starting')time.sleep(5) # 睡眠5sprint(name,'Exiting')if __name__ == '__main__': # 主进程service = multiprocessing.Process( # 创建子进程1name = 'my_service',  # 修改进程名称target = my_service # 调用对象)worker_1 = multiprocessing.Process( # 创建子进程2name = 'worker 1', # 修改进程名称target = worker # 调用对象)worker_2 = multiprocessing.Process( # 创建子进程3,保持默认的进程名称target = worker # 调用对象)worker_1.start() # 启动进程1worker_2.start() # 启动进程2service.start() # 启动进程3

在这里插入图片描述

自定义进程:简单的任务,直接使用multiprocessing.Process实现多进程,而对于复杂的任务,通常自定义Process类,扩展Process功能。

# 新建一个test4.py文件,输入以下内容
from multiprocessing import Process # 导入 Process 类
import time,os # 导入time和os模块
class MyProcess(Process): # 自定义进程类,继承自Processdef __init__(self,name): # 重写初始化函数super().__init__() # 调用父类的初始化函数self.name = name  # 重写name属性值def run(self): # 重写 run方法print('%s is running'%self.name,os.getppid())  # 打印子进程信息,os.getppid()获取父进程IDtime.sleep(3)print('%s is done'%self.name,os.getpid()) # 打印子进程信息,os.getpid()获取子进程(当前进程)ID
if __name__ == '__main__':p = MyProcess('子进程1') # 创建子进程p.start() # 执行进程print('主进程',os.getppid()) # 打印主进程ID

在这里插入图片描述

管道Pipe 可以创建管道,用来在两个进程间进行通信,两个进程分别位于管道的两端。

Pipe([duplex])
# (conn1,conn2) = Pipe()

该方法返回两个链接对象(conn1,conn2) 元组,代表管道的两端。参数duplex为可选,默认值为True。

  • 如果duplex为True,那么是双工模式,即conn1和conn2均可收发消息。
  • 如果duplex为False,conn1只负责接收消息,conn2只负责发送消息。

实例化的Pipe对象拥有connection的方法,5种常用的方法如下:

  • send(obj):发送数据。
  • recv():接收数据。如果没有消息可接收,recv()方法一直阻塞。如果管道已经被关闭,那么recv()方法抛出EOFError错误。
  • poll([timeout]):查看缓冲区是否有数据,可设置时间。如果timeout为None,则无限超时。
  • send_bytes([buffer[,offset[,size]]):发送二进制字节数据。
  • recv_bytes([maxlength]):接收二进制字节数据。
from multiprocessing import Process,Pipe # 导入 Process和Pipe
a,b = Pipe(True) # 如果改 a,b = Pipe(False)
a.send('Hi,b') # 发送数据
print(b.recv()) # 输出 Hi,b# 一个可发送消息,另一个可以接受消息
# 新建一个test5.py文件,输入以下内容
from multiprocessing import Process,Pipe # 导入 Process和Pipe
def send(pipe): # send传输一个列表pipe.send(['spam']+[42,'egg'])pipe.close()
if __name__ == '__main__':(conn1,conn2) = Pipe() # 创建两个Pipe实例sender = Process(target=send,args=(conn1,)) # args一定是实例化后的Pipe变量,不能写args=(Pipe(),)sender.start() # Process 类启动进程print('conn2 got:%s'%conn2.recv()) # 管道的另一端conn2从send收到消息conn2.close() # 关闭管道

在这里插入图片描述

# 管道可以同时发送和接收消息
# 新建一个test6.py文件,输入以下内容
from multiprocessing import Process,Pipe # 导入 Process和Pipe
def talk(pipe):pipe.send(dict(name='Bob',spam=42)) # 传入一个字典reply = pipe.recv() # 接收传输的数据print('talker got:',reply)if __name__ == '__main__':(parentEnd,childEnd) = Pipe() # 创建两个Pipe()实例child = Process(target=talk,args=(childEnd,)) # 创建一个Process进程,名称为childchild.start() # 启动进程print('parent got:',parentEnd.recv()) # parentEnd是一个Pip()管道,可以接受child Process进程传输的数据parentEnd.send({x*2 for x in 'spam'}) # 使用send方法来传输数据child.join()print('parent exit')

在这里插入图片描述

队列Queue 可以创建队列,实现在多个进程间通信。Queue 是multiprocessing的子类

Queue([maxsize])

Queue 实例对象的常用方法:

  • empty():判断队列是否为空,空返回True,否则返回False。
  • full():判断队列是否已满,满返回True,否则返回False
  • put(obj[,block[,timeout]]):写入数据。
  • get([block[,timeout]]):读取数据。
  • put_nowait():直接写入数据。
  • get_nowait():直接读取数据。
  • close():关闭队列。
  • qsize():返回队列的大小。
from multiprocessing import Queue
q = Queue() # 创建一个队列对象
# 使用put方法往队列里面放值
q.put(1) # 添加数字1
q.put(2) # 添加数字2
q.put(3) # 添加数字3
# 使用get方法从队列取值。先进先出,后进后出原则
print(q.get())  # 1
print(q.get()) # 2
print(q.get()) # 3
print(q.full()) # False
print(q.empty()) # True

get()方法将从队列取值,并且把队列内被取出来的值删掉。如果get()没有参数情况下就是默认一直等着取值,就算队列里面没有可取的值,程序也不会结束,就会卡在那里一直等待。

# 新建一个test7.py文件,输入以下内容
from multiprocessing import Process,Queue # 导入 Process,Queue 类
def f(q,name,age): # 进程函数q.put([name,age])  # 添加数据if __name__ == '__main__':q = Queue() # 创建一个Queue对象p = Process(target=f,args=(q,'张三',18)) # 创建一个进程p.start() # 执行进程print(q.get()) # ['张三', 18]p.join() # 阻塞进程

进程池Pool 可以提供指定数量的进程供用户调用。

进程池对象 = Pool(进程池,初始函数,初始参数,最大任务数,上下文)

进程池对象常用方法:

  • apply():执行进程函数。
  • apply_async():异步执行进程函数。
  • map():迭代执行进程函数。
  • map_async():异步迭代执行进程函数。
  • close():关闭进程池。
  • terminal():结束工作进程。
  • join():阻塞主进程等待退出。
# 新建一个test8.py文件,输入以下内容
import time
from multiprocessing import Pool # 导入Pool 类def run(n): # 进程处理函数time.sleep(1) # 阻塞 1 sreturn n*n # 返回浮点数的平方
if __name__ == '__main__': # 主进程testFL = [1,2,3,4,5,6] # 待处理的数列print('顺序执行') # 但进程s = time.time() # 计时开始for fn in testFL:run(fn)e1 = time.time() # 计时结束print('顺序执行时间:',int(e1-s)) # 计算所用时差print('并行执行') # 创建多进程,并行执行pool = Pool(6) # 创建6个进程数量的进程池r1 = pool.map(run,testFL) #  并发执行运算pool.close() # 关闭进程池,不再接受新的进程pool.join() # 主进程阻塞等待子进程的退出e2 = time.time() # 计时结束print('并行执行时间:',int(e2-e1)) # 计算所用时差print(r1)  # 打印计算结果

在这里插入图片描述

进程锁:当多个进程使用同一资源时,容易引发数据安全或顺序混乱问题,这时可以考虑为进程加锁,使进程产生同步,确保数据的一致性。使用Lock可以创建锁。

lock = multiprocessing.Lock() # 创建锁
lock.acquire() # 获取锁
lock.release() # 释放锁# 新建一个test9.py文件,输入以下内容
import os,time,random
from multiprocessing import Process,Lock,set_start_method
def work(lock,n):lock.acquire()print('%s:%s is runing'%(n,os.getpid()))time.sleep(random.random())print('%s:%s is down'%(n,os.getpid()))lock.release()
if __name__ == '__main__':set_start_method('fork')lock = Lock()for i in range(3): # 利用for循环模拟多线程p = Process(target=work,args=(lock,i))p.start()# 使用加锁形式实现了顺序执行,保证了数据的安全,类似于数据库的事务。

在这里插入图片描述

2.使用线程

进程是执行的应用程序,而线程是进程的执行序列,一个进程可以有多个线程,线程(Thread)也叫轻量级进程,多线程类似于同时执行多个不同的程序。

多线程运行优点如下:

  • 进程之间不能共享内存,但线程之间可以共享内存。
  • 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小的多因此使用多线程实现多任务并发执行比使用多进程的效率高。

多线程编程的优势如下:

  • 使用线程可以把占据长时间程序的任务放到后台处理。
  • 用户界面可以更加吸引人,如用户点击一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度。
  • 程序的运行速度可能加快。

在一些等待的任务实现上如用户输入、文件读写和网络收敛数据等,线程的优势较明显。

使用Thread 构造器可以创建线程,Thread 是threading模块最核心的类,每个Threa对象代表一个线程,每个线程可以独立处理不同的任务。

Thread(group=None,target=None,name=None,args=(),kwargs={})
# 新建一个test11.py文件,输入以下内容
import time 
import threading 
def f(n):print('线程运算',n)time.sleep(1)if __name__ == '__main__':a = time.time()for i in range(5):# f(i+1) # 单线程运算:5st = threading.Thread(target=f,args=(i+1,)) # 多线程运算:不到1st.start()b = time.time()print('花费实际:%2f'%(b-a))

在这里插入图片描述

自定义线程:继承 threading.Thread 自定义线程类,其本质是重构Thread类中的run方法。

# 新建一个test12.py文件,输入以下内容
# 自定义Threading
import time
import threading
class MyThread(threading.Thread): # 以继承的方式实现线程创建def __init__(self,n): # 重写初始化函数super(MyThread,self).__init__() # 重构run函数必须重写self.n = ndef run(self): # 重写run函数print('task',self.n)time.sleep(1)print('2s')time.sleep(1)print('1s')time.sleep(1)print('0s')
if __name__ == '__main__':t1 = MyThread('t1') # 实例化线程对象t2 = MyThread('t2')t1.start() # 执行线程t2.start()

线程锁:在多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据同时改变一个变量时会把内容改乱。为了确保一个线程在修改变量时,别的线程一定不能改该变量,这就是锁。

Lock 对象有两个基本方法:

  • acquire():可以阻塞或非阻塞地获取锁。
  • release():释放一个锁。

线程锁的优点:确保某段关键代码只能由一个线程从头到尾完整地执行。

线程锁的缺点如下:

  • 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率降低了。
  • 由于存在多个锁,不同线程持有不同的锁,可能回造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
# 新建一个test13.py文件,输入以下内容
import time
import threading
t = 0
lock = threading.Lock() # 创建Lock对象
def run_thread(n): # 线程处理函数global t  # 声明全局变量for i in range(1000000): # 无数次重复操作,对变量执行先存后取相同的值lock.acquire() # 获取锁try:t = t + nt = t - nfinally:lock.release() # 释放锁
t1 = threading.Thread(target=run_thread,args=(5,)) # 创建线程
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start() # 开始执行线程
t2.start()
t1.join() # 阻塞线程
t2.join()
print(t) # 0

递归锁RLock 允许在同一线程中多次调用acqire(),不回阻塞程序,这种锁称为递归锁。acquire和release必须成对出现,即调用了n次acquire()方法,就必须调用n次release()方法,才能真正释放所占用的锁。

# 新建一个test13.py文件,输入以下内容
import time
import threading
t = 0
rlock = threading.RLock() # 创建Lock对象
def run_thread(n): # 线程处理函数global t  # 声明全局变量for i in range(1000000): # 无数次重复操作,对变量执行先存后取相同的值rlock.acquire() # 获取锁rlock.acquire() # 获取锁try:t = t + nt = t - nfinally:rlock.release() # 释放锁rlock.release() # 释放锁
t1 = threading.Thread(target=run_thread,args=(5,)) # 创建线程
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start() # 开始执行线程
t2.start()
t1.join() # 阻塞线程
t2.join()
print(t) # 0

条件对象:允许一个或多个线程在被其它线程通知之前处于等待中。Condition 时 threading 模块的一个子类,用于维护多线程之间的同步协作。内部使用的也是Lock或Rlock,同时增加了等待池功能。常见方法如下:

  • acquire():请求底层锁。
  • release():释放底层锁。
  • wait():等待直到被通知或发送超时。
  • wait_for():等待,直接条件计算为真。
  • notify(n=1):默认唤醒一个等待这个条件的线程。
  • notify_all():唤醒所有正在等待这个条件的线程。
import time
import threading
class Test1(threading.Thread):def __init__(self,name,cond):super().__init__()self.name = nameself.cond = conddef run(self):with self.cond:print(self.name,':1')time.sleep(1)print(self.name,':3')class Test2(threading.Thread):def __init__(self,name,cond):super().__init__()self.name = nameself.cond = conddef run(self):with self.cond:print(self.name,':2')time.sleep(1)print(self.name,':4')cond = threading.Condition()
a = Test1('A',cond)
b = Test2('B',cond)
a.start()
b.start()

事件通信:事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下几个方法:

  • is_set():当且仅当内部标志为True时返回True。
  • set():将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。当标志为True时,调用wait()方法的线程不回被阻塞。
  • clear():将内部标志设置为False。
  • wait():等待设置标志。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/167183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE初阶】认识线程、创建线程

1. 认识线程(Thread) 1.1 概念 1) 线程是什么 一个线程就是一个 "执行流". 每个线程之间都可以按照顺序执行自己的代码. 多个线程之间 "同时" 执行着多份代码. 举例: 还是回到我们之前的银⾏的例⼦中。之前我们主要描…

Python开源项目之人工智能老照片修复算法学习

文章目录 前言项目环境搭建conda虚拟环境创建激活环境Pytorch安装Synchronized-BatchNorm-PyTorch repository安装Global目录Synchronized-BatchNorm-PyTorch项目部署检测预处理模型下载下载脸部增强模型文件下载依赖完整部署后项目结构 项目使用验证一下总结关于Python技术储备…

比较2个点的3种结构在不规则平面上的占比

2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 3 3 3 x 3 3 2 2 2 1 2 2 2 2 2 1 2 2 在平面上有一个点x,再增加一个点,11的操作把平面分成了3部分2a1,2a2,2a3,3部分的比值是 2a1 2a2 2a3 5 25 …

IDC最新报告,增速减缓+AI增势,阿里云视频云中国市场第一

国际权威数据公司IDC发布 《中国视频云市场跟踪(2023 H1)》报告 自2018年至今,阿里云持续保持 中国视频云整体市场第一 整体市场占比达24.4% 01 第一之外,低谷之上 近期,国际权威数据公司IDC最新发布了《中国视频…

做亚马逊多久可以赚钱?做亚马逊需要多少资金?——站斧浏览器

做亚马逊需要时间、资金和全面的市场策略。创业者需要有耐心和决心,同时也要灵活应对市场变化。那么做亚马逊多久可以赚钱,做亚马逊需要多少资金。 做亚马逊多久可以赚钱 首先,就像任何其他生意一样,做亚马逊需要时间和努力来建立起稳定的客…

怎么给图片加水印?

怎么给图片加水印?当代年轻人现在越来越爱在社交平台中发表自己拍下来的一些趣事和美照,但是同样的也会有人盗取他人图片的方式来发布在自己的社交平台中,而且没有水印的照片一旦在网上流传开来以后,很难追溯到它的来源&#xff0…

【MySQL】你知道索引查找起来为什么效率特别高吗?

索引 前言正式开始磁盘、os、MySQL之间的IOMySQL与存储扇区结论磁盘随机访问(Random Access)与连续访问(Sequential Access)MySQL 与磁盘交互基本单位小总结简单介绍一下内存池 谈回MySQL简单理解MySQL中的page为何IO交互基本单位是pagepage结构页目录单个page的页目录多个page…

井盖位移传感器生产厂家推荐,时刻感知井盖

马路上的井盖虽然看似微不足道,但实际上对于行人的“脚下安全”起着至关重要的作用。这些井盖下连接着供排水、燃气、电力、供热、通信等功能的管路和线路,是城市生命线运行的重要保障。因此保持井盖状态正常、明确管理责任是确保车辆和行人安全通行的重…

CART算法解密:从原理到Python实现

本文深入探讨了CART(分类与回归树)算法的核心原理、实现方法以及应用场景。文章首先介绍了决策树的基础知识,然后详细解析了CART算法的工作机制,包括特征选择和树的构建。接着,通过Python和PyTorch的实例代码展示了CAR…

livox 半固体激光雷达 gazebo 仿真 | 更换仿真中mid360雷达外形

livox 半固体激光雷达 gazebo 仿真 | 更换仿真中mid360雷达外形 livox 半固体激光雷达 gazebo 仿真 | 更换仿真中mid360雷达外形livox 介绍更换仿真中mid360雷达外形 livox 半固体激光雷达 gazebo 仿真 | 更换仿真中mid360雷达外形 livox 介绍 览沃科技有限公司(L…

双11后观察:中国电商产业带的数字新叙事

在电商平台走过的第十五个双11后,产业带的数字化蓝图也更加完整。但在电商平台与产业带相互补足的背景下,一个更值得思考的问题是,随着电商平台的低价竞争愈演愈烈,产业带上的供应链能力能否跟上? 作者|思杭 编辑|皮…

分享5款经过时间验证的精品软件

​ 今天来给大家推荐5款良心软件,每款都是经过时间检验的精品,用起来让你的工作效率提升飞快,各个都让你觉得相见恨晚! 1.文件夹隐藏工具——文件夹隐藏精灵 ​ 文件夹隐藏精灵是一款可以隐藏你的文件夹和文件的工具,它可以让你的隐私和重要…

Aop面向切面实现开发日志收集打印一文轻松搞定,内附详细图文示例+源码自取

目录 介绍 动态代理 jdk动态代理 cglib动态代理 注解实现Aop 添加必须依赖 添加Atm类 (主业务逻辑代码块) 定义打印log方法(提取公共代码逻辑块) 启用代理 切点表达式 Aop通知类型 前置通知(Before) 后置通知(After) 正常结束通知(AfterReturning) 异常结束通知…

树莓派上使用Nginx通过内网穿透实现无公网IP访问内网本地站点

前言 安装 Nginx(发音为“engine-x”)可以将您的树莓派变成一个强大的 Web 服务器,可以用于托管网站或 Web 应用程序。相比其他 Web 服务器,Nginx 的内存占用率非常低,可以在树莓派等资源受限的设备上运行。同时结合c…

如何让大模型更好地完成知识图谱推理?

​ 论文标题: Making Large Language Models Perform Better in Knowledge Graph Completion 论文链接: https://arxiv.org/abs/2310.06671 代码链接:GitHub - zjukg/KoPA: [Paper][Preprint 2023] Making Large Language Models Perform Be…

node-red - 节点实战总结1

node-red - 节点实战总结1 二、功能2.1 循环(for\while) 三、网络四、序列五、解析六、存储七、协议7.1 modbus协议7.2 opcua 八、formats8.1 时间格式化与时区转换 二、功能 2.1 循环(for\while) 安装节点node-red-contrib-loop-processing,该节点支持三种方式的循环&#xf…

【SpringBoot】 This application has no explicit mapping for 解决方法

This application has no explicit mapping for 解决方法 This application has no explicit mapping for 解决方法一、背景二、原因三、解决方案方式一:方式二: 四、解决 This application has no explicit mapping for 解决方法 一、背景 在SpringBo…

奥特曼不是第一次被开除!离职YC系“被创始人要求离开”

明敏 西风 发自 凹非寺 量子位 | 公众号 QbitAI 钮祜禄奥特曼,竟然不是第一次被“扫地出门”??! 没想到,OpenAI闹剧刚稍微消停了一点,“前传”马上来了。 《华盛顿邮报》从知情人士处获悉,奥…

java编程:使用递归 循环和位运算实现将10进制转为2进制

1 递归 /*** 递归&#xff1a;十进制转二进制* param decimal 待转换的十进制数* param binary 转换后的二进制数*/public static void decimalToBinaryByRecursion(int decimal,StringBuilder binary){if(decimal < 0){return;}decimalToBinaryByRecursion(decimal/2,bina…

3D卷积的理解

卷积核不仅需要在高宽这两个维度上进行滑动&#xff0c;还需要在时间维度上进行滑动