协程
定义
协程(Coroutine)是一种特殊的函数,它允许在执行过程中暂停和恢复。与普通的函数调用不同,协程可以在执行到某个点时暂停,等待某些操作完成后再恢复执行。
这种特性使得协程非常适合处理异步操作,如I/O密集型任务(网络请求、文件读写等),因为它可以在等待I/O操作完成时释放执行权,让其他任务运行,从而提高程序的并发性和效率。 协程的核心概念
特性 | 进程 (Process) | 线程 (Thread) | 协程 (Coroutine) |
---|---|---|---|
定义 | 操作系统分配资源的基本单位 | 进程中的执行单元,共享进程资源 | 通过async def 定义的函数,可暂停 |
创建开销 | 大(创建新进程开销大) | 小(共享进程资源) | 非常小(轻量级) |
切换开销 | 大(上下文切换开销大) | 小(上下文切换开销小) | 非常小(几乎无开销) |
并发数量 | 低(受限于系统资源) | 中等(受限于系统资源) | 非常高(可达到数百万) |
调度方式 | 操作系统调度 | 操作系统调度 | 事件循环调度 |
内存共享 | 独立内存空间 | 共享进程内存 | 事件循环中共享 |
通信方式 | IPC(管道、消息队列等) | 共享变量、锁、信号量等 | 事件循环中的消息传递 |
适用场景 | 计算密集型任务 | I/O密集型和计算密集型任务 | I/O密集型任务 |
优点 | 独立内存空间,避免线程安全问题 | 轻量级,共享内存,通信方便 | 轻量级,高效的并发处理能力 |
缺点 | 创建和切换开销大 | 线程安全问题,需要处理锁和同步机制 | 依赖事件循环,不适合计算密集型任务 |
概念
- 暂停(Suspend):协程可以在执行过程中暂停,等待某些条件满足后再继续执行。这通常是通过await关键字实现的。
- 恢复(Resume):当等待的条件满足后,协程可以从上次暂停的地方继续执行。
- 事件循环(Event Loop):事件循环是协程运行的基础,负责调度和管理协程的执行。它会检查哪些协程已经准备好运行,并将它们放入队列中等待执行。
工作过程
1. 协程的启动与暂停
- 入口点:通过调用协程函数(如 async def 或生成器函数)创建协程对象,但不会立即执行代码。
- 暂停点:遇到 yield(生成器)或 await(异步协程)时,协程保存当前状态(局部变量、程序计数器等)并暂停执行,将控制权交还给调用者或事件循环。
2. 协程的恢复与调度
- 恢复条件:由外部调用 send()(生成器)或事件循环检测到 await 的异步操作完成(如I/O就绪)时触发恢复。
- 调度机制: - 用户态调度:协程的切换由程序员或框架(如 asyncio)显式控制,无需操作系统介入。 - 事件循环驱动:在异步编程中,事件循环监控协程状态,当I/O操作完成时自动恢复关联协程。
3. 协程的终止
- 自然结束:协程函数执行到 return 或末尾时自动终止。
- 强制终止:可通过显式关闭(如生成器的 close())或取消任务(如 asyncio.Task.cancel())提前结束
# 定义消费者函数,这是一个生成器函数
def consumer():
r = '' # 初始化返回值为空字符串
while True:
n = yield r # 暂停生成器,等待外部发送数据,并返回当前的r值
if not n: # 如果接收到的数据为空(None或0),终止生成器
return
print('[CONSUMER] Consuming %s...' % n) # 打印消费的数据
r = '200 OK' # 设置返回值,将在下一次yield时返回给生产者
# 定义生产者函数,负责向消费者发送数据
def produce(c):
c.send(None) # 启动生成器,第一次调用send时必须传入None。c.send(None)的作用是启动生成器,使其执行到第一个yield表达式并暂停。这是生成器第一次调用send时的必要步骤,避免了TypeError,并确保生成器正确初始化。
n = 0 # 初始化生产者计数器
while n < 5: # 生产者循环,生产5个数据
n = n + 1 # 生产一个新的数据
print('[PRODUCER] Producing %s...' % n) # 打印生产的数据
r = c.send(n) # 向消费者发送数据,并接收消费者返回的值
print('[PRODUCER] Consumer return: %s' % r) # 打印消费者返回的值
c.close() # 生产者完成生产后,关闭消费者生成器
# 创建消费者生成器
c = consumer()
# 调用生产者函数,传入消费者生成器
produce(c)
asyncio
async def
定义异步函数,这种函数返回一个 coroutine 对象。
import asyncio
async def async_function():
print("Hello")
await asyncio.sleep(1) # 模拟异步操作
print("World")
# 调用异步函数
asyncio.run(async_function())
await
await 用于等待一个 coroutine 对象完成。它只能在 async def 定义的异步函数中使用。
import asyncio
async def async_task():
print("Task started")
await asyncio.sleep(2) # 模拟异步操作
print("Task finished")
return "Done"
async def main():
result = await async_task() # 等待异步任务完成
print(result)
asyncio.run(main())
# Task started
# (等待2秒)
# Task finished
# Done
asyncio.run()
启动异步事件循环 asyncio.run() 是 Python 3.7+ 提供的高级接口,用于启动异步事件循环并运行主函数。
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(main())
asyncio.create_task()
asyncio.create_task() 用于创建一个任务(Task),并将其加入事件循环中。
import asyncio
async def async_task():
print("Task started")
await asyncio.sleep(2)
print("Task finished")
return "Done"
async def main():
task = asyncio.create_task(async_task()) # 创建任务
print("Main function is running")
result = await task # 等待任务完成
print(result)
asyncio.run(main())
asyncio.gather()
asyncio.gather() 用于并发运行多个任务,并等待它们全部完成。
import asyncio
async def async_task(name, delay):
print(f"Task {name} started")
await asyncio.sleep(delay)
print(f"Task {name} finished")
return f"Task {name} done"
async def main():
tasks = [
async_task("A", 2),
async_task("B", 3),
async_task("C", 1)
]
results = await asyncio.gather(*tasks) # 并发运行多个任务
print(results)
asyncio.run(main())
asyncio.sleep()
asyncio.sleep() 用于模拟异步延迟,它不会阻塞事件循环。
import asyncio
async def main():
print("Hello")
await asyncio.sleep(2) # 模拟异步延迟
print("World")
asyncio.run(main())
asyncio.wait()
asyncio.wait() 用于等待多个任务完成,可以指定等待的条件。
import asyncio
async def async_task(name, delay):
print(f"Task {name} started")
await asyncio.sleep(delay)
print(f"Task {name} finished")
return f"Task {name} done"
async def main():
tasks = [
async_task("A", 2),
async_task("B", 3),
async_task("C", 1)
]
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
for task in done:
print(task.result())
asyncio.run(main())
asyncio.as_completed()
asyncio.as_completed() 用于按任务完成的顺序获取结果。
import asyncio
async def async_task(name, delay):
print(f"Task {name} started")
await asyncio.sleep(delay)
print(f"Task {name} finished")
return f"Task {name} done"
async def main():
tasks = [
async_task("A", 2),
async_task("B", 3),
async_task("C", 1)
]
for task in asyncio.as_completed(tasks):
result = await task
print(result)
asyncio.run(main())
asyncio.Lock()
asyncio.Lock() 用于在异步代码中实现线程安全的锁。
import asyncio
async def async_task(lock, name, delay):
async with lock:
print(f"Task {name} started")
await asyncio.sleep(delay)
print(f"Task {name} finished")
async def main():
lock = asyncio.Lock()
tasks = [
async_task(lock, "A", 2),
async_task(lock, "B", 1)
]
await asyncio.gather(*tasks)
asyncio.run(main())
asyncio.Queue()
asyncio.Queue() 用于在异步代码中实现线程安全的队列。
import asyncio
async def producer(queue, name):
for i in range(3):
print(f"Producer {name} produced {i}")
await queue.put(i)
await asyncio.sleep(1)
async def consumer(queue, name):
while True:
item = await queue.get()
print(f"Consumer {name} consumed {item}")
await asyncio.sleep(1)
queue.task_done()
async def main():
queue = asyncio.Queue()
producer_task = asyncio.create_task(producer(queue, "P1"))
consumer_task = asyncio.create_task(consumer(queue, "C1"))
await producer_task
await queue.join()
consumer_task.cancel()
asyncio.run(main())
aiohttp
异步打开文件
使用 aiofiles.open 方法异步打开文件,它与内置的 open 函数类似,但返回的是一个异步文件对象
import aiofiles
import asyncio
async def main():
async with aiofiles.open('example.txt', mode='r') as f:
contents = await f.read()
print(contents)
asyncio.run(main())
异步读取文件
通过 read() 方法异步读取文件内容
import aiofiles
import asyncio
async def read_file():
async with aiofiles.open('example.txt', mode='r') as f:
contents = await f.read()
print(contents)
asyncio.run(read_file())
异步逐行读取文件
可以使用异步迭代器逐行读取文件内容,适合处理大型文件。
import aiofiles
import asyncio
async def read_file_by_line():
async with aiofiles.open('example.txt', mode='r') as f:
async for line in f:
print(line.strip())
asyncio.run(read_file_by_line())
异步写入文件
使用 write() 方法异步写入文件内容
import aiofiles
import asyncio
async def write_file():
async with aiofiles.open('example.txt', mode='w') as f:
await f.write('Hello, aiofiles!')
asyncio.run(write_file())
异步追加内容到文件
使用 'a' 模式打开文件,然后使用 write() 方法追加内容
import aiofiles
import asyncio
async def append_file():
async with aiofiles.open('example.txt', mode='a') as f:
await f.write('\nThis is an appended line.')
asyncio.run(append_file())
异步文件迭代器
异步文件迭代器可以逐行读取文件,适合逐行处理文件内容
import aiofiles
import asyncio
async def read_file_by_line():
async with aiofiles.open('example.txt', mode='r') as f:
async for line in f:
print(line.strip())
asyncio.run(read_file_by_line())
异步文件操作的其他方法
aiofiles 还提供了其他文件操作方法,如 seek()、tell()、flush() 等
import aiofiles
import asyncio
async def file_operations():
async with aiofiles.open('example.txt', mode='r+') as f:
await f.write('New content')
await f.seek(0) # 移动文件指针到文件开头
content = await f.read()
print(content)
asyncio.run(file_operations())
异步操作临时文件
aiofiles.tempfile 模块提供了异步操作临时文件的功能
import aiofiles.tempfile
import asyncio
async def temp_file_example():
async with aiofiles.tempfile.TemporaryFile('w+') as f:
await f.write('Hello, temporary file!')
await f.seek(0)
content = await f.read()
print(content)
asyncio.run(temp_file_example())
异步文件操作的 os 方法
aiofiles.os 模块提供了异步版本的 os 文件操作函数
import aiofiles.os
import asyncio
async def os_operations():
await aiofiles.os.mkdir('test_dir')
await aiofiles.os.rename('example.txt', 'test_dir/new_example.txt')
await aiofiles.os.remove('test_dir/new_example.txt')
await aiofiles.os.rmdir('test_dir')
asyncio.run(os_operations())
综合案例
异步基础
import asyncio
import time
async def basic_example():
"""
基础的异步函数示例
模拟一个异步操作,例如网络请求或文件读写。
"""
print("开始执行") # 打印任务开始的信息
await asyncio.sleep(1) # 模拟耗时1秒的异步I/O操作,如网络请求或文件读写
print("执行完成") # 打印任务完成的信息
async def multiple_tasks():
"""
并发执行多个任务
创建多个异步任务并等待它们全部完成。
"""
# 创建任务列表
tasks = [
asyncio.create_task(basic_example()), # 创建一个异步任务
asyncio.create_task(basic_example()), # 创建第二个异步任务
asyncio.create_task(basic_example()) # 创建第三个异步任务
]
# 等待所有任务完成
# asyncio.gather() 用于并发运行多个任务,并等待它们全部完成
# 它会返回一个Future对象,该对象在所有任务完成时完成
await asyncio.gather(*tasks) # 使用 * 解包任务列表,将每个任务作为参数传递给 gather()
# 使用示例
start_time = time.time() # 记录开始时间
asyncio.run(multiple_tasks()) # 启动事件循环并运行 multiple_tasks 函数
end_time = time.time() # 记录结束时间
# 计算总耗时并打印结果,格式化为两位小数
print(f"总耗时:{end_time - start_time:.2f}秒")
异步爬虫并发下载多个页面
import aiohttp
import asyncio
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""
异步获取URL内容
:param session: aiohttp.ClientSession 对象,用于发起 HTTP 请求
:param url: 要请求的 URL
:return: 一个字典,包含 URL、状态码和内容或错误信息
"""
try:
async with session.get(url) as response: # 发起 GET 请求
return {
'url': url, # 返回的字典中包含请求的 URL
'status': response.status, # 返回 HTTP 状态码
'content': await response.text() # 返回响应内容
}
except Exception as e: # 捕获可能的异常
return {
'url': url, # 返回的字典中包含请求的 URL
'status': None, # 状态码为 None,表示请求失败
'error': str(e) # 返回错误信息
}
async def batch_fetch(urls: List[str]) -> List[Dict]:
"""
批量获取多个URL内容
:param urls: 一个包含多个 URL 的列表
:return: 一个列表,包含每个 URL 的请求结果
"""
# 设置连接池限制和超时
timeout = aiohttp.ClientTimeout(total=30) # 设置超时时间为 30 秒
conn = aiohttp.TCPConnector(limit=5) # 设置连接池大小为 5
async with aiohttp.ClientSession(
connector=conn, # 使用自定义的连接池
timeout=timeout # 使用自定义的超时设置
) as session:
tasks = [fetch_url(session, url) for url in urls] # 创建任务列表
return await asyncio.gather(*tasks) # 并发运行所有任务并等待它们完成
# 使用示例
urls = [
'https://api.example.com/data1', # 示例 URL 1
'https://api.example.com/data2', # 示例 URL 2
'https://api.example.com/data3' # 示例 URL 3
]
async def main():
"""
主函数,用于运行批量获取 URL 内容的逻辑
"""
results = await batch_fetch(urls) # 调用 batch_fetch 函数获取结果
for result in results: # 遍历结果列表
print(f"URL: {result['url']}, Status: {result['status']}") # 打印每个 URL 的状态码
# 运行主函数
asyncio.run(main())
高效处理文件
import aiofiles
import asyncio
from typing import List
async def async_read_file(filepath: str) -> str:
"""
异步读取文件内容
:param filepath: 文件路径
:return: 文件内容
"""
async with aiofiles.open(filepath, mode='r', encoding='utf-8') as f:
# 使用 aiofiles.open 打开文件,模式为读取模式,编码为 UTF-8
return await f.read() # 异步读取文件内容并返回
async def async_write_file(filepath: str, content: str):
"""
异步写入文件内容
:param filepath: 文件路径
:param content: 要写入的内容
"""
async with aiofiles.open(filepath, mode='w', encoding='utf-8') as f:
# 使用 aiofiles.open 打开文件,模式为写入模式,编码为 UTF-8
await f.write(content) # 异步写入内容
async def process_files(file_paths: List[str]):
"""
并发处理多个文件
:param file_paths: 文件路径列表
"""
# 并发读取所有文件
read_tasks = [async_read_file(fp) for fp in file_paths] # 创建读取文件的任务列表
contents = await asyncio.gather(*read_tasks) # 并发运行所有读取任务并等待它们完成
# 处理文件内容
processed_contents = [
content.upper() # 示例:将内容转换为大写
for content in contents
]
# 并发写入处理后的内容
write_tasks = [
async_write_file(f"processed_{i}.txt", content) # 创建写入文件的任务列表
for i, content in enumerate(processed_contents)
]
await asyncio.gather(*write_tasks) # 并发运行所有写入任务并等待它们完成
# 使用示例
file_paths = ['file1.txt', 'file2.txt', 'file3.txt'] # 示例文件路径列表
asyncio.run(process_files(file_paths)) # 运行主函数
聊天服务器处理多用户消息
import asyncio
async def handle_client(reader, writer):
while True:
data = await reader.read(1024) # 从客户端读取数据
if not data: # 如果没有数据,客户端已断开连接
break
message = data.decode() # 解码数据为字符串
response = f"Echo: {message}".encode() # 构造回显消息
writer.write(response) # 写入回显消息
await writer.drain() # 确保数据发送完成
writer.close() # 关闭连接
async def run_server():
server = await asyncio.start_server(handle_client, '0.0.0.0', 8888) # 启动服务器
async with server:
await server.serve_forever() # 服务器持续运行
# 运行服务器
asyncio.run(run_server())
异步数据库操作:提升数据处理效率
import asyncpg
import asyncio
from typing import List, Dict
class AsyncDatabase:
"""异步数据库操作封装"""
def __init__(self, dsn: str):
"""
初始化异步数据库操作类
:param dsn: 数据库连接字符串
"""
self.dsn = dsn # 数据库连接字符串
self.pool = None # 数据库连接池,初始为 None
async def connect(self):
"""
创建数据库连接池
如果连接池尚未创建,则创建一个新的连接池。
"""
if not self.pool: # 检查连接池是否已创建
self.pool = await asyncpg.create_pool(
self.dsn, # 数据库连接字符串
min_size=5, # 连接池最小连接数
max_size=20 # 连接池最大连接数
)
async def close(self):
"""
关闭连接池
如果连接池已创建,则关闭连接池。
"""
if self.pool: # 检查连接池是否存在
await self.pool.close() # 关闭连接池
async def fetch_all(self, query: str, *args) -> List[Dict]:
"""
执行查询并获取所有结果
:param query: SQL 查询语句
:param args: 查询参数
:return: 查询结果列表,每个结果为一个字典
"""
async with self.pool.acquire() as conn: # 从连接池中获取一个连接
results = await conn.fetch(query, *args) # 执行查询并获取结果
return [dict(row) for row in results] # 将结果转换为字典列表
async def execute(self, query: str, *args) -> str:
"""
执行更新操作
:param query: SQL 更新语句
:param args: 更新参数
:return: 执行结果
"""
async with self.pool.acquire() as conn: # 从连接池中获取一个连接
return await conn.execute(query, *args) # 执行更新操作
# 使用示例
async def main():
"""
主函数,用于演示异步数据库操作
"""
# 创建数据库连接
db = AsyncDatabase('postgresql://user:pass@localhost/dbname') # 创建数据库操作对象
await db.connect() # 建立数据库连接
try:
# 执行查询
users = await db.fetch_all(
'SELECT * FROM users WHERE age > $1', # 查询语句
18 # 查询参数
)
print(f"找到 {len(users)} 个用户") # 打印查询结果数量
# 批量插入
tasks = [
db.execute(
'INSERT INTO logs (user_id, action) VALUES ($1, $2)', # 插入语句
user['id'], # 用户 ID
'login' # 操作类型
)
for user in users # 遍历查询结果
]
await asyncio.gather(*tasks) # 并发执行所有插入任务
finally:
await db.close() # 确保关闭数据库连接
# 运行主函数
asyncio.run(main())
异步队列:生产者消费者模式
import asyncio
from asyncio import Queue
from typing import List
async def producer(queue: Queue, items: List):
"""
生产者:将项目放入队列
:param queue: asyncio.Queue 对象,用于存放项目
:param items: 要生产的项目列表
"""
for item in items:
# 模拟生产过程,每次生产耗时 0.5 秒
await asyncio.sleep(0.5)
await queue.put(item) # 将项目放入队列
print(f"生产: {item}")
async def consumer(queue: Queue, name: str):
"""
消费者:从队列中获取项目并处理
:param queue: asyncio.Queue 对象,用于获取项目
:param name: 消费者的名称
"""
while True:
try:
# 获取项目,等待超时时间为 5 秒
item = await asyncio.wait_for(queue.get(), timeout=5.0)
# 模拟处理过程,每次处理耗时 1 秒
await asyncio.sleep(1)
print(f"消费者 {name} 处理: {item}")
# 标记任务完成
queue.task_done()
except asyncio.TimeoutError:
# 如果在超时时间内没有获取到项目,打印退出信息并退出循环
print(f"消费者 {name} 退出")
break
async def process_queue():
"""
实现生产者-消费者模式
创建一个队列,启动一个生产者和多个消费者,等待生产者完成并将队列清空。
"""
# 创建队列,最大容量为 5
queue = Queue(maxsize=5)
# 创建生产者任务
items = [f"项目_{i}" for i in range(10)] # 生产 10 个项目
producer_task = asyncio.create_task(producer(queue, items))
# 创建多个消费者任务
consumer_tasks = [
asyncio.create_task(consumer(queue, f"消费者_{i}"))
for i in range(3) # 创建 3 个消费者
]
# 等待生产者完成
await producer_task
# 等待队列清空
await queue.join()
# 取消所有消费者任务
for task in consumer_tasks:
task.cancel()
# 运行示例
asyncio.run(process_queue())
异步上下文管理器:资源管理更简单
from contextlib import asynccontextmanager
import asyncio
class AsyncResource:
"""异步资源类示例"""
async def connect(self):
"""
连接资源
模拟资源连接过程,耗时1秒。
"""
print("连接资源...")
await asyncio.sleep(1) # 模拟连接过程
async def disconnect(self):
"""
断开资源连接
模拟资源断开过程,耗时1秒。
"""
print("断开连接...")
await asyncio.sleep(1) # 模拟断开过程
async def process(self, data):
"""
处理数据
模拟数据处理过程,耗时0.5秒。
:param data: 要处理的数据
"""
print(f"处理数据: {data}")
await asyncio.sleep(0.5) # 模拟处理过程
@asynccontextmanager
async def managed_resource():
"""
异步上下文管理器
管理资源的生命周期,确保资源在使用后正确释放。
"""
resource = AsyncResource() # 创建资源对象
try:
await resource.connect() # 连接资源
yield resource # 将资源对象传递给上下文管理器中的代码块
finally:
await resource.disconnect() # 确保资源在退出时断开连接
async def process_with_resource():
"""
使用异步上下文管理器处理资源
使用异步上下文管理器 `managed_resource` 来管理资源的生命周期。
"""
async with managed_resource() as resource: # 使用异步上下文管理器
# 使用资源处理多个数据
tasks = [
resource.process(f"数据_{i}") # 创建多个处理任务
for i in range(5)
]
await asyncio.gather(*tasks) # 并发运行所有任务
# 使用示例
asyncio.run(process_with_resource()) # 运行主函数
异步迭代器:处理大数据集
import asyncio
from typing import AsyncIterator
class AsyncDataProcessor:
"""异步数据处理器"""
def __init__(self, data_list):
"""
初始化异步数据处理器
:param data_list: 要处理的数据列表
"""
self.data = data_list # 存储要处理的数据列表
self.index = 0 # 当前处理的索引位置
def __aiter__(self):
"""
返回异步迭代器对象
"""
return self
async def __anext__(self):
"""
异步获取下一个数据项
:return: 下一个数据项
:raises StopAsyncIteration: 如果没有更多数据项
"""
if self.index >= len(self.data): # 检查是否已处理完所有数据
raise StopAsyncIteration # 如果处理完所有数据,抛出 StopAsyncIteration 异常
# 模拟异步处理,耗时 0.1 秒
await asyncio.sleep(0.1)
value = self.data[self.index] # 获取当前索引的数据项
self.index += 1 # 更新索引位置
return value # 返回当前数据项
async def process_large_dataset(data_list: list):
"""
处理大型数据集
使用异步数据处理器处理数据列表。
:param data_list: 要处理的数据列表
"""
processor = AsyncDataProcessor(data_list) # 创建异步数据处理器对象
# 使用异步 for 循环处理数据
async for item in processor: # 遍历异步迭代器
print(f"处理: {item}") # 打印处理的数据项
# 使用示例
data = [f"项目_{i}" for i in range(10)] # 创建一个包含 10 个项目的示例数据列表
asyncio.run(process_large_dataset(data)) # 运行主函数
异步信号量:控制并发数量
import asyncio
from asyncio import Semaphore
import aiohttp
async def fetch_with_semaphore(
sem: Semaphore,
session: aiohttp.ClientSession,
url: str
):
"""
使用信号量控制并发请求
:param sem: asyncio.Semaphore 对象,用于控制并发数量
:param session: aiohttp.ClientSession 对象,用于发起 HTTP 请求
:param url: 要请求的 URL
:return: 请求的结果
"""
async with sem: # 使用信号量控制并发
async with session.get(url) as response: # 发起 GET 请求
return await response.text() # 返回响应内容
async def main():
"""
控制并发数量的示例
使用信号量限制并发数量,避免对服务器造成过大压力。
"""
# 创建信号量,限制并发数为5
sem = Semaphore(5)
# 准备URL列表
urls = [f"https://api.example.com/data/{i}" for i in range(20)]
async with aiohttp.ClientSession() as session: # 创建 aiohttp 客户端会话
# 创建所有任务
tasks = [
fetch_with_semaphore(sem, session, url)
for url in urls
]
# 执行所有任务
results = await asyncio.gather(*tasks) # 并发运行所有任务并等待它们完成
print(f"完成 {len(results)} 个请求") # 打印完成的请求数量
# 使用示例
asyncio.run(main()) # 运行主函数