Vanson's Eternal Blog

Python中使用IO多路复用和原理

Python io.png
Published on
/16 mins read/---

Python中使用IO多路复用和原理

什么是I/O操作

在计算机系统中,I/O操作是指程序与外部设备(如磁盘、网络接口、键盘等)之间的数据传输。

I/O操作通常比CPU运算慢得多,因此程序在等待I/O操作完成时会处于阻塞状态。例如,当一个网络服务器等待客户端发送数据时,它会阻塞在recv()函数上,直到数据到达。

核心原理

I/O多路复用的核心思想是让一个线程或进程同时管理多个I/O操作,而不是为每个I/O操作创建一个单独的线程或进程。这样可以显著减少系统资源的消耗,提高程序的效率。 I/O多路复用的关键在于能够同时监控多个文件描述符(file descriptor),并等待它们中的任何一个准备好进行读、写或异常操作。文件描述符是一个抽象的概念,用于标识一个打开的文件、管道或网络连接。

实现模型

select模型

通过轮询的方式检查每个文件描述符的状态。它将所有文件描述符的状态复制到内核空间,然后逐个检查。

这种方式简单但效率较低,尤其是当文件描述符数量较多时。

机制

select模块的核心是select.select()函数。它的内部实现依赖于操作系统的select系统调用。select系统调用的工作原理如下:

  • 准备文件描述符集合:程序将需要监控的文件描述符分别放入三个集合中:rlist(读集合)、wlist(写集合)和xlist(异常集合)。
  • 调用select系统调用:程序调用select系统调用,将这三个集合传递给内核。
  • 内核检查文件描述符状态:内核会检查每个文件描述符的状态,判断它们是否准备好进行读、写或异常操作。
  • 返回结果:当有文件描述符准备好时,内核会返回这些文件描述符的集合。程序可以通过检查返回的集合来确定哪些文件描述符已经准备好。

瓶颈

  • 文件描述符数量限制:select系统调用支持的最大文件描述符数量通常是固定的(如1024)。当需要监控的文件描述符数量超过这个限制时,程序将无法正常工作。
  • 效率低下:select需要在每次调用时检查所有文件描述符的状态,这在文件描述符数量较多时会变得非常耗时。

合理使用

  • 减少监控的文件描述符数量:只监控当前需要的文件描述符,避免将不必要的文件描述符加入监控集合。
  • 合理设置超时时间:根据实际需求合理设置select的超时时间,避免长时间阻塞。

Python中使用

import select
import socket
import sys
 
# 创建一个套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 12345))
server_socket.listen(5)
 
# 将标准输入和套接字加入监控列表
inputs = [sys.stdin, server_socket]
 
while True:
    # 使用 select 监控
    readable, writable, exceptional = select.select(inputs, [], [])
 
    for s in readable:
        if s == server_socket:
            # 接受新的连接
            client_socket, client_address = server_socket.accept()
            print(f"接受来自 {client_address} 的连接")
            inputs.append(client_socket)  # 将新的客户端套接字加入监控列表
        elif s == sys.stdin:
            # 处理标准输入
            user_input = sys.stdin.readline().strip()
            if user_input == "exit":
                print("退出程序")
                break
            else:
                print(f"你输入了:{user_input}")
        else:
            # 处理客户端发送的数据
            data = s.recv(1024)
            if data:
                print(f"收到客户端数据:{data.decode()}")
                s.sendall(data)  # 将数据回发给客户端
            else:
                print("客户端断开连接")
                inputs.remove(s)  # 从监控列表中移除该套接字
                s.close()
 
    if user_input == "exit":
        break
 
server_socket.close()
 

poll模型

类似于select,但它使用一个数组来存储文件描述符的状态,而不是复制整个状态。

这种方式避免了select的文件描述符数量限制,但效率仍然受限于文件描述符的数量。

机制

poll模块的核心是poll.poll()函数。它的内部实现依赖于操作系统的poll系统调用。poll系统调用的工作原理如下:

  • 创建pollfd数组:程序创建一个pollfd数组,每个元素包含一个文件描述符及其感兴趣的事件(如读、写、异常)。
  • 调用poll系统调用:程序调用poll系统调用,将pollfd数组传递给内核。
  • 内核检查文件描述符状态:内核会检查每个文件描述符的状态,判断它们是否准备好进行读、写或异常操作。
  • 返回结果:当有文件描述符准备好时,内核会返回这些文件描述符的集合。程序可以通过检查返回的集合来确定哪些文件描述符已经准备好。

