Vanson's Eternal Blog

Python中的异步编程详解

Python async.png
Published on
/27 mins read/---

协程

定义

协程(Coroutine)是一种特殊的函数,它允许在执行过程中暂停和恢复。与普通的函数调用不同,协程可以在执行到某个点时暂停,等待某些操作完成后再恢复执行。

这种特性使得协程非常适合处理异步操作,如I/O密集型任务(网络请求、文件读写等),因为它可以在等待I/O操作完成时释放执行权,让其他任务运行,从而提高程序的并发性和效率。 协程的核心概念

特性进程 (Process)线程 (Thread)协程 (Coroutine)
定义操作系统分配资源的基本单位进程中的执行单元,共享进程资源通过async def定义的函数,可暂停
创建开销大(创建新进程开销大)小(共享进程资源)非常小(轻量级)
切换开销大(上下文切换开销大)小(上下文切换开销小)非常小(几乎无开销)
并发数量低(受限于系统资源)中等(受限于系统资源)非常高(可达到数百万)
调度方式操作系统调度操作系统调度事件循环调度
内存共享独立内存空间共享进程内存事件循环中共享
通信方式IPC(管道、消息队列等)共享变量、锁、信号量等事件循环中的消息传递
适用场景计算密集型任务I/O密集型和计算密集型任务I/O密集型任务
优点独立内存空间,避免线程安全问题轻量级,共享内存,通信方便轻量级,高效的并发处理能力
缺点创建和切换开销大线程安全问题,需要处理锁和同步机制依赖事件循环,不适合计算密集型任务

概念

  • 暂停(Suspend):协程可以在执行过程中暂停,等待某些条件满足后再继续执行。这通常是通过await关键字实现的。
  • 恢复(Resume):当等待的条件满足后,协程可以从上次暂停的地方继续执行。
  • 事件循环(Event Loop):事件循环是协程运行的基础,负责调度和管理协程的执行。它会检查哪些协程已经准备好运行,并将它们放入队列中等待执行。

工作过程

‌1. 协程的启动与暂停

  • 入口点‌:通过调用协程函数(如 async def 或生成器函数)创建协程对象,但不会立即执行代码。
  • ‌暂停点‌:遇到 yield(生成器)或 await(异步协程)时,协程保存当前状态(局部变量、程序计数器等)并暂停执行,将控制权交还给调用者或事件循环。

2. 协程的恢复与调度

  • 恢复条件‌:由外部调用 send()(生成器)或事件循环检测到 await 的异步操作完成(如I/O就绪)时触发恢复。
  • ‌调度机制‌: ‌- 用户态调度‌:协程的切换由程序员或框架(如 asyncio)显式控制,无需操作系统介入。 ‌- 事件循环驱动‌:在异步编程中,事件循环监控协程状态,当I/O操作完成时自动恢复关联协程。

‌3. 协程的终止

  • 自然结束‌:协程函数执行到 return 或末尾时自动终止。
  • ‌强制终止‌:可通过显式关闭(如生成器的 close())或取消任务(如 asyncio.Task.cancel())提前结束
 
# 定义消费者函数,这是一个生成器函数
def consumer():
    r = ''  # 初始化返回值为空字符串
    while True:
        n = yield r  # 暂停生成器,等待外部发送数据,并返回当前的r值
        if not n:  # 如果接收到的数据为空(None或0),终止生成器
            return
        print('[CONSUMER] Consuming %s...' % n)  # 打印消费的数据
        r = '200 OK'  # 设置返回值,将在下一次yield时返回给生产者
 
