Python中使用IO多路复用和原理
什么是I/O操作
在计算机系统中,I/O操作是指程序与外部设备(如磁盘、网络接口、键盘等)之间的数据传输。
I/O操作通常比CPU运算慢得多,因此程序在等待I/O操作完成时会处于阻塞状态。例如,当一个网络服务器等待客户端发送数据时,它会阻塞在recv()函数上,直到数据到达。
核心原理
I/O多路复用的核心思想是让一个线程或进程同时管理多个I/O操作,而不是为每个I/O操作创建一个单独的线程或进程。这样可以显著减少系统资源的消耗,提高程序的效率。 I/O多路复用的关键在于能够同时监控多个文件描述符(file descriptor),并等待它们中的任何一个准备好进行读、写或异常操作。文件描述符是一个抽象的概念,用于标识一个打开的文件、管道或网络连接。
实现模型
select模型
通过轮询的方式检查每个文件描述符的状态。它将所有文件描述符的状态复制到内核空间,然后逐个检查。
这种方式简单但效率较低,尤其是当文件描述符数量较多时。
机制
select模块的核心是select.select()函数。它的内部实现依赖于操作系统的select系统调用。select系统调用的工作原理如下:
- 准备文件描述符集合:程序将需要监控的文件描述符分别放入三个集合中:rlist(读集合)、wlist(写集合)和xlist(异常集合)。
- 调用select系统调用:程序调用select系统调用,将这三个集合传递给内核。
- 内核检查文件描述符状态:内核会检查每个文件描述符的状态,判断它们是否准备好进行读、写或异常操作。
- 返回结果:当有文件描述符准备好时,内核会返回这些文件描述符的集合。程序可以通过检查返回的集合来确定哪些文件描述符已经准备好。
瓶颈
- 文件描述符数量限制:select系统调用支持的最大文件描述符数量通常是固定的(如1024)。当需要监控的文件描述符数量超过这个限制时,程序将无法正常工作。
- 效率低下:select需要在每次调用时检查所有文件描述符的状态,这在文件描述符数量较多时会变得非常耗时。
合理使用
- 减少监控的文件描述符数量:只监控当前需要的文件描述符,避免将不必要的文件描述符加入监控集合。
- 合理设置超时时间:根据实际需求合理设置select的超时时间,避免长时间阻塞。
Python中使用
import select
import socket
import sys
# 创建一个套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 12345))
server_socket.listen(5)
# 将标准输入和套接字加入监控列表
inputs = [sys.stdin, server_socket]
while True:
# 使用 select 监控
readable, writable, exceptional = select.select(inputs, [], [])
for s in readable:
if s == server_socket:
# 接受新的连接
client_socket, client_address = server_socket.accept()
print(f"接受来自 {client_address} 的连接")
inputs.append(client_socket) # 将新的客户端套接字加入监控列表
elif s == sys.stdin:
# 处理标准输入
user_input = sys.stdin.readline().strip()
if user_input == "exit":
print("退出程序")
break
else:
print(f"你输入了:{user_input}")
else:
# 处理客户端发送的数据
data = s.recv(1024)
if data:
print(f"收到客户端数据:{data.decode()}")
s.sendall(data) # 将数据回发给客户端
else:
print("客户端断开连接")
inputs.remove(s) # 从监控列表中移除该套接字
s.close()
if user_input == "exit":
break
server_socket.close()
poll模型
类似于select,但它使用一个数组来存储文件描述符的状态,而不是复制整个状态。
这种方式避免了select的文件描述符数量限制,但效率仍然受限于文件描述符的数量。
机制
poll模块的核心是poll.poll()函数。它的内部实现依赖于操作系统的poll系统调用。poll系统调用的工作原理如下:
- 创建pollfd数组:程序创建一个pollfd数组,每个元素包含一个文件描述符及其感兴趣的事件(如读、写、异常)。
- 调用poll系统调用:程序调用poll系统调用,将pollfd数组传递给内核。
- 内核检查文件描述符状态:内核会检查每个文件描述符的状态,判断它们是否准备好进行读、写或异常操作。
- 返回结果:当有文件描述符准备好时,内核会返回这些文件描述符的集合。程序可以通过检查返回的集合来确定哪些文件描述符已经准备好。
优势
- 没有文件描述符数量限制:poll模块可以支持更多的文件描述符,因为它不需要像select那样将文件描述符状态复制到内核空间。
- 更高的灵活性:poll模块允许程序为每个文件描述符指定不同的事件类型,更加灵活。
瓶颈
- 效率问题:poll模块需要在每次调用时检查所有文件描述符的状态,这在文件描述符数量较多时仍然会变得非常耗时。
- 系统调用开销:每次调用poll系统调用时,都需要将pollfd数组从用户空间复制到内核空间,这会带来一定的开销。
Python中使用
import select
import socket
import sys
# 创建一个套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 12345))
server_socket.listen(5)
# 创建 poll 对象
poller = select.poll()
poller.register(sys.stdin, select.POLLIN)
poller.register(server_socket, select.POLLIN)
fd_to_socket = {sys.stdin.fileno(): sys.stdin, server_socket.fileno(): server_socket}
while True:
events = poller.poll()
for fd, event in events:
s = fd_to_socket[fd]
if s == server_socket:
# 接受新的连接
client_socket, client_address = server_socket.accept()
print(f"接受来自 {client_address} 的连接")
poller.register(client_socket, select.POLLIN) # 注册新的客户端套接字
fd_to_socket[client_socket.fileno()] = client_socket
elif s == sys.stdin:
# 处理标准输入
user_input = sys.stdin.readline().strip()
if user_input == "exit":
print("退出程序")
break
else:
print(f"你输入了:{user_input}")
else:
# 处理客户端发送的数据
data = s.recv(1024)
if data:
print(f"收到客户端数据:{data.decode()}")
s.sendall(data) # 将数据回发给客户端
else:
print("客户端断开连接")
poller.unregister(s) # 从 poll 对象中注销该套接字
del fd_to_socket[s.fileno()]
s.close()
if user_input == "exit":
break
server_socket.close()
epoll模型
通过事件驱动的方式实现。epoll会将文件描述符的状态变化记录在一个内核缓冲区中,当程序调用epoll时,只需要读取这个缓冲区即可获取事件信息。
这种方式效率非常高,特别适合处理大量文件描述符。
机制
epoll模块的核心是epoll系统调用。它的内部实现依赖于Linux内核的epoll机制。epoll的工作原理如下:
- 创建epoll实例:程序通过epoll.create()创建一个epoll实例。
- 注册文件描述符:程序通过epoll.register()将文件描述符及其感兴趣的事件注册到epoll实例中。
- 等待事件:程序通过epoll.poll()等待事件发生。当有文件描述符准备好时,内核会将事件信息存储在一个内核缓冲区中。
- 获取事件:程序通过epoll.poll()获取内核缓冲区中的事件信息,从而知道哪些文件描述符已经准备好。
优势
epoll模块相比select和poll模块有以下显著优势:
- 高效率:epoll通过内核缓冲区存储事件信息,避免了每次调用时检查所有文件描述符的状态。这种方式大大提高了效率,特别适合处理大量文件描述符。
- 无文件描述符数量限制:epoll可以支持大量的文件描述符,不受select的1024个文件描述符的限制。
- 内核级优化:epoll是Linux内核专门为I/O多路复用设计的机制,经过了大量优化,性能非常出色。
高级用法
epoll模块不仅可以用于简单的I/O操作,还可以用于构建高性能的网络服务器。以下是一些高级应用技巧:
- 非阻塞I/O:结合非阻塞I/O操作(如socket.setblocking(False)),可以进一步提高程序的效率。非阻塞I/O操作不会阻塞程序,而是立即返回,这使得程序可以快速处理多个I/O操作。
- 事件驱动编程:epoll模块非常适合事件驱动编程模型。程序可以通过注册事件并等待事件发生,然后根据事件类型进行相应的处理。这种方式可以显著提高程序的响应速度和效率。
- 边缘触发(Edge - Triggered)模式:epoll支持两种模式:水平触发(Level - Triggered)和边缘触发(Edge - Triggered)。边缘触发模式下,内核只在文件描述符状态发生变化时通知程序一次,这可以避免重复处理同一个事件,进一步提高效率。
Python中使用
import select
import socket
import sys
# 创建一个套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", 12345))
server_socket.listen(5)
server_socket.setblocking(False) # 设置为非阻塞模式
# 创建 epoll 对象
epoll = select.epoll()
epoll.register(server_socket.fileno(), select.EPOLLIN)
fd_to_socket = {server_socket.fileno(): server_socket}
while True:
events = epoll.poll()
for fd, event in events:
s = fd_to_socket[fd]
if s == server_socket:
# 接受新的连接
client_socket, client_address = server_socket.accept()
print(f"接受来自 {client_address} 的连接")
client_socket.setblocking(False) # 设置为非阻塞模式
epoll.register(client_socket.fileno(), select.EPOLLIN) # 注册新的客户端套接字
fd_to_socket[client_socket.fileno()] = client_socket
else:
# 处理客户端发送的数据
data = s.recv(1024)
if data:
print(f"收到客户端数据:{data.decode()}")
s.sendall(data) # 将数据回发给客户端
else:
print("客户端断开连接")
epoll.unregister(s) # 从 epoll 对象中注销该套接字
del fd_to_socket[s.fileno()]
s.close()
主流框架中的应用
Tornado
Tornado 使用 epoll(在 Linux 系统上)作为其 I/O 多路复用机制。它通过 IOLoop 类来封装底层的 I/O 多路复用逻辑
在 Tornado 的 IOLoop 类中,configurable_default 方法会根据操作系统选择合适的 I/O 多路复用实现。
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
class IOLoop(Configurable):
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
_current = threading.local()
# 返回一个全局的IOLoop实例,
大多数应用程序是在主线程上运行一个单例的、全局的IOLoop实例。
大多数情形下,使用该方法获取IOLoop实例 比 使用current()方法获取当前线程的IOLoop实例要好。
@staticmethod
def instance():
"""Returns a global `IOLoop` instance.
Most applications have a single, global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. In most other cases, it is better to use `current()`
to get the current thread's `IOLoop`.
"""
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
# 因为IOLoop继承了Configurable,并且IOLoop自己没有定义__new__方法,所以会调用Configurable的__new__方法,创建实例。
# IOLoop的configurable_base()方法,返回的是自身,并且没有通过__impl_class指定实现类。
# 所以会使用IOLoop.configurable_default()方法返回的类作为实现类。
# 首先应该知道的是:IOLoop本质上是封装了操作系统的IO多路复用机制,不同的操作系统提供了不同的IO多路复用机制,
# 比如linux提供了epoll机制,bsd和mac提供了kqueue机制,windows提供了select机制。
# 在tornado中,每种机制都对应一个具体的IOLoop子类,比如linux的epoll对应tornado.platform.epoll.EPollIOLoop类。
# IOLoop.configurable_default()方法完成的任务就是根据操作系统平台选择其对应的实现类。
# 所以,开发人员不必自己根据操作系统选择实现类,这个过程是自动的。
# 在创建了实现类的对象之后,就会调用它的intialize()方法进行初始化。以EPollIOLoop为例进行说明:
# EPollIOLoop的initialize方法会调用其父类PollIOLoop的initialize()方法,
# 并且将impl参数指定为select.epoll()(也就是使用epoll机制)
# PollIOLoop的initalize()方法,又会调用IOLoop的initialize()方法。
asyncio
在 asyncio 的实现中,事件循环的后端会根据操作系统选择合适的 I/O 多路复用机制。
在 Linux 系统上,asyncio 默认使用 EPollIOLoop类。
def get_event_loop_policy():
# 在 Linux 系统上,asyncio 默认使用 EpollEventLoopPolicy
if hasattr(select, "epoll"):
return EpollEventLoopPolicy()
# 在其他系统上,asyncio 可能使用 SelectEventLoopPolicy
return SelectEventLoopPolicy()