优势

  • 没有文件描述符数量限制:poll模块可以支持更多的文件描述符,因为它不需要像select那样将文件描述符状态复制到内核空间。
  • 更高的灵活性:poll模块允许程序为每个文件描述符指定不同的事件类型,更加灵活。

瓶颈

  • 效率问题:poll模块需要在每次调用时检查所有文件描述符的状态,这在文件描述符数量较多时仍然会变得非常耗时。
  • 系统调用开销:每次调用poll系统调用时,都需要将pollfd数组从用户空间复制到内核空间,这会带来一定的开销。

Python中使用

import select
import socket
import sys
 
# 创建一个套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 12345))
server_socket.listen(5)
 
# 创建 poll 对象
poller = select.poll()
poller.register(sys.stdin, select.POLLIN)
poller.register(server_socket, select.POLLIN)
 
fd_to_socket = {sys.stdin.fileno(): sys.stdin, server_socket.fileno(): server_socket}
 
while True:
    events = poller.poll()
 
    for fd, event in events:
        s = fd_to_socket[fd]
        if s == server_socket:
            # 接受新的连接
            client_socket, client_address = server_socket.accept()
            print(f"接受来自 {client_address} 的连接")
            poller.register(client_socket, select.POLLIN)  # 注册新的客户端套接字
            fd_to_socket[client_socket.fileno()] = client_socket
        elif s == sys.stdin:
            # 处理标准输入
            user_input = sys.stdin.readline().strip()
            if user_input == "exit":
                print("退出程序")
                break
            else:
                print(f"你输入了:{user_input}")
        else:
            # 处理客户端发送的数据
            data = s.recv(1024)
            if data:
                print(f"收到客户端数据:{data.decode()}")
                s.sendall(data)  # 将数据回发给客户端
            else:
                print("客户端断开连接")
                poller.unregister(s)  # 从 poll 对象中注销该套接字
                del fd_to_socket[s.fileno()]
                s.close()
 
    if user_input == "exit":
        break
 
server_socket.close()
 
 

epoll模型

通过事件驱动的方式实现。epoll会将文件描述符的状态变化记录在一个内核缓冲区中,当程序调用epoll时,只需要读取这个缓冲区即可获取事件信息。

这种方式效率非常高,特别适合处理大量文件描述符。

机制

epoll模块的核心是epoll系统调用。它的内部实现依赖于Linux内核的epoll机制。epoll的工作原理如下:

  • 创建epoll实例:程序通过epoll.create()创建一个epoll实例。
  • 注册文件描述符:程序通过epoll.register()将文件描述符及其感兴趣的事件注册到epoll实例中。
  • 等待事件:程序通过epoll.poll()等待事件发生。当有文件描述符准备好时,内核会将事件信息存储在一个内核缓冲区中。
  • 获取事件:程序通过epoll.poll()获取内核缓冲区中的事件信息,从而知道哪些文件描述符已经准备好。

优势

epoll模块相比select和poll模块有以下显著优势:

  • 高效率:epoll通过内核缓冲区存储事件信息,避免了每次调用时检查所有文件描述符的状态。这种方式大大提高了效率,特别适合处理大量文件描述符。
  • 无文件描述符数量限制:epoll可以支持大量的文件描述符,不受select的1024个文件描述符的限制。
  • 内核级优化:epoll是Linux内核专门为I/O多路复用设计的机制,经过了大量优化,性能非常出色。

高级用法

epoll模块不仅可以用于简单的I/O操作,还可以用于构建高性能的网络服务器。以下是一些高级应用技巧:

  • 非阻塞I/O:结合非阻塞I/O操作(如socket.setblocking(False)),可以进一步提高程序的效率。非阻塞I/O操作不会阻塞程序,而是立即返回,这使得程序可以快速处理多个I/O操作。
  • 事件驱动编程:epoll模块非常适合事件驱动编程模型。程序可以通过注册事件并等待事件发生,然后根据事件类型进行相应的处理。这种方式可以显著提高程序的响应速度和效率。
  • 边缘触发(Edge - Triggered)模式:epoll支持两种模式:水平触发(Level - Triggered)和边缘触发(Edge - Triggered)。边缘触发模式下,内核只在文件描述符状态发生变化时通知程序一次,这可以避免重复处理同一个事件,进一步提高效率。