# 定义生产者函数,负责向消费者发送数据
def produce(c):
    c.send(None)  # 启动生成器,第一次调用send时必须传入None。c.send(None)的作用是启动生成器,使其执行到第一个yield表达式并暂停。这是生成器第一次调用send时的必要步骤,避免了TypeError,并确保生成器正确初始化。
    n = 0  # 初始化生产者计数器
    while n < 5:  # 生产者循环,生产5个数据
        n = n + 1  # 生产一个新的数据
        print('[PRODUCER] Producing %s...' % n)  # 打印生产的数据
        r = c.send(n)  # 向消费者发送数据,并接收消费者返回的值
        print('[PRODUCER] Consumer return: %s' % r)  # 打印消费者返回的值
    c.close()  # 生产者完成生产后,关闭消费者生成器
 
# 创建消费者生成器
c = consumer()
# 调用生产者函数,传入消费者生成器
produce(c)
 

asyncio

async def

定义异步函数,这种函数返回一个 coroutine 对象。

import asyncio
 
async def async_function():
    print("Hello")
    await asyncio.sleep(1)  # 模拟异步操作
    print("World")
 
# 调用异步函数
asyncio.run(async_function())
 

await

await 用于等待一个 coroutine 对象完成。它只能在 async def 定义的异步函数中使用。

import asyncio
 
async def async_task():
    print("Task started")
    await asyncio.sleep(2)  # 模拟异步操作
    print("Task finished")
    return "Done"
 
async def main():
    result = await async_task()  # 等待异步任务完成
    print(result)
 
asyncio.run(main())
 
# Task started
# (等待2秒)
# Task finished
# Done
 

asyncio.run()

启动异步事件循环 asyncio.run() 是 Python 3.7+ 提供的高级接口,用于启动异步事件循环并运行主函数。

import asyncio
 
async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")
 
asyncio.run(main())
 

asyncio.create_task()

asyncio.create_task() 用于创建一个任务(Task),并将其加入事件循环中。

import asyncio
 
async def async_task():
    print("Task started")
    await asyncio.sleep(2)
    print("Task finished")
    return "Done"
 
async def main():
    task = asyncio.create_task(async_task())  # 创建任务
    print("Main function is running")
    result = await task  # 等待任务完成
    print(result)
 
asyncio.run(main())
 

asyncio.gather()

asyncio.gather() 用于并发运行多个任务,并等待它们全部完成。

import asyncio
 
async def async_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} finished")
    return f"Task {name} done"
 
async def main():
    tasks = [
        async_task("A", 2),
        async_task("B", 3),
        async_task("C", 1)
    ]
    results = await asyncio.gather(*tasks)  # 并发运行多个任务
    print(results)
 
asyncio.run(main())
 

asyncio.sleep()

asyncio.sleep() 用于模拟异步延迟,它不会阻塞事件循环。

import asyncio
 
async def main():
    print("Hello")
    await asyncio.sleep(2)  # 模拟异步延迟
    print("World")
 
asyncio.run(main())
 

asyncio.wait()

asyncio.wait() 用于等待多个任务完成,可以指定等待的条件。

import asyncio
 
async def async_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} finished")
    return f"Task {name} done"
 
async def main():
    tasks = [
        async_task("A", 2),
        async_task("B", 3),
        async_task("C", 1)
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    for task in done:
        print(task.result())
 
asyncio.run(main())
 

asyncio.as_completed()

asyncio.as_completed() 用于按任务完成的顺序获取结果。

import asyncio
 
async def async_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} finished")
    return f"Task {name} done"
 
async def main():
    tasks = [
        async_task("A", 2),
        async_task("B", 3),
        async_task("C", 1)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)
 
asyncio.run(main())

asyncio.Lock()

asyncio.Lock() 用于在异步代码中实现线程安全的锁。

import asyncio
 
async def async_task(lock, name, delay):
    async with lock:
        print(f"Task {name} started")
        await asyncio.sleep(delay)
        print(f"Task {name} finished")
 
async def main():
    lock = asyncio.Lock()
    tasks = [
        async_task(lock, "A", 2),
        async_task(lock, "B", 1)
    ]
    await asyncio.gather(*tasks)
 
asyncio.run(main())
 

asyncio.Queue()

asyncio.Queue() 用于在异步代码中实现线程安全的队列。

