进程测试
import osimport time
while True : time. sleep( 0.5 ) print ( "hahaha" ) print ( "self" , os. getpid( ) ) print ( "parent" , os. getppid( ) )
互斥锁
from multiprocessing import Process, Lockimport random
import timedef task1 ( lock) : lock. acquire( ) lock. acquire( ) print ( "1my name is:bgon" ) time. sleep( random. randint( 1 , 2 ) ) print ( "1my age is:78" ) time. sleep( random. randint( 1 , 2 ) ) print ( "1my sex is:femal" ) lock. release( ) def task2 ( lock) : lock. acquire( ) print ( "2my name is:blex" ) time. sleep( random. randint( 1 , 2 ) ) print ( "2my age is:68" ) time. sleep( random. randint( 1 , 2 ) ) print ( "2my sex is:femal" ) lock. release( ) def task3 ( lock) : pass
if __name__ == '__main__' : lock = Lock( ) p1 = Process( target= task1, args= ( lock, ) ) p1. start( ) p2 = Process( target= task2, args= ( lock, ) ) p2. start( ) p3 = Process( target= task3, args= ( lock, ) ) p3. start( )
开启进程的两种方式
from multiprocessing import Process
import os
def task ( name) : print ( name) print ( "self" , os. getpid( ) ) print ( "parent" , os. getppid( ) ) print ( "task run" )
if __name__ == '__main__' : print ( "self" , os. getpid( ) ) print ( "parent" , os. getppid( ) ) p = Process( target= task, name= "这是子进程!" , kwargs= { "name" : "bgon" } ) p. start( )
进程间内存相互独立
from multiprocessing import Processimport time
a = 1000000000000 def task ( ) : global aprint ( id ( a) ) a = 0 print ( "子进程的" , a) if __name__ == '__main__' : print ( id ( a) ) p = Process( target= task) p. start( ) time. sleep( 1 ) print ( "自己的" , a)
互斥锁的使用场景_抢票
import json
from multiprocessing import Process, Lock
import time
import random"""
join和锁的区别
1.join中顺序是固定的 不公平
2.join是完全串行 而 锁可以使部分代码串行 其他代码还是并发 """
def check_ticket ( usr) : time. sleep( random. randint( 1 , 3 ) ) with open ( "ticket.json" , "r" , encoding= "utf-8" ) as f: dic = json. loads( f. read( ) ) print ( "%s查看 剩余票数:%s" % ( usr, dic. get( 'count' ) ) ) def buy_ticket ( usr) : with open ( "ticket.json" , "r" , encoding= "utf-8" ) as f: dic = json. load( f) if dic. get( 'count' ) > 0 : time. sleep( random. randint( 1 , 3 ) ) dic[ "count" ] -= 1 with open ( "ticket.json" , "w" , encoding= "utf-8" ) as f2: json. dump( dic, f2) print ( "%s 购票成功!" % usr) else : print ( '剩余票:' , dic. get( 'count' ) ) print ( 'Not Found of tiket!' ) def task ( usr, lock) : check_ticket( usr) lock. acquire( ) buy_ticket( usr) lock. release( ) if __name__ == '__main__' : lock = Lock( ) for i in range ( 10 ) : p = Process( target= task, args= ( "用户%s" % i, lock) ) p. start( )
父进程等待子进程结束
import time
from multiprocessing import Process
def task ( num) : print ( "我是%s号 进程" % num) print ( "=========" ) if __name__ == '__main__' : start_time = time. time( ) ps = [ ] for i in range ( 1 , 4 ) : p = Process( target= task, args= ( i, ) ) p. start( ) print ( "----{}" . format ( i) ) for p in ps: p. join( ) print ( time. time( ) - start_time) print ( "over" )
process 常用属性
from multiprocessing import Processimport timedef task ( ) : print ( "执行完毕!" ) if __name__ == '__main__' : p = Process( target= task, name= "alex" ) p. start( ) print ( p. name) p. terminate( ) print ( p. is_alive( ) )
死锁
"""死锁 指的是 锁 无法打开了 导致程序死卡首先要明确 一把锁 时不会锁死的正常开发时 一把锁足够使用 不要开多把锁""" from multiprocessing import Process, Lock
import time
def task1 ( l1, l2, i) : l1. acquire( ) print ( "盘子被%s抢走了" % i) time. sleep( 1 ) l2. acquire( ) print ( "筷子被%s抢走了" % i) print ( "吃饭.." ) l1. release( ) l2. release( ) pass def task2 ( l1, l2, i) : l2. acquire( ) print ( "筷子被%s抢走了" % i) l1. acquire( ) print ( "盘子被%s抢走了" % i) print ( "吃饭.." ) l1. release( ) l2. release( ) if __name__ == '__main__' : l1 = Lock( ) l2 = Lock( ) Process( target= task1, args= ( l1, l2, 1 ) ) . start( ) Process( target= task2, args= ( l1, l2, 2 ) ) . start( )
IPC 进程间通讯
"""IPC 进程间通讯由于进程之间内存是相互独立的 所以需要对应积极而方案 能够使得进程之间可以相互传递数据1.使用共享文件,多个进程同时读写同一个文件IO速度慢,传输数据大小不受限制2.管道 是基于内存的,速度快,但是是单向的 用起来麻烦(了解)3.申请共享内存空间,多个进程可以共享这个内存区域(重点)速度快但是 数据量不能太大
""" from multiprocessing import Manager, Process
from multiprocessing import Manager, Process, Lock
def work ( d) : d[ 'count' ] -= 1 if __name__ == '__main__' : with Manager( ) as m: dic= m. dict ( { 'count' : 100 } ) p_l= [ ] for i in range ( 100 ) : p= Process( target= work, args= ( dic, ) ) p_l. append( p) p. start( ) for p in p_l: p. join( ) print ( dic)
僵尸进程
import time
from multiprocessing import Process
def task1 ( ) : print ( "子进程 run" ) if __name__ == '__main__' : for i in range ( 10 ) : p = Process( target= task1) p. start( ) time. sleep( 100000 )
队列
"""队列 不只用于进程间通讯也是一种常见的数据容器其特点是:先进先出其优点是:可以保证数据不会错乱 即使在多进程下 因为其put和get默认都是阻塞的对比堆栈刚好相反 :后进先出#""" from multiprocessing import Queue
q = Queue( 1 )
q. put( "张三" )
print ( q. get( ) )
print ( q. get( timeout= 3 ) )
生产者消费者模型
"""什么是生产者 消费者 模型生产者 产生数据的一方消费者 处理数据的一方例如需要做一个爬虫1.爬取数据2.解析数据爬去和解析都是耗时操作,如果正常按照顺序来编写代码,将造成解析需要等待爬去 爬去取也需要等待解析这样效率是很低的要提高效率 就是一个原则 让生产者和消费解开耦合 自己干自己的如何实现:1.将两个任务分别分配给不同进程2.提供一个进程共享的数据容器"""
import random
from multiprocessing import Process, Queue
import time
def get_data ( q) : for num in range ( 5 ) : print ( "正在爬取第%s个数据" % num) time. sleep( random. randint( 1 , 2 ) ) print ( "第%s个数据 爬取完成" % num) q. put( "第%s个数据" % num) def parse_data ( q) : for num in range ( 5 ) : data = q. get( ) print ( "正在解析%s" % data) time. sleep( random. randint( 1 , 2 ) ) print ( "%s 解析完成" % data) if __name__ == '__main__' : q = Queue( 5 ) produce = Process( target= get_data, args= ( q, ) ) produce. start( ) customer = Process( target= parse_data, args= ( q, ) ) customer. start( )
总结
1. 并发编程让你的程序可以同时处理多个任务2. 并发的基石是 多道技术空间复用: 同一时间 内存存储了多个应用程序不同应用程序之间的内存是相互独立的时间复用: 当一个程序遇到了IO操作时 会切换到其他应用程序 , 以此来提高CPU的利用率多道技术的缺点: 当应用程序都是计算任务时 切换反而降低效率 ( 但是必须得切 才能保证多任务并发) 3. 并发 与 并行并发 多个事件 同时发生, 也称之为伪并行并行 多个事件 同时进行, 阻塞和非阻塞 指的是程序的状态就绪 运行 阻塞4. 两种使用进程的方式1. 直接创建Process对象 指定target参数2. 继承Process 覆盖run方法5 . join函数提高优先级 使 父进程等待子进程结束6. 孤儿进程与僵尸进程路径了解孤儿进程 是指 父进程已经终止了 但是自己还在运行 是无害的孤儿进程会自定过继给操作系统僵尸进程 是指 子进程执行完成所有任务 已经终止了但是 还残留一些信息( 进程id 进程名) 但是父进程 没有去处理这些残留信息 导致残留信息占用系统内存僵尸进程时有害的当出现大量的僵尸进程时 会占用系统资源 可以把它父进程杀掉 僵尸就成了孤儿 操作系统会负责回收数据