python语法基础-并发编程-进程-长期维护,,##########


############### 进程的启动方式1 ##############

"""并发编程:进程1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了,操作系统可以管理进程,进程是操作系统基本的执行单元,2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间,操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因,#######################################进程的调度1,先来先服务,有一个不好的,就是不利于短作业2,短作业优先算法,但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。3,时间片轮转算法,就是轮流执行,已经很科学了,4,多级反馈队列算法,有多个队列,有一个新任务来了放入第一个队列,这是优先级加上时间片轮转,第二个任务来了放入下一级,#######################################并发和并行:进程的并行:这种只有在多核cpu才可以实现,进程的并发:这是轮流执行,由于速度很快,看起来像是一起执行的,比如一遍听音乐,一遍写代码,######################################进程的三状态转换图:非常重要1,进程一开始运行的时候,是就绪的状态,这是第一个状态,就是告诉cpu,我已经准备好可以运行了,进入排队了,2,时间片轮转,轮到你了之后,你就运行了,这是第二个状态,3,发生阻塞,这是第三个状态,比如你的程序让你输入内容,input方法, 这时候是阻塞的,你输入完毕了之后,就又畅通了,这是等待I/O完成,input,sleep,文件的输入和输出,事件处理之后,你还要进入就绪状态了,全部处理完了,就结束了,###########################################同步和异步1,同步,需要等待,需要排队,你什么也不能干,2,异步,不需要等待,你可以去做其他事情,###########################################阻塞和非阻塞1,阻塞,就是input,sleep这些,需要等待,这是阻塞,2,非阻塞,就是跳过这些阻塞,但是程序中不可避免的需要阻塞,因为需要等待内容处理,###########################################同步异步和阻塞非阻塞:同步阻塞,就是同步非阻塞异步阻塞异步非阻塞,效率更高,###############################################多进程有一个内置的模块:from multiprocessing import Process要学习几个地方:1,进程的注册2,进程的开启3,进程的异步,4,进程的同步,join,5,进程注册函数的参数,6,启动多个子进程,"""import timefrom multiprocessing import Processimport osdef f(args1,args2):    print("*"*args1)    time.sleep(1)    print("*"*args2)    # print("子进程号",os.getpid())    # print("子进程的父进程号",os.getppid())    # print(‘我是子进程‘)if __name__ == ‘__main__‘:    # p = Process(target=f, args=(‘bob‘,))    # p = Process(target=f, args=(‘bob‘,"123"))  # 注册,p是一个进程,还没有启动,这是主进程,    # args=(‘bob‘,),如果注册的函数是有参数的,就要传递参数,如果有一个参数括号内要有一个逗号,因为这是一个元组,    # p.start()  # 启动进程,这是启动了一个子进程,    # 现在子进程和主进程之间是异步的,如果我想在子进程结束之后再执行下面的代码,变成同步,怎么办?    # p.join()  # 这个join就是在感知一个子进程的一个结束,将异步改成同步,    # time.sleep(1)    # print("父进程号",os.getpid())    # print("父进程的父进程号",os.getppid())  # 这个就是pycharm的进程号,    # print(‘执行主进程的内容了‘)  # 这一句的执行和子进程的执行内容是异步的,不是同步的,    # 开启10个子进程    p_list=[]    for i in range(10):        p = Process(target=f, args=(10*i, 20*i))        p_list.append(p)        p.start()        # print("第%d轮"%(i+1))    # p.join()    [p.join() for p in p_list ]  # 保证前面的10个进程全部结束了,才会执行下面的代码,    print("运行完了")    # 这种开启了多进程,可以读多个进程去存文件,取文件内容,# 进程的生命周期,# 主进程没有开启子进程,就是执行完他的代码就结束了了# 子进程也是执行完自己的代码就结束了,# 开启了子进程的主进程,主进程执行完了,要等待子进程结束之后,主进程才可以结束,

############### 进程的启动方式2 和 进程之间是数据隔离的 ##############