import asyncio
 
async def producer(queue, name):
    for i in range(3):
        print(f"Producer {name} produced {i}")
        await queue.put(i)
        await asyncio.sleep(1)
 
async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"Consumer {name} consumed {item}")
        await asyncio.sleep(1)
        queue.task_done()
 
async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue, "P1"))
    consumer_task = asyncio.create_task(consumer(queue, "C1"))
    await producer_task
    await queue.join()
    consumer_task.cancel()
 
asyncio.run(main())
 

aiohttp

异步打开文件

使用 aiofiles.open 方法异步打开文件,它与内置的 open 函数类似,但返回的是一个异步文件对象

import aiofiles
import asyncio
 
async def main():
    async with aiofiles.open('example.txt', mode='r') as f:
        contents = await f.read()
        print(contents)
 
asyncio.run(main())
 

异步读取文件

通过 read() 方法异步读取文件内容

import aiofiles
import asyncio
 
async def read_file():
    async with aiofiles.open('example.txt', mode='r') as f:
        contents = await f.read()
        print(contents)
 
asyncio.run(read_file())
 

异步逐行读取文件

可以使用异步迭代器逐行读取文件内容,适合处理大型文件。

 
import aiofiles
import asyncio
 
async def read_file_by_line():
    async with aiofiles.open('example.txt', mode='r') as f:
        async for line in f:
            print(line.strip())
 
asyncio.run(read_file_by_line())
 

异步写入文件

使用 write() 方法异步写入文件内容

import aiofiles
import asyncio
 
async def write_file():
    async with aiofiles.open('example.txt', mode='w') as f:
        await f.write('Hello, aiofiles!')
 
asyncio.run(write_file())
 

异步追加内容到文件

使用 'a' 模式打开文件,然后使用 write() 方法追加内容

 
import aiofiles
import asyncio
 
async def append_file():
    async with aiofiles.open('example.txt', mode='a') as f:
        await f.write('\nThis is an appended line.')
 
asyncio.run(append_file())
 

异步文件迭代器

异步文件迭代器可以逐行读取文件,适合逐行处理文件内容

import aiofiles
import asyncio
 
async def read_file_by_line():
    async with aiofiles.open('example.txt', mode='r') as f:
        async for line in f:
            print(line.strip())
 
asyncio.run(read_file_by_line())
 

异步文件操作的其他方法

aiofiles 还提供了其他文件操作方法,如 seek()、tell()、flush() 等

import aiofiles
import asyncio
 
async def file_operations():
    async with aiofiles.open('example.txt', mode='r+') as f:
        await f.write('New content')
        await f.seek(0)  # 移动文件指针到文件开头
        content = await f.read()
        print(content)
 
asyncio.run(file_operations())
 

异步操作临时文件

aiofiles.tempfile 模块提供了异步操作临时文件的功能

 
import aiofiles.tempfile
import asyncio
 
async def temp_file_example():
    async with aiofiles.tempfile.TemporaryFile('w+') as f:
        await f.write('Hello, temporary file!')
        await f.seek(0)
        content = await f.read()
        print(content)
 
asyncio.run(temp_file_example())
 

异步文件操作的 os 方法

aiofiles.os 模块提供了异步版本的 os 文件操作函数

import aiofiles.os
import asyncio
 
async def os_operations():
    await aiofiles.os.mkdir('test_dir')
    await aiofiles.os.rename('example.txt', 'test_dir/new_example.txt')
    await aiofiles.os.remove('test_dir/new_example.txt')
    await aiofiles.os.rmdir('test_dir')
 
asyncio.run(os_operations())
 

综合案例

异步基础

import asyncio
import time
 
async def basic_example():
    """
    基础的异步函数示例
    模拟一个异步操作,例如网络请求或文件读写。
    """
    print("开始执行")  # 打印任务开始的信息
    await asyncio.sleep(1)  # 模拟耗时1秒的异步I/O操作,如网络请求或文件读写
    print("执行完成")  # 打印任务完成的信息
 
