yield 是 Python 中的一个关键字,用于定义生成器(generator)。 生成器是一种特殊的迭代器,它可以记住上次迭代的位置,并在下一次迭代时从上次停止的地方继续执行。 yield 的使用使得函数可以像生成器一样工作,而不仅仅是返回一个值。
工作机制
对外接口
对外接口
- iter() → 自己
- next() → 产出下一个值
- send(value) → 把值“注入”到上一个 yield 表达式处
- throw(exc) → 把异常注入到上一个 yield 处
- close() → 在 yield 处抛出 GeneratorExit,做清理
暂停和恢复
yield 的核心功能是暂停函数的执行,并返回一个值。当生成器的 __next__()
方法被调用时,函数从上次暂停的地方继续执行。
def my_generator():
print("Start")
yield 1
print("Middle")
yield 2
print("End")
gen = my_generator()
print(next(gen)) # 输出:Start, 1
print(next(gen)) # 输出:Middle, 2
print(next(gen)) # 输出:End
状态保存
生成器函数在每次调用 yield 时会保存当前的状态,包括局部变量、指令指针等。当再次调用时,函数会从上次暂停的地方继续执行。
def counter(start, end):
while start < end:
yield start
start += 1
for num in counter(1, 5):
print(num) # 输出:1, 2, 3, 4
各种用法
定义生成器
使用 yield 的函数称为生成器函数,调用生成器函数会返回一个生成器对象。
def my_generator():
yield 1
yield 2
yield 3
gen = my_generator()
print(next(gen)) # 输出:1
print(next(gen)) # 输出:2
print(next(gen)) # 输出:3
生成器可以用于 for 循环,每次迭代都会调用生成器函数,直到没有更多的 yield。
for value in my_generator():
print(value) # 输出:1, 2, 3
生成器表达式
类似于列表推导式,生成器表达式可以用来创建生成器。
# 列表推导式:立即生成全部数据
list_comp = [x * 2 for x in range(5)] # [0, 2, 4, 6, 8]
# 生成器表达式:按需生成数据
gen_exp = (x * 2 for x in range(5))
print(list(gen_exp)) # 输出: [0, 2, 4, 6, 8]
yield from
yield from 用于从一个生成器中委托调用另一个生成器。
yield from <iterable>
把「可迭代对象」里的每一个值依次转发给外层调用者;
同时把「双向通道」也透明地桥接起来(send/throw/return/close 全部转交)。
# 语义等价(简化版) yield from subgen 大致等价于
for v in subgen:
yield v
但 不是 等价:
- 异常/return 值必须正确传播;
- send/throw 必须透传;
- 子生成器 return 的值会被 yield from 表达式本身返回。
def sub_generator():
yield "Sub 1"
yield "Sub 2"
def main_generator():
yield "Main 1"
yield from sub_generator()
yield "Main 2"
for item in main_generator():
print(item) # 输出: Main 1 → Sub 1 → Sub 2 → Main 2
以前的实现
def nested_generators():
for i in range(3):
for j in range(3):
yield (i, j)
使用 yield from实现
def nested_generators_with_yield_from():
for i in range(3):
yield from ((i, j) for j in range(3))
双向通信:send() 方法
通过 send(value) 向生成器发送数据,yield 返回接收到的值。
def interactive_generator():
value = yield "Ready to receive"
while True:
value = yield f"Received: {value}"
gen = interactive_generator()
print(next(gen)) # 输出: Ready to receive
print(gen.send("Hello")) # 输出: Received: Hello
print(gen.send(123)) # 输出: Received: 123
数据流处理
生成器可以用于处理大型数据流,避免一次性加载所有数据到内存中。
def read_large_file(file_path):
with open(file_path, 'r') as file:
for line in file:
yield line.strip()
for line in read_large_file('large_file.txt'):
print(line)
提前终止和处理
def resource():
print('acquire')
try:
yield 'DATA'
yield 'MORE'
finally:
print('release')
g = resource()
print(next(g)) # acquire DATA
g.close() # release
异步编程
虽然 yield 本身不是异步的,但它可以用于实现协程(coroutine)。
# Python 3.4 之前的协程(使用生成器)
def old_coroutine():
while True:
received = yield
print(f"Received: {received}")
coro = old_coroutine()
next(coro) # 启动协程
coro.send("Hello") # 输出: Received: Hello
coro.send(123) # 输出: Received: 123
状态机
状态机是一种行为模型,用于描述一个系统在不同状态之间的转换。生成器通过 yield 暂停和恢复执行的特性,可以非常自然地实现状态机的行为。
基本原理
- 状态表示:每个 yield 表示一个状态。
- 状态转换:每次调用 next() 或 send() 时,生成器从一个状态转换到另一个状态。
- 暂停和恢复:生成器在每个状态处暂停执行,并在下一次调用时从上次暂停的地方继续执行。
def state_machine():
state = "State A"
while True:
if state == "State A":
print("Current state: State A")
action = yield
state = "State B"
elif state == "State B":
print("Current state: State B")
action = yield
state = "State C"
elif state == "State C":
print("Current state: State C")
action = yield
state = "State A"
# 创建状态机
machine = state_machine()
# 启动状态机
next(machine) # 初始状态为 State A
# 触发状态转换
machine.send(None) # 从 State A 到 State B
machine.send(None) # 从 State B 到 State C
machine.send(None) # 从 State C 回到 State A
machine.send(None) # 再次从 State A 到 State B
上下文管理器:contextlib.contextmanager
利用 yield 实现资源自动管理(如文件操作)。
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
file = open(filename, mode)
try:
yield file
finally:
file.close()
with file_manager("test.txt", "w") as f:
f.write("Hello World")
异常处理与生成器关闭
通过 throw() 向生成器抛出异常,close() 终止生成器。
def error_generator():
try:
yield "Start"
yield "Continue"
except ValueError as e:
yield f"Error handled: {e}"
finally:
yield "Cleaning up"
gen = error_generator()
print(next(gen)) # 输出: Start
print(gen.throw(ValueError("Oops!"))) # 输出: Error handled: Oops!
print(next(gen)) # 输出: Cleaning up → 抛出 StopIteration
gen.close() # 终止生成器
生产者-消费者模型
生成器实现协程协作,生产者发送数据,消费者处理数据。
def producer():
data = 0
while data < 3:
data += 1
yield f"Produced: {data}"
def consumer(prod):
while True:
item = next(prod)
print(f"Consumed: {item}")
prod = producer()
consumer(prod)
# 输出:
# Consumed: Produced: 1
# Consumed: Produced: 2
# Consumed: Produced: 3
# 抛出 StopIteration
惰性求值与大数据处理
逐行读取大文件,避免内存溢出。
def read_large_file(file_path):
with open(file_path, "r") as f:
for line in f:
yield line.strip()
for line in read_large_file("large_log.txt"):
process_line(line) # 逐行处理,内存友好
无限序列生成
生成器实现无限斐波那契数列。
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
fib = fibonacci()
for _ in range(10):
print(next(fib)) # 输出: 0 → 1 → 1 → 2 → 3 → 5 → 8 → 13 → 21 → 34
双向通道透传
def inner():
val = yield 'inner ready'
yield f'inner got {val}'
return 'inner done'
def outer():
ret = yield from inner()
yield f'outer got return: {ret}'
gen = outer()
print(next(gen)) # inner ready
print(gen.send('hello')) # inner got hello
try:
gen.send(None)
except StopIteration as e:
print(e.value) # outer got return: inner done
递归遍历
展平数据
from collections.abc import Iterable
def flatten(obj):
if isinstance(obj, Iterable) and not isinstance(obj, (str, bytes)):
for item in obj:
yield from flatten(item)
else:
yield obj
tree = [1, [2, 3, [4]], (5, 6)]
print(list(flatten(tree))) # [1, 2, 3, 4, 5, 6]
协程链——日志流水线
import time, random
def source(target):
"""生产者"""
for i in range(5):
target.send(f'msg-{i}')
time.sleep(random.random()*0.1)
target.close()
def filter1(target):
"""过滤大写"""
while True:
try:
item = yield
target.send(item.upper())
except GeneratorExit:
target.close()
break
def sink():
"""终端输出"""
while True:
try:
item = yield
print('SINK:', item)
except GeneratorExit:
print('sink closed')
break
# 手动预激活
s = sink(); next(s)
f = filter1(s); next(f)
source(f)
手撸简单的事件循环
工作原理
- 协程式编程:使用生成器实现协程,通过yield暂停执行并返回等待的I/O操作
- 非阻塞I/O:使用selectors模块监控多个socket的就绪状态
- 事件驱动:当I/O就绪时恢复对应的生成器继续执行
执行流程
- 启动时只有一个accept任务在队列中
- loop()取出accept任务并执行:
- accept生成器yield等待监听socket可读
- 当有新连接时:
- select返回可读事件
- 恢复accept生成器,它接受连接并创建handler任务
- 每个handler任务:
- 先等待连接可读(接收数据)
- 收到数据后等待连接可写(发送数据)
- 重复直到连接关闭
import socket
import selectors
from collections import deque
from typing import Generator, Tuple
# 任务队列,存储所有待执行的生成器
tasks = deque()
def accept(sock: socket.socket) -> Generator[Tuple[str, socket.socket], None, None]:
"""
接受新连接的生成器函数
参数:
sock: 监听socket
返回:
生成器,yield等待的操作类型和socket
"""
while True:
# 暂停执行,等待socket可读(有新连接)
yield 'read', sock
try:
# 接受新连接
conn, addr = sock.accept()
conn.setblocking(False) # 设置为非阻塞模式
print(f"Accepted connection from {addr}")
# 为新连接创建处理任务并加入队列
tasks.append(handler(conn))
except socket.error as e:
print(f"Accept error: {e}")
continue
def handler(conn: socket.socket) -> Generator[Tuple[str, socket.socket], None, None]:
"""
处理客户端连接的生成器函数
参数:
conn: 客户端连接socket
返回:
生成器,交替yield读/写操作
"""
try:
while True:
# 等待连接可读(有数据到达)
yield 'read', conn
try:
# 接收数据
data = conn.recv(1024)
if not data: # 客户端关闭连接
break
print(f"Received: {data.decode().strip()}")
# 等待连接可写
yield 'write', conn
# 发送处理后的数据(转为大写)
conn.sendall(data.upper())
except socket.error as e:
print(f"Connection error: {e}")
break
finally:
# 确保连接关闭
print("Closing connection")
conn.close()
def loop() -> None:
"""
主事件循环
管理任务执行和I/O事件监控
"""
# 创建系统最优的I/O多路复用器
sel = selectors.DefaultSelector()
# 只要还有任务或注册的socket就继续运行
while tasks or sel.get_map():
# 如果没有可立即执行的任务,等待I/O事件
if not tasks:
# 阻塞等待,直到有I/O事件发生
events = sel.select()
for key, _ in events:
# 获取关联的生成器和操作类型
gen, what = key.data
# 从选择器中注销该socket
sel.unregister(key.fileobj)
# 将可继续执行的生成器加入任务队列
tasks.append(gen)
# 处理任务队列中的任务
while tasks:
try:
# 获取下一个任务
gen = tasks.popleft()
# 执行生成器到下一个yield
op, sock = next(gen)
# 根据操作类型注册对应事件
events = selectors.EVENT_READ if op == 'read' else selectors.EVENT_WRITE
sel.register(sock, events, (gen, op))
except StopIteration:
# 生成器执行完毕
pass
except Exception as e:
# 处理其他异常
print(f"Task error: {e}")
def main() -> None:
"""主函数,设置服务器并启动事件循环"""
try:
# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False) # 非阻塞模式
# 绑定到本地25000端口
sock.bind(('', 25000))
sock.listen()
print("Server started on port 25000")
# 将初始accept任务加入队列
tasks.append(accept(sock))
# 启动事件循环
loop()
except KeyboardInterrupt:
print("\nServer shutting down...")
finally:
sock.close()
print("Server stopped")
if __name__ == '__main__':
main()
常见陷阱 & 调试技巧
常见陷阱
- yield from 会吞掉子生成器抛出的 StopIteration 并把 .value 返回。如果你手动 try/except StopIteration 包裹 yield from,将无法捕获真正的迭代结束。
- 在 Python 3.7+ 中,生成器 return 的值可以通过 StopIteration.value 拿到,但框架层(如 asyncio)已经自动处理。
调试技巧
- inspect.getgeneratorstate(gen) → 查看 'GEN_CREATED', 'GEN_SUSPENDED', 'GEN_CLOSED'
- gen.gi_frame.f_locals 查看局部变量
- python -m asyncio 里用 asyncio.run_coroutine_threadsafe 调试原生协程