Python中使用

import select
import socket
import sys
 
# 创建一个套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", 12345))
server_socket.listen(5)
server_socket.setblocking(False)  # 设置为非阻塞模式
 
# 创建 epoll 对象
epoll = select.epoll()
epoll.register(server_socket.fileno(), select.EPOLLIN)
 
fd_to_socket = {server_socket.fileno(): server_socket}
 
while True:
    events = epoll.poll()
 
    for fd, event in events:
        s = fd_to_socket[fd]
        if s == server_socket:
            # 接受新的连接
            client_socket, client_address = server_socket.accept()
            print(f"接受来自 {client_address} 的连接")
            client_socket.setblocking(False)  # 设置为非阻塞模式
            epoll.register(client_socket.fileno(), select.EPOLLIN)  # 注册新的客户端套接字
            fd_to_socket[client_socket.fileno()] = client_socket
        else:
            # 处理客户端发送的数据
            data = s.recv(1024)
            if data:
                print(f"收到客户端数据:{data.decode()}")
                s.sendall(data)  # 将数据回发给客户端
            else:
                print("客户端断开连接")
                epoll.unregister(s)  # 从 epoll 对象中注销该套接字
                del fd_to_socket[s.fileno()]
                s.close()
 

主流框架中的应用

Tornado

Tornado 使用 epoll(在 Linux 系统上)作为其 I/O 多路复用机制。它通过 IOLoop 类来封装底层的 I/O 多路复用逻辑

在 Tornado 的 IOLoop 类中,configurable_default 方法会根据操作系统选择合适的 I/O 多路复用实现。

@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop
 
 
class IOLoop(Configurable):
    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()
    _current = threading.local()
    # 返回一个全局的IOLoop实例,
    大多数应用程序是在主线程上运行一个单例的、全局的IOLoop实例。
    大多数情形下,使用该方法获取IOLoop实例 比 使用current()方法获取当前线程的IOLoop实例要好。
    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.
        Most applications have a single, global `IOLoop` running on the
        main thread.  Use this method to get this instance from
        another thread.  In most other cases, it is better to use `current()`
        to get the current thread's `IOLoop`.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
                    # 因为IOLoop继承了Configurable,并且IOLoop自己没有定义__new__方法,所以会调用Configurable的__new__方法,创建实例。
                    # IOLoop的configurable_base()方法,返回的是自身,并且没有通过__impl_class指定实现类。
                    # 所以会使用IOLoop.configurable_default()方法返回的类作为实现类。
                    # 首先应该知道的是:IOLoop本质上是封装了操作系统的IO多路复用机制,不同的操作系统提供了不同的IO多路复用机制,
                    # 比如linux提供了epoll机制,bsd和mac提供了kqueue机制,windows提供了select机制。
                    # 在tornado中,每种机制都对应一个具体的IOLoop子类,比如linux的epoll对应tornado.platform.epoll.EPollIOLoop类。
                    # IOLoop.configurable_default()方法完成的任务就是根据操作系统平台选择其对应的实现类。
                    # 所以,开发人员不必自己根据操作系统选择实现类,这个过程是自动的。
                    # 在创建了实现类的对象之后,就会调用它的intialize()方法进行初始化。以EPollIOLoop为例进行说明:
                    # EPollIOLoop的initialize方法会调用其父类PollIOLoop的initialize()方法,
                    # 并且将impl参数指定为select.epoll()(也就是使用epoll机制)
                    # PollIOLoop的initalize()方法,又会调用IOLoop的initialize()方法。

asyncio

在 asyncio 的实现中,事件循环的后端会根据操作系统选择合适的 I/O 多路复用机制。

在 Linux 系统上,asyncio 默认使用 EPollIOLoop类。

def get_event_loop_policy():
    # 在 Linux 系统上,asyncio 默认使用 EpollEventLoopPolicy
    if hasattr(select, "epoll"):
        return EpollEventLoopPolicy()
    # 在其他系统上,asyncio 可能使用 SelectEventLoopPolicy
    return SelectEventLoopPolicy()