async def multiple_tasks():
    """
    并发执行多个任务
    创建多个异步任务并等待它们全部完成。
    """
    # 创建任务列表
    tasks = [
        asyncio.create_task(basic_example()),  # 创建一个异步任务
        asyncio.create_task(basic_example()),  # 创建第二个异步任务
        asyncio.create_task(basic_example())   # 创建第三个异步任务
    ]
    
    # 等待所有任务完成
    # asyncio.gather() 用于并发运行多个任务,并等待它们全部完成
    # 它会返回一个Future对象,该对象在所有任务完成时完成
    await asyncio.gather(*tasks)  # 使用 * 解包任务列表,将每个任务作为参数传递给 gather()
 
# 使用示例
start_time = time.time()  # 记录开始时间
asyncio.run(multiple_tasks())  # 启动事件循环并运行 multiple_tasks 函数
end_time = time.time()  # 记录结束时间
# 计算总耗时并打印结果,格式化为两位小数
print(f"总耗时:{end_time - start_time:.2f}秒")

异步爬虫并发下载多个页面

import aiohttp
import asyncio
from typing import List, Dict
 
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """
    异步获取URL内容
    :param session: aiohttp.ClientSession 对象,用于发起 HTTP 请求
    :param url: 要请求的 URL
    :return: 一个字典,包含 URL、状态码和内容或错误信息
    """
    try:
        async with session.get(url) as response:  # 发起 GET 请求
            return {
                'url': url,  # 返回的字典中包含请求的 URL
                'status': response.status,  # 返回 HTTP 状态码
                'content': await response.text()  # 返回响应内容
            }
    except Exception as e:  # 捕获可能的异常
        return {
            'url': url,  # 返回的字典中包含请求的 URL
            'status': None,  # 状态码为 None,表示请求失败
            'error': str(e)  # 返回错误信息
        }
 
async def batch_fetch(urls: List[str]) -> List[Dict]:
    """
    批量获取多个URL内容
    :param urls: 一个包含多个 URL 的列表
    :return: 一个列表,包含每个 URL 的请求结果
    """
    # 设置连接池限制和超时
    timeout = aiohttp.ClientTimeout(total=30)  # 设置超时时间为 30 秒
    conn = aiohttp.TCPConnector(limit=5)  # 设置连接池大小为 5
 
    async with aiohttp.ClientSession(
        connector=conn,  # 使用自定义的连接池
        timeout=timeout  # 使用自定义的超时设置
    ) as session:
        tasks = [fetch_url(session, url) for url in urls]  # 创建任务列表
        return await asyncio.gather(*tasks)  # 并发运行所有任务并等待它们完成
 
# 使用示例
urls = [
    'https://api.example.com/data1',  # 示例 URL 1
    'https://api.example.com/data2',  # 示例 URL 2
    'https://api.example.com/data3'   # 示例 URL 3
]
 
async def main():
    """
    主函数,用于运行批量获取 URL 内容的逻辑
    """
    results = await batch_fetch(urls)  # 调用 batch_fetch 函数获取结果
    for result in results:  # 遍历结果列表
        print(f"URL: {result['url']}, Status: {result['status']}")  # 打印每个 URL 的状态码
 
# 运行主函数
asyncio.run(main())
 

高效处理文件

import aiofiles
import asyncio
from typing import List
 
async def async_read_file(filepath: str) -> str:
    """
    异步读取文件内容
    :param filepath: 文件路径
    :return: 文件内容
    """
    async with aiofiles.open(filepath, mode='r', encoding='utf-8') as f:
        # 使用 aiofiles.open 打开文件,模式为读取模式,编码为 UTF-8
        return await f.read()  # 异步读取文件内容并返回
 
