Vanson's Eternal Blog

Python中的yield

Python yield.png
Published on
/14 mins read/---

yield 是 Python 中的一个关键字,用于定义生成器(generator)。 生成器是一种特殊的迭代器,它可以记住上次迭代的位置,并在下一次迭代时从上次停止的地方继续执行。 yield 的使用使得函数可以像生成器一样工作,而不仅仅是返回一个值。

工作机制

对外接口

对外接口

  • iter() → 自己
  • next() → 产出下一个值
  • send(value) → 把值“注入”到上一个 yield 表达式处
  • throw(exc) → 把异常注入到上一个 yield 处
  • close() → 在 yield 处抛出 GeneratorExit,做清理

暂停和恢复

yield 的核心功能是暂停函数的执行,并返回一个值。当生成器的 __next__() 方法被调用时,函数从上次暂停的地方继续执行。

def my_generator():
    print("Start")
    yield 1
    print("Middle")
    yield 2
    print("End")
 
gen = my_generator()
print(next(gen))  # 输出:Start, 1
print(next(gen))  # 输出:Middle, 2
print(next(gen))  # 输出:End
 

状态保存

生成器函数在每次调用 yield 时会保存当前的状态,包括局部变量、指令指针等。当再次调用时,函数会从上次暂停的地方继续执行。

 
def counter(start, end):
    while start < end:
        yield start
        start += 1
 
for num in counter(1, 5):
    print(num)  # 输出:1, 2, 3, 4
 

各种用法

定义生成器

使用 yield 的函数称为生成器函数,调用生成器函数会返回一个生成器对象。

def my_generator():
    yield 1
    yield 2
    yield 3
 
gen = my_generator()
print(next(gen))  # 输出:1
print(next(gen))  # 输出:2
print(next(gen))  # 输出:3
 

生成器可以用于 for 循环,每次迭代都会调用生成器函数,直到没有更多的 yield。

for value in my_generator():
    print(value)  # 输出:1, 2, 3

生成器表达式

类似于列表推导式,生成器表达式可以用来创建生成器。

# 列表推导式:立即生成全部数据
list_comp = [x * 2 for x in range(5)]  # [0, 2, 4, 6, 8]
 
# 生成器表达式:按需生成数据
gen_exp = (x * 2 for x in range(5))
print(list(gen_exp))  # 输出: [0, 2, 4, 6, 8]
 
 

yield from

yield from 用于从一个生成器中委托调用另一个生成器。

yield from <iterable> 把「可迭代对象」里的每一个值依次转发给外层调用者;

同时把「双向通道」也透明地桥接起来(send/throw/return/close 全部转交)。

# 语义等价(简化版) yield from subgen  大致等价于
for v in subgen:
    yield v

但 不是 等价:

  • 异常/return 值必须正确传播;
  • send/throw 必须透传;
  • 子生成器 return 的值会被 yield from 表达式本身返回。
def sub_generator():
    yield "Sub 1"
    yield "Sub 2"
 
def main_generator():
    yield "Main 1"
    yield from sub_generator()
    yield "Main 2"
 
for item in main_generator():
    print(item)  # 输出: Main 1 → Sub 1 → Sub 2 → Main 2
 

以前的实现

def nested_generators():
    for i in range(3):
        for j in range(3):
            yield (i, j)

使用 yield from实现

def nested_generators_with_yield_from():
    for i in range(3):
        yield from ((i, j) for j in range(3))

双向通信:send() 方法

通过 send(value) 向生成器发送数据,yield 返回接收到的值。

def interactive_generator():
    value = yield "Ready to receive"
    while True:
        value = yield f"Received: {value}"
 
gen = interactive_generator()
print(next(gen))         # 输出: Ready to receive
print(gen.send("Hello")) # 输出: Received: Hello
print(gen.send(123))     # 输出: Received: 123
 
 

数据流处理

生成器可以用于处理大型数据流,避免一次性加载所有数据到内存中。

def read_large_file(file_path):
    with open(file_path, 'r') as file:
        for line in file:
            yield line.strip()
 
for line in read_large_file('large_file.txt'):
    print(line)
 

提前终止和处理

def resource():
    print('acquire')
    try:
        yield 'DATA'
        yield 'MORE'
    finally:
        print('release')
 
g = resource()
print(next(g))   # acquire  DATA
g.close()        # release

异步编程

虽然 yield 本身不是异步的,但它可以用于实现协程(coroutine)。

# Python 3.4 之前的协程(使用生成器)
def old_coroutine():
    while True:
        received = yield
        print(f"Received: {received}")
 
coro = old_coroutine()
next(coro)           # 启动协程
coro.send("Hello")   # 输出: Received: Hello
coro.send(123)       # 输出: Received: 123
 
 

