Python中IO多路复用模块selector的用法详解,


目录
  • selector 简介
  • 阻塞IO模型下的 socket 网络编程
  • IO多路复用模型下的socket 网络编程 selector
  • selector 原理分析
    • 返回就绪状态文件句柄
    • 数据拷贝
  • asyncio  和  selector 的关系
    • 附录asyncio模块事件循环核心模块

      selector 简介

      selector 是一个实现了IO复用模型的python包,实现了IO多路复用模型的 select、poll 和 epoll 等函数。它允许程序同时监听多个文件描述符(例如套接字),并在其中任何一个就绪时进行相应的操作。这样可以有效地管理并发 I/O 操作,提高程序的性能和资源利用率。

      本篇主要讲解selector编程示例,以socket编程为主题,首先分析阻塞IO模型的网络编程,然后对比selector实现的IO多路复用模型的网络编程。

      阻塞IO模型下的 socket 网络编程

      通过socket实现最简单的客户端和服务端通信的功能,阻塞IO模型的特点就是在文件IO或网络IO时获取数据的函数会一直阻塞,直到数据到来。服务端:server.py

      import socket
      
      # 创建TCP套接字
      server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      
      # 绑定IP地址和端口号
      server_address = ('0.0.0.0', 12346)
      server_socket.bind(server_address)
      
      # 监听连接
      server_socket.listen()
      
      # 接受客户端连接请求
      print("服务器已启动,等待客户端连接...")
      client_socket, client_address = server_socket.accept()
      print(f"与客户端 {client_address} 建立连接")
      
      # 向客户端发送消息
      message = "欢迎连接到服务器!"
      client_socket.sendall(message.encode())
      
      while True:
      
          # 从客户端接收消息
          data = client_socket.recv(1024).decode()
          print(f"客户端消息:{data}")
          client_socket.sendall(f"服务器收到消息:{data}".encode())
          if data == "close":
              # 关闭客户端套接字
              client_socket.close()
      

      客户端:client.py

      import socket
      
      # 创建TCP套接字
      client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      
      # 服务器地址和端口号
      server_address = ('localhost', 12346)
      
      # 连接服务器
      client_socket.connect(server_address)
      
      # 接收服务器的消息
      data = client_socket.recv(1024).decode()
      print(f"已连接到服务器, 服务器消息:{data}")
      
      while True:
          # 向服务器发送消息
          message = input()
          if message == "end":
              break
          client_socket.sendall(message.encode())
          print(client_socket.recv(1024).decode())
      
      # 关闭客户端套接字
      client_socket.close()
      

      启动服务端,未启动客户端

      启动服务端,代码执行到client_socket, client_address = server_socket.accept()暂停,accept 是一个阻塞函数。

      函数说明:

      client_socket, client_address = server_socket.accept()

      阻塞接口,等待客户端的连接,没有客户端连接时阻塞等待,有客户端连接时返回新的聊天socket,用于后续发送和接收消息

      继续启动客户端

      启动客户端之后,客户端连接服务端,服务端代码执行到data = client_socket.recv(1024).decode(), recv 是一个阻塞函数。

      函数说明:

      data = client_socket.recv(1024).decode()

      阻塞接口,等待缓冲区有消息到来。没有消息时阻塞等待,有消息到来返回消息内容

      客户端发送消息

      客户端发送消息,data = client_socket.recv(1024).decode()收到消息,从网络协议栈中获取消息,并返回。继续whie True 循环的下一轮循环,阻塞在相同地方。

      IO多路复用模型下的socket 网络编程 selector

      selector是实现IO多路复用模型的模块,首先回忆一下IO多路复用。

      IO多路复用是通过selectpollepoll监听文件句柄,当有文件句柄处于就绪状态,就通知对应的应用程序处理。

      服务端:server.py

      import selectors
      import socket
      
      # 选择一个当前平台最优的IO多路复用模型
      sel = selectors.DefaultSelector()
      
      
      def accept(server_socket):
          conn, addr = server_socket.accept()
          print(f"与客户端 {addr} 建立连接")
      
          conn.setblocking(False)  # 设定非阻塞
          # 注册conn对象到selector中,当conn可读时,返回conn和回调函数read
          sel.register(conn, selectors.EVENT_READ, read)
      
      
      def read(conn):
          data = conn.recv(1024).decode('utf-8')
          print(f"客户端消息:{data}")
          if data == "close":
              sel.unregister(conn)
              conn.close()
          else:
              conn.sendall(f"服务器收到消息:{data}".encode())
      
      
      if __name__ == "__main__":
          sock = socket.socket()
          sock.bind(("0.0.0.0", 9999))
          sock.listen()
          sock.setblocking(False)  # 设置sock非阻塞
          # 将sock注册到selector中,当sock可读时,返回sock和回调函数accept
          sel.register(sock, selectors.EVENT_READ, accept)
      
          print("创建事件循环")
          while True:
              events = sel.select()  # 阻塞运行,有就绪的事件返回就绪事件列表
              for key, _ in events:
                  print(key)
                  # key.data: 注册的回调函数  key.fileobj: 注册的文件句柄
                  callback = key.data  # 注册的回调函数
                  callback(key.fileobj)

      主要的函数:

      一、自动选择文件IO模型selectors.DefaultSelector()

      选择一个当前平台最优的IO模型,一般来说是epoll或kqueue。存在的可选项包括:

      • SelectSelector
      • PollSelector
      • EpollSelector
      • DevpollSelector
      • KqueueSelector

      DefaultSelector 是一个指向当前平台上可用的最高效实现的别名,当选择epoll时,可以认为 sel = EpollSelector。返回:一个select对象

      二、文件注册 sel.register(sock, selectors.EVENT_READ, accept)

      函数原型:

      register(fileobj, events, data=None)
      

      注册一个用于选择的文件对象,在其上监视 I/O 事件。

      fileobj 是要监视的文件对象。它可以是整数形式的文件描述符或者具有 fileno() 方法的对象。

      events 是要监视的事件的位掩码。

      data 是一个任意对象或变量。

      返回:

      这将返回一个新的 SelectorKey 实例,实例具体内容见下一个函数的key

      三、获取就绪文件events = sel.select()

      函数原型:

      select(timeout=None)
      

      可用于遍历获取状态变为就绪注册的文件,如果设置超时时间则可能会抛出超时异常。返回:一个(key, events)的元组,

      • key: 一个SelectorKey类的实例,包括
      • fileobj: 已注册的文件对象。
      • fd: 下层的文件描述符

      events: 必须在此文件对象上被等待的事件。

      events:文件句柄可读还是可写的标识。为EVENT_READ或EVENT_WRITE,或者二者的组合

      client.py

      import socket
      
      client = socket.socket(family=socket. AF_INET, type=socket.SOCK_STREAM)
      host = socket.gethostname()
      client.connect((host, 9999))
      
      while True:
          data = input("客户端发送数据:").strip()
          client.send(data.encode())
          if data == "end" or data == "":
              client.close()
              break
          print(client.recv(1024).decode("utf-8"))
      

      服务端启动,客户端未启动

      if __name__ == "__main__":
          sock = socket.socket()
          sock.bind(("0.0.0.0", 9999))
          sock.listen()
          sock.setblocking(False)  # 设置sock非阻塞
          # 将sock注册到selector中,当sock可读时,返回sock和回调函数accept
          sel.register(sock, selectors.EVENT_READ, accept)
      
          print("创建事件循环")
          while True:
              events = sel.select()  # 阻塞运行,有就绪的事件返回就绪事件列表
      

      服务端启动,代码完成的功能包括:

      • 创建一个socket,并绑定IP,监听端口
      • 设置socket为非阻塞,否则超时会报错
      • 将socket注册到 selector 中,等待socket就绪,绑定就绪之后的回调函数accept
      • 进入while True循环,访问select返回的就绪列表。这个阻塞函数,没有文件读写就绪就会阻塞。

      继续启动客户端

      启动一个客户端,客户端连接到服务端,socket文件句柄有连接请求,select返回可读状态的socket。返回的events是一个列表,当中只有一个就绪的文件句柄。

      key.data拿到注册的回调函数也就是accept函数,key.fileobj拿到文件句柄的socket对象。调用accept函数,传入socket对象。

      def accept(server_socket):
          conn, addr = server_socket.accept()
          print(f"与客户端 {addr} 建立连接")
      
          conn.setblocking(False)  # 设定非阻塞
          # 注册conn对象到selector中,当conn可读时,返回conn和回调函数read
          sel.register(conn, selectors.EVENT_READ, read)
      

      accept中先通过accept接收连接,返回通信使用的文件句柄conn,然后设置conn为非阻塞,最后将conn阻塞到selector中,传入回调函数read。等conn文件句柄可读时,就表示有数据发送过来,就可以调用read函数读取内容了。

      客户端发送消息

      客户端发送消息时,selector会返回可读状态的conn文件句柄,从返回对象中获取回调函数,调用回调函数read,传入文件句柄。

      def read(conn):
          data = conn.recv(1024).decode('utf-8')
          print(f"客户端消息:{data}")
          if data == "close":
              sel.unregister(conn)
              conn.close()
          else:
              conn.sendall(f"服务器收到消息:{data}".encode())
      

      在read函数中,首先获取了网络协议栈中的消息内容,然后判断消息是否为关闭连接。如果不是则发送一条消息给对方。

      整个基于IO多路复用模型的网络编程流程就是这样。

      selector 原理分析

      selector是操作系统的IO多路复用模型的一种实现。通过selectpollepoll监听文件句柄,在文件句柄可读的状态下,会返回就绪的文件句柄。

      返回就绪状态文件句柄

      sel = selectors.DefaultSelector()
      while True:
          events = sel.select()
      

      循环中访问sel.select()就是监听文件句柄状态的函数,一个阻塞函数。应用程序调用该函数后会等待,直到有数据到来,数据从设备发送到内核空间,在socket编程中就是数据流从网卡到内核空间中。当数据到达内核空间中,该函数返回文件句柄相关的内容。

      数据拷贝

      当文件句柄就绪之后,就可以从文件句柄里读取数据了。

      在selector中相关的函数是

      • conn, addr = server_socket.accept()
      • data = conn.recv(1024).decode('utf-8')

      总结

      一个完整的IO多路复用模型就是由两个部分组成,分别是

      • 返回就绪状态文件句柄
      • 数据拷贝

      asyncio  和  selector 的关系

      selectors 则是 asyncio 的底层实现之一。asyncio实现的协程是由事件循环任务组成的,而selector就是事件循环的重要依赖模块。

      asyncio 使用了 selectors 模块来实现底层的并发 I/O 操作。通过将 selectors 的功能封装为 asyncio 提供的事件循环(Event Loop)和其他协程相关的工具。

      回顾一下事件循环的机制

      任务列表 = [ 任务1, 任务2, 任务3,... ]

      while True:
          可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
          
          for 就绪任务 in 已准备就绪的任务列表:
              执行已就绪的任务
              
          for 已完成的任务 in 已完成的任务列表:
              在任务列表中移除 已完成的任务

          如果 任务列表 中的任务都已完成,则终止循环

      事件循环就是一个while True的循环,循环中做的事情有三个:

      • 获取就绪状态的任务和已完成的任务
      • 执行就绪状态的任务
      • 移除已完成的任务

      那么selector的功能在事件循环中的功能就非常明显了,就是负责返回IO相关的就绪任务。

      asyncio 库使用了底层的 selectors 模块来监听和管理文件描述符的状态变化,并在合适的时候将控制权交给其他的协程。这样可以实现非阻塞的 I/O 操作,并支持高并发和并行执行。

      selectors 提供了底层的 I/O 多路复用机制,而 asyncio 在其之上提供了更高级的异步编程框架。

      附录asyncio模块事件循环核心模块

      def run_forever(self):
          """Run until stop() is called."""
          self._check_closed()
          self._check_running()
          self._set_coroutine_origin_tracking(self._debug)
      
          old_agen_hooks = sys.get_asyncgen_hooks()
          try:
              self._thread_id = threading.get_ident()
              sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                     finalizer=self._asyncgen_finalizer_hook)
      
              events._set_running_loop(self)
              while True:
                  self._run_once()
                  if self._stopping:
                      break
          finally:
              self._stopping = False
              self._thread_id = None
              events._set_running_loop(None)
              self._set_coroutine_origin_tracking(False)
              sys.set_asyncgen_hooks(*old_agen_hooks)
      

      到此这篇关于Python中IO多路复用模块selector的用法详解的文章就介绍到这了,更多相关Python多路复用模块selector内容请搜索3672js教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持3672js教程!

      您可能感兴趣的文章:
      • python 并发编程 多路复用IO模型详解
      • 详解Python IO口多路复用
      • Python select及selectors模块概念用法详解
      • python 基于selectors库实现文件上传与下载
      • Python多路复用selector模块的基本使用
      • python IO多路复用之epoll详解
      • 基于Python编写一个有趣的进程勾选器(Process Selector)

      评论关闭