async def async_write_file(filepath: str, content: str):
    """
    异步写入文件内容
    :param filepath: 文件路径
    :param content: 要写入的内容
    """
    async with aiofiles.open(filepath, mode='w', encoding='utf-8') as f:
        # 使用 aiofiles.open 打开文件,模式为写入模式,编码为 UTF-8
        await f.write(content)  # 异步写入内容
 
async def process_files(file_paths: List[str]):
    """
    并发处理多个文件
    :param file_paths: 文件路径列表
    """
    # 并发读取所有文件
    read_tasks = [async_read_file(fp) for fp in file_paths]  # 创建读取文件的任务列表
    contents = await asyncio.gather(*read_tasks)  # 并发运行所有读取任务并等待它们完成
 
    # 处理文件内容
    processed_contents = [
        content.upper()  # 示例:将内容转换为大写
        for content in contents
    ]
 
    # 并发写入处理后的内容
    write_tasks = [
        async_write_file(f"processed_{i}.txt", content)  # 创建写入文件的任务列表
        for i, content in enumerate(processed_contents)
    ]
    await asyncio.gather(*write_tasks)  # 并发运行所有写入任务并等待它们完成
 
# 使用示例
file_paths = ['file1.txt', 'file2.txt', 'file3.txt']  # 示例文件路径列表
asyncio.run(process_files(file_paths))  # 运行主函数

聊天服务器处理多用户消息

import asyncio
 
async def handle_client(reader, writer):
    while True:
        data = await reader.read(1024)  # 从客户端读取数据
        if not data:  # 如果没有数据,客户端已断开连接
            break
        message = data.decode()  # 解码数据为字符串
        response = f"Echo: {message}".encode()  # 构造回显消息
        writer.write(response)  # 写入回显消息
        await writer.drain()  # 确保数据发送完成
    writer.close()  # 关闭连接
 
async def run_server():
    server = await asyncio.start_server(handle_client, '0.0.0.0', 8888)  # 启动服务器
    async with server:
        await server.serve_forever()  # 服务器持续运行
 
# 运行服务器
asyncio.run(run_server())
 

异步数据库操作:提升数据处理效率

 
import asyncpg
import asyncio
from typing import List, Dict
 
class AsyncDatabase:
    """异步数据库操作封装"""
    
    def __init__(self, dsn: str):
        """
        初始化异步数据库操作类
        :param dsn: 数据库连接字符串
        """
        self.dsn = dsn  # 数据库连接字符串
        self.pool = None  # 数据库连接池,初始为 None
 
    async def connect(self):
        """
        创建数据库连接池
        如果连接池尚未创建,则创建一个新的连接池。
        """
        if not self.pool:  # 检查连接池是否已创建
            self.pool = await asyncpg.create_pool(
                self.dsn,  # 数据库连接字符串
                min_size=5,  # 连接池最小连接数
                max_size=20  # 连接池最大连接数
            )
 
    async def close(self):
        """
        关闭连接池
        如果连接池已创建,则关闭连接池。
        """
        if self.pool:  # 检查连接池是否存在
            await self.pool.close()  # 关闭连接池
 
    async def fetch_all(self, query: str, *args) -> List[Dict]:
        """
        执行查询并获取所有结果
        :param query: SQL 查询语句
        :param args: 查询参数
        :return: 查询结果列表,每个结果为一个字典
        """
        async with self.pool.acquire() as conn:  # 从连接池中获取一个连接
            results = await conn.fetch(query, *args)  # 执行查询并获取结果
            return [dict(row) for row in results]  # 将结果转换为字典列表
 
    async def execute(self, query: str, *args) -> str:
        """
        执行更新操作
        :param query: SQL 更新语句
        :param args: 更新参数
        :return: 执行结果
        """
        async with self.pool.acquire() as conn:  # 从连接池中获取一个连接
            return await conn.execute(query, *args)  # 执行更新操作
 