状态机

状态机是一种行为模型,用于描述一个系统在不同状态之间的转换。生成器通过 yield 暂停和恢复执行的特性,可以非常自然地实现状态机的行为。

基本原理

  • 状态表示:每个 yield 表示一个状态。
  • 状态转换:每次调用 next() 或 send() 时,生成器从一个状态转换到另一个状态。
  • 暂停和恢复:生成器在每个状态处暂停执行,并在下一次调用时从上次暂停的地方继续执行。
def state_machine():
    state = "State A"
    while True:
        if state == "State A":
            print("Current state: State A")
            action = yield
            state = "State B"
        elif state == "State B":
            print("Current state: State B")
            action = yield
            state = "State C"
        elif state == "State C":
            print("Current state: State C")
            action = yield
            state = "State A"
 
# 创建状态机
machine = state_machine()
 
# 启动状态机
next(machine)  # 初始状态为 State A
 
# 触发状态转换
machine.send(None)  # 从 State A 到 State B
machine.send(None)  # 从 State B 到 State C
machine.send(None)  # 从 State C 回到 State A
machine.send(None)  # 再次从 State A 到 State B

上下文管理器:contextlib.contextmanager

利用 yield 实现资源自动管理(如文件操作)。

from contextlib import contextmanager
 
@contextmanager
def file_manager(filename, mode):
    file = open(filename, mode)
    try:
        yield file
    finally:
        file.close()
 
with file_manager("test.txt", "w") as f:
    f.write("Hello World")
 
 

异常处理与生成器关闭

通过 throw() 向生成器抛出异常,close() 终止生成器。

 
def error_generator():
    try:
        yield "Start"
        yield "Continue"
    except ValueError as e:
        yield f"Error handled: {e}"
    finally:
        yield "Cleaning up"
 
gen = error_generator()
print(next(gen))          # 输出: Start
print(gen.throw(ValueError("Oops!")))  # 输出: Error handled: Oops!
print(next(gen))          # 输出: Cleaning up → 抛出 StopIteration
 
gen.close()  # 终止生成器
 
 

生产者-消费者模型

生成器实现协程协作,生产者发送数据,消费者处理数据。

 
def producer():
    data = 0
    while data < 3:
        data += 1
        yield f"Produced: {data}"
 
def consumer(prod):
    while True:
        item = next(prod)
        print(f"Consumed: {item}")
 
prod = producer()
consumer(prod)
 
# 输出:
# Consumed: Produced: 1
# Consumed: Produced: 2
# Consumed: Produced: 3
# 抛出 StopIteration
 
 

惰性求值与大数据处理

逐行读取大文件,避免内存溢出。

 
def read_large_file(file_path):
    with open(file_path, "r") as f:
        for line in f:
            yield line.strip()
 
for line in read_large_file("large_log.txt"):
    process_line(line)  # 逐行处理,内存友好
 
 

无限序列生成

生成器实现无限斐波那契数列。

 
def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b
 
fib = fibonacci()
for _ in range(10):
    print(next(fib))  # 输出: 0 → 1 → 1 → 2 → 3 → 5 → 8 → 13 → 21 → 34
 
 

双向通道透传

def inner():
    val = yield 'inner ready'
    yield f'inner got {val}'
    return 'inner done'
 
def outer():
    ret = yield from inner()
    yield f'outer got return: {ret}'
 
gen = outer()
print(next(gen))            # inner ready
print(gen.send('hello'))    # inner got hello
try:
    gen.send(None)
except StopIteration as e:
    print(e.value)          # outer got return: inner done

递归遍历

展平数据

from collections.abc import Iterable
 
def flatten(obj):
    if isinstance(obj, Iterable) and not isinstance(obj, (str, bytes)):
        for item in obj:
            yield from flatten(item)
    else:
        yield obj
 
tree = [1, [2, 3, [4]], (5, 6)]
print(list(flatten(tree)))   # [1, 2, 3, 4, 5, 6]

协程链——日志流水线

import time, random
 
def source(target):
    """生产者"""
    for i in range(5):
        target.send(f'msg-{i}')
        time.sleep(random.random()*0.1)
    target.close()
 
def filter1(target):
    """过滤大写"""
    while True:
        try:
            item = yield
            target.send(item.upper())
        except GeneratorExit:
            target.close()
            break
 
def sink():
    """终端输出"""
    while True:
        try:
            item = yield
            print('SINK:', item)
        except GeneratorExit:
            print('sink closed')
            break
 
# 手动预激活
s = sink(); next(s)
f = filter1(s); next(f)
 
source(f)

手撸简单的事件循环