# 进程的启动方式2# 第一点,创建一个类,继承process# 第二点,类中必须实现run方法,这个run方法里面就是子进程要执行的内容,import osfrom multiprocessing import Processclass MyProcess(Process):  # 继承导入的process,    def __init__(self,name):  # 为了进程能传递参数,        super().__init__()  # 这是继承了父类所有的参数,        self.name=name    def run(self):        # print(os.getpid())        print("子进程号",self.pid)        print("参数",self.name)  # print(os.getpid()) 这两句是一样的,if __name__ == ‘__main__‘:    # p1=MyProcess()  # 这是不传参数的    p1=MyProcess("name1")  # 这是传参数的,这就是面向对象的实例化,    p2=MyProcess(‘name2‘)    p3=MyProcess(‘name3‘)    p1.start() #start会自动调用run    p2.start()    # p2.run()    p3.start()    # 三个进程之间是异步的,    p1.join()    p2.join()    p3.join()    # 三个进程都结束了才会执行下面的内容,这是把异步,变成异步,    print(‘主线程‘)# 进程之间的数据隔离问题# 进程和进程之间的数据是否是隔离的,比如qq和微信,之间的数据是隔离的,# 几个进程之间,如果不通过特殊的手段,是不可能共享一个数据的,这个记住,没有什么可理解的,# 下面是一个例子,证明主进程和子进程之间是没有from multiprocessing import Processdef work():    global n  # 声明了一个全局变量,    n=0    print(‘子进程内: ‘,n)if __name__ == ‘__main__‘:    n = 100    p=Process(target=work)    p.start()    print(‘主进程内: ‘,n)

############### 守护进程 ##############

# 守护进程# 子进程----守护进程# 第一版:主进程结束了,子进程还没有结束,# import time# from multiprocessing import Process## def func():#     while True:#         time.sleep(1)#         print("我还活着")### if __name__ == ‘__main__‘:#     p=Process(target=func)#     p.start()#     i = 0#     while i<10:#         time.sleep(1)#         i+=1#     print("主进程结束")# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,import timefrom multiprocessing import Processdef func():    while True:        time.sleep(1)        print("我还活着")if __name__ == ‘__main__‘:    p=Process(target=func)    p.daemon=True  # 设置子进程为守护进程,    p.start()    p2=Process(target=func)  # 这个子进程没有设置为守护进程所以这个进程还在进行中,    p2.start()    # time.sleep(3)    # p2.is_alive()  # 判断一个进程是否活着    # p2.terminate()  # 结束一个进程,    i = 0    while i<5:        time.sleep(1)        i+=1    print("主进程代码结束")

############### 进程锁 ##############

# 进程锁# 买票就是一个并发的过程,# 文件db的内容为:{"ticket":1}# 注意一定要用双引号,不然json无法识别# 并发运行,效率高,但竞争写同一文件,数据写入错乱# 买票不加锁,可能会有多个人买到票了。但是票只有一张,# from multiprocessing import Process,Lock# import time,json,random## # 查询余票的函数# def show(i):#     with open("db") as f :#         dic=json.load(f)#     print(‘余票%s‘%dic[‘ticket‘])## # 买票的函数# def bug_ticket(i):#     with open("db") as f :#         dic=json.load(f)#         time.sleep(0.1)  # 模拟读数据的网络延迟#     if dic[‘ticket‘] >0:#         dic[‘ticket‘]-=1#         print(‘\033[32m%s购票成功\033[0m‘%i)#     else:#         print(‘\033[31m%s没买到票\033[0m‘%i)#     time.sleep(0.1)#     with open("db","w") as f:#         json.dump(dic,f)### if __name__ == ‘__main__‘:#     for i in range(10):  # 模拟并发10个客户端查询票#         p=Process(target=show,args=(i,))#         p.start()#     for i in range(10):#         p = Process(target=bug_ticket, args=(i,))#         p.start()# 加锁from multiprocessing import Process,Lockimport time,json,random# 查询余票的函数def show(i):    with open("db") as f :        dic=json.load(f)    print(‘余票%s‘%dic[‘ticket‘])# 买票的函数def bug_ticket(i,lock):    lock.acquire()  # 这就是拿钥匙进门,有一个进程拿了钥匙,之后第二个进程进来就没有钥匙了,就会阻塞,    with open("db") as f :        dic=json.load(f)        time.sleep(0.1)  # 模拟读数据的网络延迟    if dic[‘ticket‘] >0:        dic[‘ticket‘]-=1        print(‘\033[32m%s购票成功\033[0m‘%i)    else:        print(‘\033[31m%s没买到票\033[0m‘%i)    time.sleep(0.1)    with open("db","w") as f:        json.dump(dic,f)    lock.release()  # 这是还钥匙,还了之后,别的进程就可以拿到了,就可以执行了,if __name__ == ‘__main__‘:    for i in range(10):  # 模拟并发10个客户端查询票        p=Process(target=show,args=(i,))        p.start()    lock=Lock()        for i in range(10):        p = Process(target=bug_ticket, args=(i,lock))        p.start()

############### 多进程的信号量 ##############

