python--第十天总结(Select/Poll/Epoll使用 ),,Python Sel


Python Select Server,可监控事件数量有限制:

#!/usr/bin/python# -*- coding: utf-8 -*-import selectimport socketimport Queue  server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)server_address= (‘192.168.1.5‘,8080)server.bind(server_address)server.listen(10)  #select轮询等待读socket集合inputs = [server]#select轮询等待写socket集合outputs = []message_queues = {}#select超时时间timeout = 20  while True:    print "等待活动连接......"    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)      if not (readable or writable or exceptional) :        print "select超时无活动连接,重新select...... "        continue;       #循环可读事件    for s in readable :        #如果是server监听的socket        if s is server:            #同意连接            connection, client_address = s.accept()            print "新连接: ", client_address            connection.setblocking(0)            #将连接加入到select可读事件队列            inputs.append(connection)            #新建连接为key的字典,写回读取到的消息            message_queues[connection] = Queue.Queue()        else:            #不是本机监听就是客户端发来的消息            data = s.recv(1024)            if data :                print "收到数据:" , data , "客户端:",s.getpeername()                message_queues[s].put(data)                if s not in outputs:                    #将读取到的socket加入到可写事件队列                    outputs.append(s)            else:                #空白消息,关闭连接                print "关闭连接:", client_address                if s in outputs :                    outputs.remove(s)                inputs.remove(s)                s.close()                del message_queues[s]    for s in writable:        try:            msg = message_queues[s].get_nowait()        except Queue.Empty:            print "连接:" , s.getpeername() , ‘消息队列为空‘            outputs.remove(s)        else:            print "发送数据:" , msg , "到", s.getpeername()            s.send(msg)          for s in exceptional:        print "异常连接:", s.getpeername()        inputs.remove(s)        if s in outputs:            outputs.remove(s)        s.close()        del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python# -*- coding: utf-8 -*-import socketimport selectimport Queue  server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ("192.168.1.5", 8080)server.bind(server_address)server.listen(5)print  "服务器启动成功,监听IP:" , server_addressmessage_queues = {}#超时,毫秒timeout = 5000#监听哪些事件READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)READ_WRITE = (READ_ONLY|select.POLLOUT)#新建轮询事件对象poller = select.poll()#注册本机监听socket到等待可读事件事件集合poller.register(server,READ_ONLY)#文件描述符到socket映射fd_to_socket = {server.fileno():server,}while True:    print "等待活动连接......"    #轮询注册的事件集合    events = poller.poll(timeout)    if not events:      print "poll超时,无活动连接,重新poll......"      continue    print "有" , len(events), "个新事件,开始处理......"    for fd ,flag in events:        s = fd_to_socket[fd]        #可读事件        if flag & (select.POLLIN | select.POLLPRI) :            if s is server :                #如果socket是监听的server代表有新连接                connection , client_address = s.accept()                print "新连接:" , client_address                connection.setblocking(False)                                  fd_to_socket[connection.fileno()] = connection                #加入到等待读事件集合                poller.register(connection,READ_ONLY)                message_queues[connection]  = Queue.Queue()            else :                #接收客户端发送的数据                data = s.recv(1024)                if data:                    print "收到数据:" , data , "客户端:" , s.getpeername()                    message_queues[s].put(data)                    #修改读取到消息的连接到等待写事件集合                    poller.modify(s,READ_WRITE)                else :                    # Close the connection                    print "  closing" , s.getpeername()                    # Stop listening for input on the connection                    poller.unregister(s)                    s.close()                    del message_queues[s]        #连接关闭事件        elif flag & select.POLLHUP :            print " Closing ", s.getpeername() ,"(HUP)"            poller.unregister(s)            s.close()        #可写事件        elif flag & select.POLLOUT :            try:                msg = message_queues[s].get_nowait()            except Queue.Empty:                print s.getpeername() , " queue empty"                poller.modify(s,READ_ONLY)            else :                print "发送数据:" , data , "客户端:" , s.getpeername()                s.send(msg)        #异常事件        elif flag & select.POLLERR:            print "  exception on" , s.getpeername()            poller.unregister(s)            s.close()            del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:

#!/usr/bin/python# -*- coding: utf-8 -*-import socket, selectimport Queue serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ("192.168.1.5", 8080)serversocket.bind(server_address)serversocket.listen(1)print  "服务器启动成功,监听IP:" , server_addressserversocket.setblocking(0)timeout = 10#新建epoll事件对象,后续要监控的事件添加到其中epoll = select.epoll()#添加服务器监听fd到等待读事件集合epoll.register(serversocket.fileno(), select.EPOLLIN)message_queues = {} fd_to_socket = {serversocket.fileno():serversocket,}while True:  print "等待活动连接......"  #轮询注册的事件集合  events = epoll.poll(timeout)  if not events:     print "epoll超时无活动连接,重新轮询......"     continue  print "有" , len(events), "个新事件,开始处理......"  for fd, event in events:     socket = fd_to_socket[fd]     #可读事件     if event & select.EPOLLIN:         #如果活动socket为服务器所监听,有新连接         if socket == serversocket:            connection, address = serversocket.accept()            print "新连接:" , address            connection.setblocking(0)            #注册新连接fd到待读事件集合            epoll.register(connection.fileno(), select.EPOLLIN)            fd_to_socket[connection.fileno()] = connection            message_queues[connection]  = Queue.Queue()         #否则为客户端发送的数据         else:            data = socket.recv(1024)            if data:               print "收到数据:" , data , "客户端:" , socket.getpeername()               message_queues[socket].put(data)               #修改读取到消息的连接到等待写事件集合               epoll.modify(fd, select.EPOLLOUT)     #可写事件     elif event & select.EPOLLOUT:        try:           msg = message_queues[socket].get_nowait()        except Queue.Empty:           print socket.getpeername() , " queue empty"           epoll.modify(fd, select.EPOLLIN)        else :           print "发送数据:" , data , "客户端:" , socket.getpeername()           socket.send(msg)     #关闭事件     elif event & select.EPOLLHUP:        epoll.unregister(fd)        fd_to_socket[fd].close()        del fd_to_socket[fd]epoll.unregister(serversocket.fileno())epoll.close()serversocket.close()

python--第十天总结(Select/Poll/Epoll使用 )

相关内容

    暂无相关文章

评论关闭