python进阶多进程(进程池,进程间通信),,python 多进程
python进阶多进程(进程池,进程间通信),,python 多进程
python 多进程
程序:是一个指令的集合
进程,正在执行的程序
编写完的代码,没有运行时,称为程序,正在运行的代码,称为进程
– 程序是死的(静态的),进程是活的(动态的)
串行: 每个任务进入CPU中执行,执行一部分秒切下一个任务,处理完一个才能处理下一个,依次执行并行:是指多个线程同时执行(多核CPU),微观上是同时的;并发:一段时间内宏观上有多个序在同时运行,微观上这些程序都是交替执行的,(单核CPU)
多进程multiprocessing
多进程中, 每个进程中所有数据(包括全局变量) 都各有拥有?份, 互不影响
multiprocessing模块提供了?个Process类来创建?个进程对象
from multiprocessing import Processdef run(name): print("子进程运行中,name = %s"%(name))if __name__ == "__main__": #在if __name__ == 'main': 下的代码只有在文件作为程序直接执行才会被执行,而import到其他程序中是不会被执行的 print("父进程启动") p = Process(target=run, args=('test',))#target表示调用对象,args表示调用对象的位置参数元组#(注意:元组中只有一个元素时结尾要加,)print("子进程将要执行")p.start()print(p.name) #p.pidp.join()print("子进程结束")
每次调用模块都会执行依次模块中全部内容,当你P对象建立的时候,会重新开辟一个新空间,导入主进程,会执行主进程当前模块所有内容,如果没有__nane__的就是递归调用模块,出错,有if__mian__的时候,子进程就不会运行
if __name__ == "__main__":
一个py文件有两种使用方法,一直接作为程序执行,二是import到其他的py程序中被调用执行
因此if __name__ == "__main__": 的作用就是控制这两种情况执行代码的过程 __name__是内置变量,用于当前模块的名字,所以if __name__ == "__main__":下的代码只有在文件作为程序直接执行才会执行,而import到其他文件不会执行。
在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。
如果不加if __name__ == "__main__":的话就会无限递归创建子进程
Process类常?属性:
?Process(target , name , args)
?参数介绍
–target表示调用对象,即子进程要执行的任务 一般是函数名
–args表示调用对象的位置参数元组,args=(1,)
–name为子进程的名称
全局变量在多个进程中不共享:进程之间的数据是独立的,默认情况下互不影响
from multiprocessing import Processnum = 1def run1(): global num num += 5 print("子进程1运行中,num = %d"%(num))def run2(): global num num += 10 print("子进程2运行中,num = %d"%(num))if __name__ == "__main__": print("父进程启动") p1 = Process(target=run1) p2 = Process(target=run2) print("子进程将要执行") p1.start() p2.start() p1.join() p2.join() print("子进程结束")结果 父进程启动子进程将要执行子进程1运行中,num = 6子进程2运行中,num = 11子进程结束
以上结果,如果不加join主程序会在子程序运行前结束,程序运行,你会发现,全局变量按理说应该变化,但全局变量在多个进程中不共享:进程之间的数据是独立的,默认情况下互不影响的
类创建进程
创建新的进程还能够使?类的?式, 可以?定义?个类, 继承Process类, 每次实
例化这个类的时候, 就等同于实例化?个进程对象
import multiprocessingimport timeclass ClockProcess(multiprocessing.Process): def run(self): n = 5 while n > 0: print(n) time.sleep(1) n -= 1if __name__ == '__main__': p = ClockProcess() p.start() p.join()
进程池:用来方便创建多个进程
?动的去创建进程的?作量巨?,此时就可以?到multiprocessing模块提供的Pool,初始化Pool时, 可以指定?个最?进程数, 当有新的请求提交到Pool中时,如果池还没有满, 那么就会创建?个新的进程?来执?该请求; 但如果池中的进程数已经达到指定的最?值, 那么该请求就会等待, 直到池中有进程结
束, 才会创建新的进程来执?
from multiprocessing import Poolimport random,timedef work(num): print(random.random()*num) time.sleep(2)if __name__ == '__main__': po = Pool(5) #定义一个进程池,最大进程数为5,如果不填写,默认CPU的核数 for i in range(10): 循环创建子进程 po.apply_async(work,(3,))#apply_async选择要调用的目标,每次循环会用空出来的子进程去调用目标 po.close()#进程池关闭之后不再接收新的请求 po.join()#等待po中所有子进程结束,必须放在close后面
multiprocessing.Pool常?函数解析:
–appy_async(func[, args[, kwds]]):使??阻塞?式调?func(并?执?,堵塞?式必须等待上?个进程退出才能执?下?个进程) , args为
传递给func的参数列表, kwds为传递给func的关键字参数列表;
–apply(func[, args[, kwds]])(了解即可) 使?阻塞?式调?func
–close(): 关闭Pool, 使其不再接受新的任务;
join():主进程阻塞, 等待?进程的退出, 必须在close或terminate之后使?;
进程间通信-Queue
?多进程之间,默认是不共享数据的
?通过Queue(队列Q)可以实现进程间的数据传递
?Q本身是一个消息队列
?如何添加消息(入队操作):
from multiprocessing import Queueq = Queue(3) #初始化一个Queue对象,最多可接受3条消息q.put(“消息1”) #添加的消息数据类型不限q.put("消息2")q.put("消息3")print(q.full)
可以使?multiprocessing模块的Queue实现多进程之间的数据传递
?初始化Queue()对象时(例如: q=Queue()) , 若括号中没有指定最?可接收的消息数量, 或数量为负值, 那么就代表可接受的消息数量没有上限
Queue.qsize(): 返回当前队列包含的消息数量
Queue.empty(): 如果队列为空, 返回True, 反之False
Queue.full(): 如果队列满了, 返回True,反之False
Queue.get([block[, timeout]]): 获取队列中的?条消息, 然后将其从列队
中移除, block默认值为True
如果block使?默认值, 且没有设置timeout(单位秒) , 消息列队如果为空, 此时程序将被阻塞(停在读取状态) , 直到从消息列队读到消息为?
如果设置了timeout, 则会等待timeout秒, 若还没读取到任何消息, 则抛出"Queue.Empty"异常
如果block值为False,消息列队如果为空, 则会?刻抛出“Queue.Empty”异常
from multiprocessing import Queue, Processimport timedef write(q): for value in ["a","b","c"]: print("开始写入:",value) q.put(value) time.sleep(1)def read(q): while True: if not q.empty(): print("读取到的是",q.get()) time.sleep(1) else: breakif __name__ == "__main__": q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pw.join()#等待接收完毕 pr.start() pr.join() print("接收完毕!")
进程池创建的进程之间的通信
进程池创建的进程之间通信:如果要使?Pool创建进程, 就需要使?multiprocessing.Manager()中的Queue() ?不是multiprocessing.Queue()
否则会得到?条如下的错误信息:RuntimeError: Queue objects should only be shared between processes through inheritance.
from multiprocessing import Manager,Poolimport timedef writer(q): for i in "we": print("开始写入",i) q.put(i)def reader(q): time.sleep(3) for i in range(q.qsize()): print("得到消息",q.get())if __name__ == "__main__": print("主进程启动") q = Manager().Queue() po = Pool(3) for i in range(5): po.apply_async(writer,(q,)) po.apply_async(reader,(q,)) po.close() po.join()结果:主进程启动开始写入 w开始写入 e开始写入 w开始写入 e开始写入 w开始写入 e得到消息 w得到消息 e得到消息 w得到消息 e得到消息 w得到消息 e #先按照池子规定的容量3,执行三次,然后执行完再往池子注入再执行后边的开始写入 w开始写入 e得到消息 w得到消息 e开始写入 w开始写入 e得到消息 w得到消息 e
python进阶多进程(进程池,进程间通信)
评论关闭