# 多进程的信号量from multiprocessing import Processimport time,randomfrom multiprocessing import Semaphore# ktv只有1个房间,1个房间只能装4个人,但是这样写就是20个人都进入到房间了,# 假设ket门口有4把钥匙,一个进程来了那一把钥匙,然后关门,这样只有4个进程能拿到,剩下的之后1个进程出来了才可以继续其他的进程,# 这个概念就叫做信号量,同一时间就只有四个人,def ktv(i,sem):    sem.acquire()  # 获取钥匙    print("%d进入ktv"%i)    time.sleep(random.randint(60,180))  # 这是每一个人唱歌1-3分钟    print("%d走出ktv"%i)    sem.release()  # 还钥匙if __name__ == "__main__":    sem = Semaphore(4)  # 这就是设置有多少把钥匙,  信号量的英文就是:Semaphore    for i in range(20):        p=Process(target=ktv,args=(i,sem))        p.start()

############### 进程的事件 ##############

# 进程的事件# 事件import timefrom multiprocessing import Event, Process# 一个信号,可以使所有的进程都进入阻塞状态,也可以控制所有信号都解除阻塞,# 一个事件创建之后,默认是阻塞状态,# e = Event()  # 创建一个事件# print(e.is_set())  # 查看一个事件是否是阻塞状态,# print(123445)# e.set()  # 这是把阻塞的状态改为true,# print(e.is_set())# e.wait()  # 根据e.is_set()的结果,如果是false,就会阻塞,如果是true就会不阻塞# print(12344)# e.clear()  # 这是把阻塞的状态改为false# print(e.is_set())# e.wait()  # 虽然阻塞了,但是一定要有这个wait,才会阻塞后面的代码,# print(444444)# 举一个例子,红绿灯# 每一个进程表示一辆车,def car(e,i):    #e.is_set() 默认返回False 代表的是绿灯    if not e.is_set():        print("car%s在等待"%i)        e.wait()    print("car%s通行了"%i)def light(e):    while True:        if e.is_set():            e.clear()            print(‘\033[31m红灯亮了\033[0m‘)        else:            e.set()            print(‘\033[32m绿灯亮了\033[0m‘)        time.sleep(2)if __name__ == ‘__main__‘:    e=Event()    # 模拟启动交通灯    p1=Process(target=light,args=(e,))    p1.daemon=True    p1.start()    #模拟20辆小车    for i in range(20):        import random        time.sleep(random.uniform(0,2))        p2=Process(target=car,args=(e,i))        p2.start()    print("程序彻底结束!")

############### 进程间的通信---队列 ##############

# 进程间的通信,# 队列和通道,# 队列# 之前学过一个模块是queue,现在进程间的通信不能使用这个# import queue# 基本的队列的方法:# from multiprocessing import Queue# q =Queue(5)  # 创建共享的进程队列,maxsize是队列中允许的最大项数,如果省略此参数,则无大小限制,容量是5# q.put(1)  # 添加值,# q.put(2)# q.put(3)# q.put(4)# q.put(5)# q.put(6)  # 队列满了,就不能放了,这个就会阻塞,# print(q.full())  # 这个队列是否满了,如果q已满,返回为True# print(q.get())  # 返回q中的一个项目。如果q为空,此方法将阻塞# print(q.get())  # 获取值,# print(q.get())  # 获取值,# print(q.get())  # 获取值,# print(q.get())  # 获取值,# print(q.get())  # 获取值,没有值了,第六次的时候就会阻塞,## print(q.empty())  # 如果调用此方法时 q为空,返回True## while True:#     try:#         q.get_nowait()  # 如果有值就等#     except:#         print("队列已经空了")#         import time#         time.sleep(1)# # 队列之间的通信# from multiprocessing import Queue,Process## # 生产数据的函数# def produce(q):#     q.put("hello")## def consume(q):#     print(q.get())## if __name__ == ‘__main__‘:#     q = Queue()#     p = Process(target=produce,args=(q,))#     p.start()#     c = Process(target=consume,args=(q,))#     c.start()##     # 这就是两个子进程之间的通信,通过的队列,## 队列的生产者和消费者模型# 买包子的例子# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,# 实际中,可能会有数据供需不平衡的问题,# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产# 数据消费的多了,我们要增加生产者,来解决这个问题,# 我们把生产者作为一个进程,把消费者作为一个进程from multiprocessing import Process,Queue,JoinableQueueimport time, randomdef producer(name,food,q):  # 三个参数,就是谁生产,生产了什么,放到哪里    for i in range(10):        time.sleep(random.randint(1,3))  # 1-3秒生产1个,        f = "%s生产了%s%s"%(name,food,i)        print(f)        q.put(f)    q.join()  # 阻塞, 这是感知一个队列中的数据全部都处理完毕,    # 这种相当于把生产的生命周期拉长了,就是说你是生产完了还没有结束,你还要等待消费者把你生产的所有的东西都消费了,才能结束,def consumer(q,name):    while True:        food =q.get()        if food is None:  # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,            print("获取到一个空")            break        print("%s消费了%s"%(name,food))        time.sleep(random.randint(1,3))  # 1-3秒消费1个,        q.task_done()  # 队列的计数器 -1if __name__ == ‘__main__‘:    # q = Queue()    q = JoinableQueue()    p1 = Process(target=producer,args=("andy","包子",q))    p1.start()    p2 = Process(target=producer,args=("Lucy","油条",q))    p2.start()    c1 = Process(target=consumer,args=(q,"xiaoxiao"))    c1.daemon =True    # 意味着,主进程的代码执行结束之后,子进程就结束了,    # 而主进程又是依赖两个生产者结束才结束的,    # 而我在生产者的地方加了一个阻塞,直到消费者全都消费了之后才结束,    # 所以这个设计是非常的巧妙的,    c1.start()    # 只有一个消费者,两个生产者, 所以会有供给过大,需要加一个消费者,    c2 = Process(target=consumer,args=(q,"meimei"))    c2.daemon =True    c2.start()    # 因为只会生产10个,所以怎么能够,没有生产了,但是消费的地方还在get,怎么办?    p1.join()    p2.join()    # q.put(None)    # q.put(None)    # 为什么是两个none?    # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,    # 这种写法太麻烦了,如果有1000个还得了,怎么办?使用新的一个模块:JoinableQueue    # 做了三件事;    # 1,把c1,c2,改成守护进程    # 2,把生产者加一个q.join(),直到消费者全部消费结束    # 3,加了一个        q.task_done()  # 队列的计数器 -1