工作原理

  • 协程式编程:使用生成器实现协程,通过yield暂停执行并返回等待的I/O操作
  • 非阻塞I/O:使用selectors模块监控多个socket的就绪状态
  • 事件驱动:当I/O就绪时恢复对应的生成器继续执行

执行流程

  • 启动时只有一个accept任务在队列中
  • loop()取出accept任务并执行:
    • accept生成器yield等待监听socket可读
  • 当有新连接时:
    • select返回可读事件
    • 恢复accept生成器,它接受连接并创建handler任务
  • 每个handler任务:
    • 先等待连接可读(接收数据)
    • 收到数据后等待连接可写(发送数据)
    • 重复直到连接关闭
import socket
import selectors
from collections import deque
from typing import Generator, Tuple
 
# 任务队列,存储所有待执行的生成器
tasks = deque()
 
def accept(sock: socket.socket) -> Generator[Tuple[str, socket.socket], None, None]:
    """
    接受新连接的生成器函数
    参数:
        sock: 监听socket
    返回:
        生成器,yield等待的操作类型和socket
    """
    while True:
        # 暂停执行,等待socket可读(有新连接)
        yield 'read', sock
        
        try:
            # 接受新连接
            conn, addr = sock.accept()
            conn.setblocking(False)  # 设置为非阻塞模式
            print(f"Accepted connection from {addr}")
            
            # 为新连接创建处理任务并加入队列
            tasks.append(handler(conn))
        except socket.error as e:
            print(f"Accept error: {e}")
            continue
 
def handler(conn: socket.socket) -> Generator[Tuple[str, socket.socket], None, None]:
    """
    处理客户端连接的生成器函数
    参数:
        conn: 客户端连接socket
    返回:
        生成器,交替yield读/写操作
    """
    try:
        while True:
            # 等待连接可读(有数据到达)
            yield 'read', conn
            
            try:
                # 接收数据
                data = conn.recv(1024)
                if not data:  # 客户端关闭连接
                    break
                    
                print(f"Received: {data.decode().strip()}")
                
                # 等待连接可写
                yield 'write', conn
                
                # 发送处理后的数据(转为大写)
                conn.sendall(data.upper())
            except socket.error as e:
                print(f"Connection error: {e}")
                break
    finally:
        # 确保连接关闭
        print("Closing connection")
        conn.close()
 
def loop() -> None:
    """
    主事件循环
    管理任务执行和I/O事件监控
    """
    # 创建系统最优的I/O多路复用器
    sel = selectors.DefaultSelector()
    
    # 只要还有任务或注册的socket就继续运行
    while tasks or sel.get_map():
        # 如果没有可立即执行的任务,等待I/O事件
        if not tasks:
            # 阻塞等待,直到有I/O事件发生
            events = sel.select()
            for key, _ in events:
                # 获取关联的生成器和操作类型
                gen, what = key.data
                # 从选择器中注销该socket
                sel.unregister(key.fileobj)
                # 将可继续执行的生成器加入任务队列
                tasks.append(gen)
        
        # 处理任务队列中的任务
        while tasks:
            try:
                # 获取下一个任务
                gen = tasks.popleft()
                
                # 执行生成器到下一个yield
                op, sock = next(gen)
                
                # 根据操作类型注册对应事件
                events = selectors.EVENT_READ if op == 'read' else selectors.EVENT_WRITE
                sel.register(sock, events, (gen, op))
            except StopIteration:
                # 生成器执行完毕
                pass
            except Exception as e:
                # 处理其他异常
                print(f"Task error: {e}")
 
def main() -> None:
    """主函数,设置服务器并启动事件循环"""
    try:
        # 创建TCP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(False)  # 非阻塞模式
        
        # 绑定到本地25000端口
        sock.bind(('', 25000))
        sock.listen()
        print("Server started on port 25000")
        
        # 将初始accept任务加入队列
        tasks.append(accept(sock))
        
        # 启动事件循环
        loop()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        sock.close()
        print("Server stopped")
 
if __name__ == '__main__':
    main()

常见陷阱 & 调试技巧

常见陷阱

  • yield from 会吞掉子生成器抛出的 StopIteration 并把 .value 返回。如果你手动 try/except StopIteration 包裹 yield from,将无法捕获真正的迭代结束。
  • 在 Python 3.7+ 中,生成器 return 的值可以通过 StopIteration.value 拿到,但框架层(如 asyncio)已经自动处理。

调试技巧

  • inspect.getgeneratorstate(gen) → 查看 'GEN_CREATED', 'GEN_SUSPENDED', 'GEN_CLOSED'
  • gen.gi_frame.f_locals 查看局部变量
  • python -m asyncio 里用 asyncio.run_coroutine_threadsafe 调试原生协程