# 使用示例
async def main():
    """
    主函数,用于演示异步数据库操作
    """
    # 创建数据库连接
    db = AsyncDatabase('postgresql://user:pass@localhost/dbname')  # 创建数据库操作对象
    await db.connect()  # 建立数据库连接
 
    try:
        # 执行查询
        users = await db.fetch_all(
            'SELECT * FROM users WHERE age > $1',  # 查询语句
            18  # 查询参数
        )
        print(f"找到 {len(users)} 个用户")  # 打印查询结果数量
 
        # 批量插入
        tasks = [
            db.execute(
                'INSERT INTO logs (user_id, action) VALUES ($1, $2)',  # 插入语句
                user['id'],  # 用户 ID
                'login'  # 操作类型
            )
            for user in users  # 遍历查询结果
        ]
        await asyncio.gather(*tasks)  # 并发执行所有插入任务
 
    finally:
        await db.close()  # 确保关闭数据库连接
 
# 运行主函数
asyncio.run(main())
 

异步队列:生产者消费者模式

import asyncio
from asyncio import Queue
from typing import List
 
async def producer(queue: Queue, items: List):
    """
    生产者:将项目放入队列
    :param queue: asyncio.Queue 对象,用于存放项目
    :param items: 要生产的项目列表
    """
    for item in items:
        # 模拟生产过程,每次生产耗时 0.5 秒
        await asyncio.sleep(0.5)
        await queue.put(item)  # 将项目放入队列
        print(f"生产: {item}")
 
async def consumer(queue: Queue, name: str):
    """
    消费者:从队列中获取项目并处理
    :param queue: asyncio.Queue 对象,用于获取项目
    :param name: 消费者的名称
    """
    while True:
        try:
            # 获取项目,等待超时时间为 5 秒
            item = await asyncio.wait_for(queue.get(), timeout=5.0)
            
            # 模拟处理过程,每次处理耗时 1 秒
            await asyncio.sleep(1)
            print(f"消费者 {name} 处理: {item}")
            
            # 标记任务完成
            queue.task_done()
            
        except asyncio.TimeoutError:
            # 如果在超时时间内没有获取到项目,打印退出信息并退出循环
            print(f"消费者 {name} 退出")
            break
 
async def process_queue():
    """
    实现生产者-消费者模式
    创建一个队列,启动一个生产者和多个消费者,等待生产者完成并将队列清空。
    """
    # 创建队列,最大容量为 5
    queue = Queue(maxsize=5)
    
    # 创建生产者任务
    items = [f"项目_{i}" for i in range(10)]  # 生产 10 个项目
    producer_task = asyncio.create_task(producer(queue, items))
    
    # 创建多个消费者任务
    consumer_tasks = [
        asyncio.create_task(consumer(queue, f"消费者_{i}"))
        for i in range(3)  # 创建 3 个消费者
    ]
    
    # 等待生产者完成
    await producer_task
    
    # 等待队列清空
    await queue.join()
    
    # 取消所有消费者任务
    for task in consumer_tasks:
        task.cancel()
 
# 运行示例
asyncio.run(process_queue())
 

异步上下文管理器:资源管理更简单

 
from contextlib import asynccontextmanager
import asyncio
 
class AsyncResource:
    """异步资源类示例"""
    
    async def connect(self):
        """
        连接资源
        模拟资源连接过程,耗时1秒。
        """
        print("连接资源...")
        await asyncio.sleep(1)  # 模拟连接过程
    
    async def disconnect(self):
        """
        断开资源连接
        模拟资源断开过程,耗时1秒。
        """
        print("断开连接...")
        await asyncio.sleep(1)  # 模拟断开过程
    
    async def process(self, data):
        """
        处理数据
        模拟数据处理过程,耗时0.5秒。
        :param data: 要处理的数据
        """
        print(f"处理数据: {data}")
        await asyncio.sleep(0.5)  # 模拟处理过程
 