############### 进程间通信--管道(了解) ##############

# 进程间通信# 管道(管道只做了解)from multiprocessing import Pipe,Process# conn1,conn2 = Pipe()  # pipe是一个函数,有两个返回值,这个地方我们是使用两个参数来接收这两个返回值,# conn1.send("123")# print(conn2.recv())# 这就是管道,这是一个双向通信的,# 你调用这个,就会给你一个左边,一个右边,你从左边传入,就可以从右边传出,# 你从右边传入,就可以从左边传出,# 怎么使用管道是的在进程间通信?# 也可以使用生产者和消费者模型import time,randomdef producter(con,pro,name,food):    con.close()    for i in range(4):        time.sleep(random.random())        f = "%s生产了%s%s" % (name, food, i)        print(f)        pro.send(f)def consumer(con,pro,name):    pro.close()    while True:        try:            food = con.recv()            print("%s消费了%s" % (name, food))            time.sleep(random.random())        except EOFError:            con.close()            breakif __name__ == ‘__main__‘:    con, pro = Pipe()    p= Process(target=producter,args=(con, pro,"andy","包子"))    p.start()    c1= Process(target=consumer,args=(con, pro,"li"))    c1.start()    c2= Process(target=consumer,args=(con, pro,"wang"))    c2.start()    con.close()    pro.close()# pipe有一个数据不安全性,一个放一个取没有问题,# 但两个消费者的时候会有问题,会出现两个消费者抢资源的问题,# 怎么解决这个问题?通过加锁# 所以我们还是使用队列,队列就是基于管道加锁的,管道就是基于socket的,# 使用队列就不会有数据不安全的问题了,# 自己加锁需要考虑很多问题,所以我们还是使用队列,管道作为了解,# 我们工作中顶多就会用到队列,

############### 进程之间的数据共享 ##############

# 进程之间的数据共享# 通过Manager模块from multiprocessing import Manager,Process,Lock# 单个进程:# def work(dic):#     dic[‘count‘]-=1#     print(dic)## if __name__ == ‘__main__‘:#     m = Manager()#     dic=m.dict({‘count‘:100})#     p_list=[]#     p = Process(target=work, args=(dic,))#     p_list.append(p)#     p.start()#     p.join()#     print("主进程",dic)  # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程# 多个进程,出现的问题:# def work(dic):#     dic[‘count‘]-=1#     # 这个就是涉及到多个进程修改一个内容的情况,这种是不安全的,怎么办?加锁#     # 按理说是50,可能会结果不是50## if __name__ == ‘__main__‘:#     m = Manager()#     dic=m.dict({‘count‘:100})#     p_list=[]#     for i in range(50):  # 创建多个进程#         p = Process(target=work, args=(dic,))#         p_list.append(p)#         p.start()#     for i in p_list:#         p.join()#     print("主进程",dic)  # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程# 多个进程加锁:def work(dic,lock):    lock.acquire()    dic[‘count‘]-=1    # 这个就是涉及到多个进程修改一个内容的情况,这种是不安全的,怎么办?加锁    lock.release()if __name__ == ‘__main__‘:    m = Manager()    lock = Lock()    dic=m.dict({‘count‘:100})    p_list=[]    for i in range(50):  # 创建多个进程        p = Process(target=work, args=(dic,lock))        p_list.append(p)        p.start()    for i in p_list:        p.join()    print("主进程",dic)  # 主进程 {‘count‘: 99},这就是子进程的改变,影响到了主进程# 这个数据共享,工作中也不会用到,# 队列还有很多,kafak,rebbitmq,memcache

