python语法基础-并发编程-进程-长期维护,,##########
python语法基础-并发编程-进程-长期维护,,##########
############### 进程的启动方式1 ##############
"""并发编程:进程1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了,操作系统可以管理进程,进程是操作系统基本的执行单元,2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间,操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因,#######################################进程的调度1,先来先服务,有一个不好的,就是不利于短作业2,短作业优先算法,但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。3,时间片轮转算法,就是轮流执行,已经很科学了,4,多级反馈队列算法,有多个队列,有一个新任务来了放入第一个队列,这是优先级加上时间片轮转,第二个任务来了放入下一级,#######################################并发和并行:进程的并行:这种只有在多核cpu才可以实现,进程的并发:这是轮流执行,由于速度很快,看起来像是一起执行的,比如一遍听音乐,一遍写代码,######################################进程的三状态转换图:非常重要1,进程一开始运行的时候,是就绪的状态,这是第一个状态,就是告诉cpu,我已经准备好可以运行了,进入排队了,2,时间片轮转,轮到你了之后,你就运行了,这是第二个状态,3,发生阻塞,这是第三个状态,比如你的程序让你输入内容,input方法, 这时候是阻塞的,你输入完毕了之后,就又畅通了,这是等待I/O完成,input,sleep,文件的输入和输出,事件处理之后,你还要进入就绪状态了,全部处理完了,就结束了,###########################################同步和异步1,同步,需要等待,需要排队,你什么也不能干,2,异步,不需要等待,你可以去做其他事情,###########################################阻塞和非阻塞1,阻塞,就是input,sleep这些,需要等待,这是阻塞,2,非阻塞,就是跳过这些阻塞,但是程序中不可避免的需要阻塞,因为需要等待内容处理,###########################################同步异步和阻塞非阻塞:同步阻塞,就是同步非阻塞异步阻塞异步非阻塞,效率更高,###############################################多进程有一个内置的模块:from multiprocessing import Process要学习几个地方:1,进程的注册2,进程的开启3,进程的异步,4,进程的同步,join,5,进程注册函数的参数,6,启动多个子进程,"""import timefrom multiprocessing import Processimport osdef f(args1,args2): print("*"*args1) time.sleep(1) print("*"*args2) # print("子进程号",os.getpid()) # print("子进程的父进程号",os.getppid()) # print(‘我是子进程‘)if __name__ == ‘__main__‘: # p = Process(target=f, args=(‘bob‘,)) # p = Process(target=f, args=(‘bob‘,"123")) # 注册,p是一个进程,还没有启动,这是主进程, # args=(‘bob‘,),如果注册的函数是有参数的,就要传递参数,如果有一个参数括号内要有一个逗号,因为这是一个元组, # p.start() # 启动进程,这是启动了一个子进程, # 现在子进程和主进程之间是异步的,如果我想在子进程结束之后再执行下面的代码,变成同步,怎么办? # p.join() # 这个join就是在感知一个子进程的一个结束,将异步改成同步, # time.sleep(1) # print("父进程号",os.getpid()) # print("父进程的父进程号",os.getppid()) # 这个就是pycharm的进程号, # print(‘执行主进程的内容了‘) # 这一句的执行和子进程的执行内容是异步的,不是同步的, # 开启10个子进程 p_list=[] for i in range(10): p = Process(target=f, args=(10*i, 20*i)) p_list.append(p) p.start() # print("第%d轮"%(i+1)) # p.join() [p.join() for p in p_list ] # 保证前面的10个进程全部结束了,才会执行下面的代码, print("运行完了") # 这种开启了多进程,可以读多个进程去存文件,取文件内容,# 进程的生命周期,# 主进程没有开启子进程,就是执行完他的代码就结束了了# 子进程也是执行完自己的代码就结束了,# 开启了子进程的主进程,主进程执行完了,要等待子进程结束之后,主进程才可以结束,
############### 进程的启动方式2 和 进程之间是数据隔离的 ##############
# 进程的启动方式2# 第一点,创建一个类,继承process# 第二点,类中必须实现run方法,这个run方法里面就是子进程要执行的内容,import osfrom multiprocessing import Processclass MyProcess(Process): # 继承导入的process, def __init__(self,name): # 为了进程能传递参数, super().__init__() # 这是继承了父类所有的参数, self.name=name def run(self): # print(os.getpid()) print("子进程号",self.pid) print("参数",self.name) # print(os.getpid()) 这两句是一样的,if __name__ == ‘__main__‘: # p1=MyProcess() # 这是不传参数的 p1=MyProcess("name1") # 这是传参数的,这就是面向对象的实例化, p2=MyProcess(‘name2‘) p3=MyProcess(‘name3‘) p1.start() #start会自动调用run p2.start() # p2.run() p3.start() # 三个进程之间是异步的, p1.join() p2.join() p3.join() # 三个进程都结束了才会执行下面的内容,这是把异步,变成异步, print(‘主线程‘)# 进程之间的数据隔离问题# 进程和进程之间的数据是否是隔离的,比如qq和微信,之间的数据是隔离的,# 几个进程之间,如果不通过特殊的手段,是不可能共享一个数据的,这个记住,没有什么可理解的,# 下面是一个例子,证明主进程和子进程之间是没有from multiprocessing import Processdef work(): global n # 声明了一个全局变量, n=0 print(‘子进程内: ‘,n)if __name__ == ‘__main__‘: n = 100 p=Process(target=work) p.start() print(‘主进程内: ‘,n)
############### 守护进程 ##############
# 守护进程# 子进程----守护进程# 第一版:主进程结束了,子进程还没有结束,# import time# from multiprocessing import Process## def func():# while True:# time.sleep(1)# print("我还活着")### if __name__ == ‘__main__‘:# p=Process(target=func)# p.start()# i = 0# while i<10:# time.sleep(1)# i+=1# print("主进程结束")# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,import timefrom multiprocessing import Processdef func(): while True: time.sleep(1) print("我还活着")if __name__ == ‘__main__‘: p=Process(target=func) p.daemon=True # 设置子进程为守护进程, p.start() p2=Process(target=func) # 这个子进程没有设置为守护进程所以这个进程还在进行中, p2.start() # time.sleep(3) # p2.is_alive() # 判断一个进程是否活着 # p2.terminate() # 结束一个进程, i = 0 while i<5: time.sleep(1) i+=1 print("主进程代码结束")
############### 进程锁 ##############
# 进程锁# 买票就是一个并发的过程,# 文件db的内容为:{"ticket":1}# 注意一定要用双引号,不然json无法识别# 并发运行,效率高,但竞争写同一文件,数据写入错乱# 买票不加锁,可能会有多个人买到票了。但是票只有一张,# from multiprocessing import Process,Lock# import time,json,random## # 查询余票的函数# def show(i):# with open("db") as f :# dic=json.load(f)# print(‘余票%s‘%dic[‘ticket‘])## # 买票的函数# def bug_ticket(i):# with open("db") as f :# dic=json.load(f)# time.sleep(0.1) # 模拟读数据的网络延迟# if dic[‘ticket‘] >0:# dic[‘ticket‘]-=1# print(‘\033[32m%s购票成功\033[0m‘%i)# else:# print(‘\033[31m%s没买到票\033[0m‘%i)# time.sleep(0.1)# with open("db","w") as f:# json.dump(dic,f)### if __name__ == ‘__main__‘:# for i in range(10): # 模拟并发10个客户端查询票# p=Process(target=show,args=(i,))# p.start()# for i in range(10):# p = Process(target=bug_ticket, args=(i,))# p.start()# 加锁from multiprocessing import Process,Lockimport time,json,random# 查询余票的函数def show(i): with open("db") as f : dic=json.load(f) print(‘余票%s‘%dic[‘ticket‘])# 买票的函数def bug_ticket(i,lock): lock.acquire() # 这就是拿钥匙进门,有一个进程拿了钥匙,之后第二个进程进来就没有钥匙了,就会阻塞, with open("db") as f : dic=json.load(f) time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘ticket‘] >0: dic[‘ticket‘]-=1 print(‘\033[32m%s购票成功\033[0m‘%i) else: print(‘\033[31m%s没买到票\033[0m‘%i) time.sleep(0.1) with open("db","w") as f: json.dump(dic,f) lock.release() # 这是还钥匙,还了之后,别的进程就可以拿到了,就可以执行了,if __name__ == ‘__main__‘: for i in range(10): # 模拟并发10个客户端查询票 p=Process(target=show,args=(i,)) p.start() lock=Lock() for i in range(10): p = Process(target=bug_ticket, args=(i,lock)) p.start()
############### 多进程的信号量 ##############
# 多进程的信号量from multiprocessing import Processimport time,randomfrom multiprocessing import Semaphore# ktv只有1个房间,1个房间只能装4个人,但是这样写就是20个人都进入到房间了,# 假设ket门口有4把钥匙,一个进程来了那一把钥匙,然后关门,这样只有4个进程能拿到,剩下的之后1个进程出来了才可以继续其他的进程,# 这个概念就叫做信号量,同一时间就只有四个人,def ktv(i,sem): sem.acquire() # 获取钥匙 print("%d进入ktv"%i) time.sleep(random.randint(60,180)) # 这是每一个人唱歌1-3分钟 print("%d走出ktv"%i) sem.release() # 还钥匙if __name__ == "__main__": sem = Semaphore(4) # 这就是设置有多少把钥匙, 信号量的英文就是:Semaphore for i in range(20): p=Process(target=ktv,args=(i,sem)) p.start()
############### 进程的事件 ##############
# 进程的事件# 事件import timefrom multiprocessing import Event, Process# 一个信号,可以使所有的进程都进入阻塞状态,也可以控制所有信号都解除阻塞,# 一个事件创建之后,默认是阻塞状态,# e = Event() # 创建一个事件# print(e.is_set()) # 查看一个事件是否是阻塞状态,# print(123445)# e.set() # 这是把阻塞的状态改为true,# print(e.is_set())# e.wait() # 根据e.is_set()的结果,如果是false,就会阻塞,如果是true就会不阻塞# print(12344)# e.clear() # 这是把阻塞的状态改为false# print(e.is_set())# e.wait() # 虽然阻塞了,但是一定要有这个wait,才会阻塞后面的代码,# print(444444)# 举一个例子,红绿灯# 每一个进程表示一辆车,def car(e,i): #e.is_set() 默认返回False 代表的是绿灯 if not e.is_set(): print("car%s在等待"%i) e.wait() print("car%s通行了"%i)def light(e): while True: if e.is_set(): e.clear() print(‘\033[31m红灯亮了\033[0m‘) else: e.set() print(‘\033[32m绿灯亮了\033[0m‘) time.sleep(2)if __name__ == ‘__main__‘: e=Event() # 模拟启动交通灯 p1=Process(target=light,args=(e,)) p1.daemon=True p1.start() #模拟20辆小车 for i in range(20): import random time.sleep(random.uniform(0,2)) p2=Process(target=car,args=(e,i)) p2.start() print("程序彻底结束!")
############### 进程间的通信---队列 ##############
# 进程间的通信,# 队列和通道,# 队列# 之前学过一个模块是queue,现在进程间的通信不能使用这个# import queue# 基本的队列的方法:# from multiprocessing import Queue# q =Queue(5) # 创建共享的进程队列,maxsize是队列中允许的最大项数,如果省略此参数,则无大小限制,容量是5# q.put(1) # 添加值,# q.put(2)# q.put(3)# q.put(4)# q.put(5)# q.put(6) # 队列满了,就不能放了,这个就会阻塞,# print(q.full()) # 这个队列是否满了,如果q已满,返回为True# print(q.get()) # 返回q中的一个项目。如果q为空,此方法将阻塞# print(q.get()) # 获取值,# print(q.get()) # 获取值,# print(q.get()) # 获取值,# print(q.get()) # 获取值,# print(q.get()) # 获取值,没有值了,第六次的时候就会阻塞,## print(q.empty()) # 如果调用此方法时 q为空,返回True## while True:# try:# q.get_nowait() # 如果有值就等# except:# print("队列已经空了")# import time# time.sleep(1)# # 队列之间的通信# from multiprocessing import Queue,Process## # 生产数据的函数# def produce(q):# q.put("hello")## def consume(q):# print(q.get())## if __name__ == ‘__main__‘:# q = Queue()# p = Process(target=produce,args=(q,))# p.start()# c = Process(target=consume,args=(q,))# c.start()## # 这就是两个子进程之间的通信,通过的队列,## 队列的生产者和消费者模型# 买包子的例子# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,# 实际中,可能会有数据供需不平衡的问题,# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产# 数据消费的多了,我们要增加生产者,来解决这个问题,# 我们把生产者作为一个进程,把消费者作为一个进程from multiprocessing import Process,Queue,JoinableQueueimport time, randomdef producer(name,food,q): # 三个参数,就是谁生产,生产了什么,放到哪里 for i in range(10): time.sleep(random.randint(1,3)) # 1-3秒生产1个, f = "%s生产了%s%s"%(name,food,i) print(f) q.put(f) q.join() # 阻塞, 这是感知一个队列中的数据全部都处理完毕, # 这种相当于把生产的生命周期拉长了,就是说你是生产完了还没有结束,你还要等待消费者把你生产的所有的东西都消费了,才能结束,def consumer(q,name): while True: food =q.get() if food is None: # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束, print("获取到一个空") break print("%s消费了%s"%(name,food)) time.sleep(random.randint(1,3)) # 1-3秒消费1个, q.task_done() # 队列的计数器 -1if __name__ == ‘__main__‘: # q = Queue() q = JoinableQueue() p1 = Process(target=producer,args=("andy","包子",q)) p1.start() p2 = Process(target=producer,args=("Lucy","油条",q)) p2.start() c1 = Process(target=consumer,args=(q,"xiaoxiao")) c1.daemon =True # 意味着,主进程的代码执行结束之后,子进程就结束了, # 而主进程又是依赖两个生产者结束才结束的, # 而我在生产者的地方加了一个阻塞,直到消费者全都消费了之后才结束, # 所以这个设计是非常的巧妙的, c1.start() # 只有一个消费者,两个生产者, 所以会有供给过大,需要加一个消费者, c2 = Process(target=consumer,args=(q,"meimei")) c2.daemon =True c2.start() # 因为只会生产10个,所以怎么能够,没有生产了,但是消费的地方还在get,怎么办? p1.join() p2.join() # q.put(None) # q.put(None) # 为什么是两个none? # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束, # 这种写法太麻烦了,如果有1000个还得了,怎么办?使用新的一个模块:JoinableQueue # 做了三件事; # 1,把c1,c2,改成守护进程 # 2,把生产者加一个q.join(),直到消费者全部消费结束 # 3,加了一个 q.task_done() # 队列的计数器 -1
############### 进程间通信--管道(了解) ##############
# 进程间通信# 管道(管道只做了解)from multiprocessing import Pipe,Process# conn1,conn2 = Pipe() # pipe是一个函数,有两个返回值,这个地方我们是使用两个参数来接收这两个返回值,# conn1.send("123")# print(conn2.recv())# 这就是管道,这是一个双向通信的,# 你调用这个,就会给你一个左边,一个右边,你从左边传入,就可以从右边传出,# 你从右边传入,就可以从左边传出,# 怎么使用管道是的在进程间通信?# 也可以使用生产者和消费者模型import time,randomdef producter(con,pro,name,food): con.close() for i in range(4): time.sleep(random.random()) f = "%s生产了%s%s" % (name, food, i) print(f) pro.send(f)def consumer(con,pro,name): pro.close() while True: try: food = con.recv() print("%s消费了%s" % (name, food)) time.sleep(random.random()) except EOFError: con.close() breakif __name__ == ‘__main__‘: con, pro = Pipe() p= Process(target=producter,args=(con, pro,"andy","包子")) p.start() c1= Process(target=consumer,args=(con, pro,"li")) c1.start() c2= Process(target=consumer,args=(con, pro,"wang")) c2.start() con.close() pro.close()# pipe有一个数据不安全性,一个放一个取没有问题,# 但两个消费者的时候会有问题,会出现两个消费者抢资源的问题,# 怎么解决这个问题?通过加锁# 所以我们还是使用队列,队列就是基于管道加锁的,管道就是基于socket的,# 使用队列就不会有数据不安全的问题了,# 自己加锁需要考虑很多问题,所以我们还是使用队列,管道作为了解,# 我们工作中顶多就会用到队列,
############### 进程之间的数据共享 ##############
# 进程之间的数据共享# 通过Manager模块from multiprocessing import Manager,Process,Lock# 单个进程:# def work(dic):# dic[‘count‘]-=1# print(dic)## if __name__ == ‘__main__‘:# m = Manager()# dic=m.dict({‘count‘:100})# p_list=[]# p = Process(target=work, args=(dic,))# p_list.append(p)# p.start()# p.join()# print("主进程",dic) # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程# 多个进程,出现的问题:# def work(dic):# dic[‘count‘]-=1# # 这个就是涉及到多个进程修改一个内容的情况,这种是不安全的,怎么办?加锁# # 按理说是50,可能会结果不是50## if __name__ == ‘__main__‘:# m = Manager()# dic=m.dict({‘count‘:100})# p_list=[]# for i in range(50): # 创建多个进程# p = Process(target=work, args=(dic,))# p_list.append(p)# p.start()# for i in p_list:# p.join()# print("主进程",dic) # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程# 多个进程加锁:def work(dic,lock): lock.acquire() dic[‘count‘]-=1 # 这个就是涉及到多个进程修改一个内容的情况,这种是不安全的,怎么办?加锁 lock.release()if __name__ == ‘__main__‘: m = Manager() lock = Lock() dic=m.dict({‘count‘:100}) p_list=[] for i in range(50): # 创建多个进程 p = Process(target=work, args=(dic,lock)) p_list.append(p) p.start() for i in p_list: p.join() print("主进程",dic) # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程# 这个数据共享,工作中也不会用到,# 队列还有很多,kafak,rebbitmq,memcache
############### 进程池 ##############
"""进程池的概念为什么会有进程池?1,因为没开启一个进程,都需要创建一个内存空间,这是耗时的2,进程过多,操作熊的调度也会耗时,所以会有非常大的性能问题,所以我们不会让进程太大,我们会设计一个进程池,进程池:1,Python中先创建一个进程的池子,2,这个进程池能存放多少个进程,比如有5个进程,3,先把这些进程创建好,4,比如有50个任务他们到进程池里面去找进程,找到的就执行,找不到的就等待,5,进程执行结束之后,不会结束,而是返回进程池,等待下一个任务,所以进程池,可以节省进程创建的时间,节省了操作系统的调度,而且进程不会过多的创建,所以进程池和信号量有什么关系?假设有200个任务,信号量,信号量还是200个进程在排队,去拿钥匙,所以不能控制有多少进程,而是控制了同一时间有几个进程在执行,也就是只允许5个进程让操作系统调度,节省了操作系统的调度时间,但是并没有节省进程的创建时间,而进程池,是有200个任务去拿进程,所以进程池既是节省了操作系统的调度时间,也节省进程的创建时间,更高级的进程池是比较智能的,比如现在进程池有5个进程,就可以处理过来了,就不需要增加但是如果处理等待的任务太多了,急需要往进程池里面加进程,一直到设置的进程池上限如果任务减少了,就进程池里面减少,这是比较智能的,Python中没有高级的进程池,只有一个固定的进程数的进程池,没有弹性的那种,"""# from multiprocessing import Pool, Process# def func(n):# print(n + 1)# if __name__ == ‘__main__‘:# pool = Pool(5) # 进程池里面有5个进程,约定就是cpu的内核+1# pool.map(func, range(100)) # 这是模拟100个任务,# 这个map是一个异步的,而且自带close,和join功能,# 上面是一个使用进程池的方法,还有其他的使用进程池的方法# 使用了进程池之后,就不使用哪种创建进程的方式了,from multiprocessing import Poolimport time,osdef func(n): print("start func%s"%n,os.getpid()) time.sleep(1) print("end func%s"%n,os.getpid())if __name__ == ‘__main__‘: p = Pool(5) # 不写就是默认cpu的核数, for i in range(10): # p.apply(func,args=(i,)) # p.apply这是同步,很慢, p.apply_async(func,args=(i,)) # p.apply_async这是异步,这个一定是和close和join同时使用的, p.close() # 结束进程池接收任务 p.join() # 这是感知进程池中的任务执行结束,
############### 进程池的返回值 ##############
# 进程池的返回值,from multiprocessing import Pool, Processdef func(i): return i# if __name__ == ‘__main__‘:# pool = Pool(5)# res_list = []# for i in range(10):# # res = pool.apply(func,args=(i,)) # 所以这个结果接收,就是返回值,# res = pool.apply_async(func,args=(i,)) # 所以这个结果接收,就是返回值,# res_list.append(res)# for res in res_list:# print(res.get()) # get会阻塞等待结果# 上面讲了apply和apply_async 的返回值的问题,# 下面讲讲map的返回值的问题,比较简单if __name__ == ‘__main__‘: pool = Pool(5) ret = pool.map(func,range(10)) print(ret) # 这是返回了一个列表,# 使用的时候想用map,map搞不定就使用,apply_async
############### 进程池的回调函数 ##############
# 进程池的回调函数from multiprocessing import Pooldef func1(n): print(111) return ndef func2(n): print(222) print(n*2)if __name__ == ‘__main__‘: p = Pool(5) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() # 回调函数都是在主进程中执行的,
python语法基础-并发编程-进程-长期维护
相关内容
- python学习进度4(条件判断),,----------
- Python学习25:关于函数更多的练习,,在这一章的学习中
- C#调用python脚本,,只尝试了两种调用方式
- 14、python异常处理及断言,,前言:本文主要介绍p
- Python语言学习前提:循环语句,,一、循环语句1.循环
- python的面向对象,,一、面向对象编程1、
- python面试题二:Python 基础题,python基础面试题及答案,
- Neovim中提示Error: Required vim compiled with +python,,Neovim在编
- python中的map()函数,,python中的ma
- Python--基础总结(二),,模块操作什么是模块?
评论关闭