@asynccontextmanager
async def managed_resource():
    """
    异步上下文管理器
    管理资源的生命周期,确保资源在使用后正确释放。
    """
    resource = AsyncResource()  # 创建资源对象
    try:
        await resource.connect()  # 连接资源
        yield resource  # 将资源对象传递给上下文管理器中的代码块
    finally:
        await resource.disconnect()  # 确保资源在退出时断开连接
 
async def process_with_resource():
    """
    使用异步上下文管理器处理资源
    使用异步上下文管理器 `managed_resource` 来管理资源的生命周期。
    """
    async with managed_resource() as resource:  # 使用异步上下文管理器
        # 使用资源处理多个数据
        tasks = [
            resource.process(f"数据_{i}")  # 创建多个处理任务
            for i in range(5)
        ]
        await asyncio.gather(*tasks)  # 并发运行所有任务
 
# 使用示例
asyncio.run(process_with_resource())  # 运行主函数
 

异步迭代器:处理大数据集

 
import asyncio
from typing import AsyncIterator
 
class AsyncDataProcessor:
    """异步数据处理器"""
    
    def __init__(self, data_list):
        """
        初始化异步数据处理器
        :param data_list: 要处理的数据列表
        """
        self.data = data_list  # 存储要处理的数据列表
        self.index = 0  # 当前处理的索引位置
 
    def __aiter__(self):
        """
        返回异步迭代器对象
        """
        return self
 
    async def __anext__(self):
        """
        异步获取下一个数据项
        :return: 下一个数据项
        :raises StopAsyncIteration: 如果没有更多数据项
        """
        if self.index >= len(self.data):  # 检查是否已处理完所有数据
            raise StopAsyncIteration  # 如果处理完所有数据,抛出 StopAsyncIteration 异常
        
        # 模拟异步处理,耗时 0.1 秒
        await asyncio.sleep(0.1)
        value = self.data[self.index]  # 获取当前索引的数据项
        self.index += 1  # 更新索引位置
        return value  # 返回当前数据项
 
async def process_large_dataset(data_list: list):
    """
    处理大型数据集
    使用异步数据处理器处理数据列表。
    :param data_list: 要处理的数据列表
    """
    processor = AsyncDataProcessor(data_list)  # 创建异步数据处理器对象
    
    # 使用异步 for 循环处理数据
    async for item in processor:  # 遍历异步迭代器
        print(f"处理: {item}")  # 打印处理的数据项
 
# 使用示例
data = [f"项目_{i}" for i in range(10)]  # 创建一个包含 10 个项目的示例数据列表
asyncio.run(process_large_dataset(data))  # 运行主函数
 

异步信号量:控制并发数量

 
import asyncio
from asyncio import Semaphore
import aiohttp
 
async def fetch_with_semaphore(
    sem: Semaphore,
    session: aiohttp.ClientSession,
    url: str
):
    """
    使用信号量控制并发请求
    :param sem: asyncio.Semaphore 对象,用于控制并发数量
    :param session: aiohttp.ClientSession 对象,用于发起 HTTP 请求
    :param url: 要请求的 URL
    :return: 请求的结果
    """
    async with sem:  # 使用信号量控制并发
        async with session.get(url) as response:  # 发起 GET 请求
            return await response.text()  # 返回响应内容
 
async def main():
    """
    控制并发数量的示例
    使用信号量限制并发数量,避免对服务器造成过大压力。
    """
    # 创建信号量,限制并发数为5
    sem = Semaphore(5)
    
    # 准备URL列表
    urls = [f"https://api.example.com/data/{i}" for i in range(20)]
    
    async with aiohttp.ClientSession() as session:  # 创建 aiohttp 客户端会话
        # 创建所有任务
        tasks = [
            fetch_with_semaphore(sem, session, url)
            for url in urls
        ]
        
        # 执行所有任务
        results = await asyncio.gather(*tasks)  # 并发运行所有任务并等待它们完成
        print(f"完成 {len(results)} 个请求")  # 打印完成的请求数量
 
# 使用示例
asyncio.run(main())  # 运行主函数