############### 进程池 ##############

"""进程池的概念为什么会有进程池?1,因为没开启一个进程,都需要创建一个内存空间,这是耗时的2,进程过多,操作熊的调度也会耗时,所以会有非常大的性能问题,所以我们不会让进程太大,我们会设计一个进程池,进程池:1,Python中先创建一个进程的池子,2,这个进程池能存放多少个进程,比如有5个进程,3,先把这些进程创建好,4,比如有50个任务他们到进程池里面去找进程,找到的就执行,找不到的就等待,5,进程执行结束之后,不会结束,而是返回进程池,等待下一个任务,所以进程池,可以节省进程创建的时间,节省了操作系统的调度,而且进程不会过多的创建,所以进程池和信号量有什么关系?假设有200个任务,信号量,信号量还是200个进程在排队,去拿钥匙,所以不能控制有多少进程,而是控制了同一时间有几个进程在执行,也就是只允许5个进程让操作系统调度,节省了操作系统的调度时间,但是并没有节省进程的创建时间,而进程池,是有200个任务去拿进程,所以进程池既是节省了操作系统的调度时间,也节省进程的创建时间,更高级的进程池是比较智能的,比如现在进程池有5个进程,就可以处理过来了,就不需要增加但是如果处理等待的任务太多了,急需要往进程池里面加进程,一直到设置的进程池上限如果任务减少了,就进程池里面减少,这是比较智能的,Python中没有高级的进程池,只有一个固定的进程数的进程池,没有弹性的那种,"""# from multiprocessing import Pool, Process# def func(n):#     print(n + 1)# if __name__ == ‘__main__‘:#     pool = Pool(5)  # 进程池里面有5个进程,约定就是cpu的内核+1#     pool.map(func, range(100))  # 这是模拟100个任务,# 这个map是一个异步的,而且自带close,和join功能,# 上面是一个使用进程池的方法,还有其他的使用进程池的方法# 使用了进程池之后,就不使用哪种创建进程的方式了,from multiprocessing import Poolimport time,osdef func(n):    print("start func%s"%n,os.getpid())    time.sleep(1)    print("end func%s"%n,os.getpid())if __name__ == ‘__main__‘:    p = Pool(5) # 不写就是默认cpu的核数,    for i in range(10):        # p.apply(func,args=(i,))  # p.apply这是同步,很慢,        p.apply_async(func,args=(i,))  # p.apply_async这是异步,这个一定是和close和join同时使用的,    p.close()  # 结束进程池接收任务    p.join()  # 这是感知进程池中的任务执行结束,

############### 进程池的返回值 ##############

# 进程池的返回值,from multiprocessing import Pool, Processdef func(i):    return i# if __name__ == ‘__main__‘:#     pool = Pool(5)#     res_list = []#     for i in range(10):#         # res = pool.apply(func,args=(i,))  # 所以这个结果接收,就是返回值,#         res = pool.apply_async(func,args=(i,))  # 所以这个结果接收,就是返回值,#         res_list.append(res)#     for res in res_list:#         print(res.get())  # get会阻塞等待结果# 上面讲了apply和apply_async 的返回值的问题,# 下面讲讲map的返回值的问题,比较简单if __name__ == ‘__main__‘:    pool = Pool(5)    ret = pool.map(func,range(10))    print(ret)  # 这是返回了一个列表,# 使用的时候想用map,map搞不定就使用,apply_async

############### 进程池的回调函数 ##############

# 进程池的回调函数from multiprocessing import Pooldef func1(n):    print(111)    return ndef func2(n):    print(222)    print(n*2)if __name__ == ‘__main__‘:    p = Pool(5)    p.apply_async(func1,args=(10,),callback=func2)    p.close()    p.join()    # 回调函数都是在主进程中执行的,

python语法基础-并发编程-进程-长期维护

评论关闭