- Published on
3.14.异步编程
- Authors

- Name
- xiaobai
1.概述
异步编程是一种非阻塞的编程模式,允许程序在等待I/O操作时执行其他任务,而不是阻塞等待。Python 通过 asyncio 模块提供了强大的异步编程支持。
2.核心概念
2.1.同步 vs 异步
| 特性 | 同步编程 | 异步编程 |
|---|---|---|
| 执行方式 | 按顺序执行,遇到耗时操作会阻塞 | 非阻塞,遇到耗时操作会切换到其他任务 |
| 适用场景 | CPU密集型任务,简单I/O操作 | I/O密集型任务,大量并发操作 |
| 复杂度 | 简单易懂 | 相对复杂,需要理解协程和事件循环 |
| 性能 | 在I/O密集型任务中效率较低 | 在I/O密集型任务中效率显著提升 |
2.2.生活中的例子
- 同步:你去餐馆点菜,一直站在柜台等厨师做好饭菜拿给你,做好你才能走
- 异步:你点完菜后在座位上玩手机或聊天,等菜做好了服务员叫你,这时你再去取餐或者服务员直接送过来
2.3.性能对比示例
import time
import asyncio
# 同步版本
def sync_demo():
def task(name, delay):
print(f"{name} 开始")
time.sleep(delay) # 阻塞调用
print(f"{name} 完成")
start = time.time()
task("任务1", 2)
task("任务2", 1)
print(f"总耗时: {time.time() - start:.2f}秒")
# 异步版本
async def async_demo():
async def task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay) # 非阻塞等待
print(f"{name} 完成")
start = time.time()
await asyncio.gather(
task("任务1", 2),
task("任务2", 1)
)
print(f"总耗时: {time.time() - start:.2f}秒")
# 运行对比
print("=== 同步版本 ===")
sync_demo() # 耗时约3秒
print("\n=== 异步版本 ===")
asyncio.run(async_demo()) # 耗时约2秒
3.asyncio 核心概念
3.1.协程 (Coroutine)
协程是 Python 实现异步编程的核心基础。它是一种可以在执行过程中被挂起和恢复的特殊函数。
3.1.1.协程的定义
async def my_coroutine():
print("协程开始执行")
await asyncio.sleep(1)
print("协程结束")
return "协程结果"
3.1.2.协程的运行方式
协程本身只是一个对象,必须由事件循环调度才能运行:
import asyncio
async def simple_coroutine():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
return "协程结果"
async def coroutine_usage():
# 1. 直接等待
print("=== 直接等待 ===")
result = await simple_coroutine()
print(f"结果: {result}")
# 2. 创建任务
print("\n=== 创建任务 ===")
task = asyncio.create_task(simple_coroutine())
result = await task
print(f"任务结果: {result}")
# 3. 并发执行
print("\n=== 并发执行 ===")
tasks = [
simple_coroutine(),
simple_coroutine(),
simple_coroutine()
]
results = await asyncio.gather(*tasks)
print(f"所有结果: {results}")
asyncio.run(coroutine_usage())
3.1.3.协程与普通函数的区别
| 特性 | 普通函数 | 协程函数 |
|---|---|---|
| 定义 | def function_name(): | async def function_name(): |
| 调用 | 立即执行 | 返回协程对象,需要事件循环调度 |
| 返回值 | 直接返回结果 | 通过 await 获取结果 |
| 阻塞 | 会阻塞调用者 | 不会阻塞,可以挂起 |
3.2.事件循环 (Event Loop)
事件循环是异步编程的核心,负责调度和执行协程任务。
3.2.1.工作机制
- 事件循环驱动:事件循环是总调度中心,负责监控所有任务和事件
- 任务执行与挂起:协程任务在执行过程中,遇到I/O请求时会主动挂起,将控制权交还给事件循环
- 高效切换:事件循环立即切换到另一个已就绪的协程任务去执行
- 事件通知与唤醒:当I/O操作完成时,事件循环会收到通知,将对应的协程任务标记为"就绪"状态
- 从断点恢复:事件循环在适当的时机重新唤醒等待I/O完成的任务,让它从挂起的地方继续执行
3.2.2.事件循环操作
import asyncio
# 获取事件循环
loop = asyncio.get_event_loop()
print(f"事件循环: {loop}")
print(f"循环是否运行: {loop.is_running()}")
print(f"循环是否关闭: {loop.is_closed()}")
# 手动管理事件循环
async def simple_task():
await asyncio.sleep(1)
return "任务完成"
# 方式1:手动管理
loop = asyncio.get_event_loop()
try:
if not loop.is_running():
result = loop.run_until_complete(simple_task())
print(f"手动执行结果: {result}")
finally:
loop.close()
# 方式2:使用 asyncio.run() (推荐)
result = asyncio.run(simple_task())
print(result)
4.异步编程核心语法
4.1.async/await 关键字
import asyncio
class AsyncExamples:
"""异步编程示例"""
async def io_operation(self, name, delay):
"""模拟I/O操作"""
print(f"{name}: 开始I/O操作,需要{delay}秒")
await asyncio.sleep(delay)
print(f"{name}: I/O操作完成")
return f"{name}_result"
async def cpu_operation(self, name, iterations):
"""模拟CPU密集型操作"""
print(f"{name}: 开始CPU计算")
result = 0
for i in range(iterations):
result += i
print(f"{name}: CPU计算完成")
return result
async def concurrent_operations(self):
"""并发操作示例"""
tasks = [
self.io_operation(f"任务{i}", i)
for i in range(1, 4)
]
print("=== 并发I/O操作 ===")
results = await asyncio.gather(*tasks)
print(f"并发结果: {results}")
# 使用示例
examples = AsyncExamples()
asyncio.run(examples.concurrent_operations())
4.2.异步上下文管理器
异步上下文管理器用于异步资源的获取与释放。
4.2.1.基本语法
import asyncio
class AsyncDatabaseConnection:
async def __aenter__(self):
print("建立数据库连接...")
await asyncio.sleep(0.5)
print("数据库连接已建立")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("关闭数据库连接...")
await asyncio.sleep(0.2)
print("数据库连接已关闭")
return False
async def execute_query(self, query):
print(f"执行查询: {query}")
await asyncio.sleep(0.3)
return f"查询结果: {query}"
async def async_context():
async with AsyncDatabaseConnection() as db:
result = await db.execute_query("SELECT * FROM users")
print(result)
asyncio.run(async_context())
4.2.2.应用场景
- 数据库连接:在查询前建立连接,结束之后自动关闭
- 文件操作:异步打开与关闭文件,确保资源不会泄漏
- 网络会话:请求前建立异步会话,请求后自动关闭
4.3.异步迭代器
异步迭代器使得我们可以遍历异步生成的数据流。
4.3.1.基本实现
import asyncio
class AsyncDataStream:
"""异步数据流"""
def __init__(self, data_list, delay=0.5):
self.data = data_list
self.delay = delay
self.index = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.data):
raise StopAsyncIteration
item = self.data[self.index]
self.index += 1
await asyncio.sleep(self.delay)
return item
async def async_iterator():
"""异步迭代器演示"""
stream = AsyncDataStream(["数据1", "数据2", "数据3", "数据4"], 0.3)
print("使用异步for循环:")
async for item in stream:
print(f"接收到: {item}")
print("\n使用异步推导式:")
stream2 = AsyncDataStream(["A", "B", "C", "D"], 0.2)
results = [item async for item in stream2]
print(f"所有数据: {results}")
asyncio.run(async_iterator())
4.3.2.适用场景
- 异步读取大文件的每一行
- 网络抓取数据流
- 处理异步产生的数据队列
- 异步推导式:
[item async for item in async_iterator]
5.任务和Future
5.1.任务管理
在异步编程中,任务(Task) 是对协程的进一步封装,它将协程排入事件循环等待执行,并提供了更丰富的管理与监控接口。
5.1.1.任务的基本操作
| 操作 | 方法 | 描述 |
|---|---|---|
| 创建任务 | asyncio.create_task(coroutine) | 将协程对象注入到事件循环,立即启动执行 |
| 等待任务 | await task | 等待任务完成并获取结果 |
| 取消任务 | task.cancel() | 取消进行中的任务 |
| 检查状态 | task.done() | 检查任务是否完成 |
| 获取结果 | task.result() | 获取任务结果(仅在任务完成后) |
5.1.2.任务管理示例
import asyncio
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
class TaskManager:
"""任务管理器"""
def __init__(self):
self.tasks = set()
async def worker(self, name, duration):
"""工作协程"""
logging.info(f"任务 {name} 开始")
try:
await asyncio.sleep(duration)
logging.info(f"任务 {name} 完成")
return f"{name}_result"
except asyncio.CancelledError:
logging.info(f"任务 {name} 被取消")
raise
async def create_tasks(self, count):
"""创建多个任务"""
for i in range(count):
task = asyncio.create_task(
self.worker(f"worker_{i}", i + 1),
name=f"task_{i}"
)
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
logging.info(f"创建了 {count} 个任务")
async def monitor_tasks(self):
"""监控任务状态"""
while True:
running_tasks = [t for t in self.tasks if not t.done()]
logging.info(f"运行中的任务: {len(running_tasks)}")
if not running_tasks:
break
await asyncio.sleep(0.5)
async def cancel_all_tasks(self):
"""取消所有任务"""
logging.info("取消所有任务")
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)
async def task_management():
"""任务管理演示"""
manager = TaskManager()
await manager.create_tasks(5)
monitor_task = asyncio.create_task(manager.monitor_tasks())
await asyncio.sleep(2)
await manager.cancel_all_tasks()
await monitor_task
asyncio.run(task_management())
5.2.Future 对象
Future 是一个底层对象,表示一个将来才会有结果的对象。Task 是 Future 的子类,具备 Future 的所有接口。
5.2.1.Future 的特性
- 可以通过
set_result()或set_exception()为 Future 设置结果或异常 - 可以通过
await future挂起,直到 Future 完成 - 这是构建异步回调和事件驱动编程的基础
5.2.2.Future 示例
import asyncio
async def future_example():
"""Future 对象示例"""
# 创建一个Future对象
future = asyncio.Future()
print(f"Future初始状态: {future.done()}")
async def set_future_result():
await asyncio.sleep(1)
if not future.done():
future.set_result("Future结果")
print("Future结果已设置")
async def use_future():
print("等待Future结果...")
result = await future
print(f"收到Future结果: {result}")
return result
# 并发执行设置和获取Future结果
results = await asyncio.gather(
set_future_result(),
use_future()
)
print(f"所有操作完成: {results}")
asyncio.run(future_example())
5.2.3.总结
- Future 是低层异步原语,是 Task 和协程运行的基础
- 如果只做高层业务,通常直接用
await协程和Task,无需手动操作 Future - 了解其原理有助于阅读、设计高级异步库和协议
6.高级异步模式
6.1.生产者-消费者模式
生产者-消费者模式是一种经典的并发设计模式,常用于解耦"数据的生产"和"数据的消费"过程。
6.1.1.核心思想
- 生产者:异步地生产数据,并通过
queue.put()投递到队列 - 消费者:异步地从队列用
queue.get()取数据进行处理 - 队列:连接生产者与消费者,起到缓冲区作用,可以限制队列大小,避免内存溢出
6.1.2.异步生产者-消费者优势
- 降低耦合:生产与消费速度可以不同步,互不阻塞
- 自动流控:队列满时生产者会等待,队列空时消费者会等待
- 易扩展:可以轻松扩展为多生产者/多消费者模式
6.1.3.实现示例
import asyncio
import random
class AsyncProducerConsumer:
"""异步生产者-消费者"""
def __init__(self, queue_size=5):
self.queue = asyncio.Queue(maxsize=queue_size)
self.producers = []
self.consumers = []
async def producer(self, name, count):
"""生产者"""
for i in range(count):
item = f"{name}_item_{i}"
production_time = random.uniform(0.1, 0.5)
await asyncio.sleep(production_time)
await self.queue.put(item)
print(f"生产者 {name} 生产了: {item} (队列大小: {self.queue.qsize()})")
print(f"生产者 {name} 完成")
async def consumer(self, name):
"""消费者"""
while True:
try:
item = await asyncio.wait_for(self.queue.get(), timeout=2.0)
consumption_time = random.uniform(0.2, 0.8)
await asyncio.sleep(consumption_time)
print(f"消费者 {name} 消费了: {item} (队列大小: {self.queue.qsize()})")
self.queue.task_done()
except asyncio.TimeoutError:
print(f"消费者 {name} 超时,退出")
break
async def run(self, producer_count=2, items_per_producer=5, consumer_count=3):
"""运行生产者-消费者系统"""
print("启动生产者-消费者系统...")
# 创建生产者任务
for i in range(producer_count):
producer_task = asyncio.create_task(
self.producer(f"P{i}", items_per_producer)
)
self.producers.append(producer_task)
# 创建消费者任务
for i in range(consumer_count):
consumer_task = asyncio.create_task(self.consumer(f"C{i}"))
self.consumers.append(consumer_task)
# 等待所有生产者完成
await asyncio.gather(*self.producers)
print("所有生产者已完成")
# 等待队列中所有项目被消费
await self.queue.join()
print("队列已清空")
# 取消所有消费者任务
for consumer in self.consumers:
consumer.cancel()
await asyncio.gather(*self.consumers, return_exceptions=True)
print("所有消费者已结束")
# 使用示例
system = AsyncProducerConsumer()
asyncio.run(system.run())
6.2.异步锁和信号量
在异步编程中,并发访问共享资源可能导致数据不一致或资源竞争问题。常用的同步原语包括异步锁和信号量。
6.2.1.异步锁(asyncio.Lock)
异步锁用于保护对共享资源的互斥访问,确保同一时刻只有一个协程可以进入临界区。
import asyncio
# 基本用法
lock = asyncio.Lock()
async def task_with_lock():
async with lock:
# 此处代码同一时刻只能有一个协程执行
print("task start")
await asyncio.sleep(1)
print("task end")
6.2.2.信号量(asyncio.Semaphore)
信号量允许"最多N个"协程同时访问某一资源,适用于连接池、限流器等场景。
import asyncio
# 创建最大允许2个协程同时运行的信号量
sem = asyncio.Semaphore(2)
async def limited_task():
async with sem:
print("task start")
await asyncio.sleep(1)
print("task end")
async def main():
tasks = [limited_task() for _ in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
6.2.3.综合示例
import asyncio
class AsyncResourceManager:
"""异步资源管理"""
def __init__(self, max_concurrent=3):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.lock = asyncio.Lock()
self.shared_resource = 0
async def access_limited_resource(self, name):
"""访问受限制的资源"""
async with self.semaphore:
print(f"{name} 获得资源访问权")
await asyncio.sleep(1) # 模拟资源使用
print(f"{name} 释放资源访问权")
async def update_shared_resource(self, name, value):
"""更新共享资源(需要锁)"""
async with self.lock:
print(f"{name} 获得锁,当前资源值: {self.shared_resource}")
await asyncio.sleep(0.5) # 模拟处理时间
self.shared_resource += value
print(f"{name} 更新资源为: {self.shared_resource}")
print(f"{name} 释放锁")
async def concurrency_control():
"""并发控制演示"""
manager = AsyncResourceManager(max_concurrent=2)
# 测试信号量
print("=== 信号量测试 ===")
semaphore_tasks = [
manager.access_limited_resource(f"任务{i}")
for i in range(5)
]
await asyncio.gather(*semaphore_tasks)
# 测试锁
print("\n=== 锁测试 ===")
lock_tasks = [
manager.update_shared_resource(f"更新者{i}", i+1)
for i in range(3)
]
await asyncio.gather(*lock_tasks)
asyncio.run(concurrency_control())
7.实际应用场景
7.1.异步Web请求
# 导入asyncio模块,用于异步编程
import asyncio
# 导入aiohttp模块,用于异步HTTP请求
import aiohttp
# 导入time模块,用于计时
import time
# 定义异步Web客户端类
class AsyncWebClient:
# 声明类的用途为异步Web客户端
"""异步Web客户端"""
# 初始化方法
def __init__(self):
# 初始化session为None
self.session = None
# 定义异步上下文管理器进入方法
async def __aenter__(self):
# 创建aiohttp客户端会话
self.session = aiohttp.ClientSession()
# 返回自身
return self
# 定义异步上下文管理器退出方法
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 关闭aiohttp客户端会话
await self.session.close()
# 定义异步方法,用于获取指定URL的内容
async def fetch_url(self, url, name):
# 获取URL内容
"""获取URL内容"""
# 打印请求开始信息
print(f"{name}: 开始请求 {url}")
try:
# 以异步方式发起GET请求,10秒超时
async with self.session.get(url, timeout=10) as response:
# 读取响应的文本内容
content = await response.text()
# 打印请求完成信息,包括状态码和内容长度
print(f"{name}: 完成,状态码 {response.status}, 长度 {len(content)}")
# 返回内容长度
return len(content)
except Exception as e:
# 捕获异常并打印错误信息
print(f"{name}: 错误 {e}")
# 出错时返回0
return 0
# 定义异步方法,用于并发请求多个URL
async def concurrent_requests(self, urls):
# 并发请求多个URL
"""并发请求多个URL"""
# 创建任务列表
tasks = []
# 遍历所有URL,创建对应的fetch_url任务
for i, url in enumerate(urls):
# 创建单个请求任务
task = self.fetch_url(url, f"请求{i+1}")
# 添加任务到任务列表
tasks.append(task)
# 打印并发请求开始信息
print("开始并发请求...")
# 记录开始时间
start_time = time.time()
# 并发执行所有异步请求,等待结果
results = await asyncio.gather(*tasks)
# 记录结束时间
end_time = time.time()
# 打印所有请求完成和耗时信息
print(f"所有请求完成,耗时: {end_time - start_time:.2f}秒")
# 打印所有任务的结果
print(f"结果: {results}")
# 返回结果列表
return results
# 定义异步函数,演示Web客户端的用法
async def web_client():
# Web客户端演示
"""Web客户端演示"""
# 准备待请求的URL列表
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
# 使用AsyncWebClient异步上下文管理器
async with AsyncWebClient() as client:
# 并发请求所有URL
await client.concurrent_requests(urls)
# 运行Web客户端演示
asyncio.run(web_client())
7.2.异步数据库操作
# 导入异步IO相关库
import asyncio
# 导入asyncpg模块,用于异步操作PostgreSQL数据库。需要预先安装:pip install asyncpg
import asyncpg # 需要安装: pip install asyncpg
# 定义异步数据库操作类
class AsyncDatabase:
# 提供类说明
"""异步数据库操作"""
# 类的初始化方法,设定连接字符串并初始化数据库连接池为None
def __init__(self, connection_string):
self.connection_string = connection_string
self.pool = None
# 定义异步的数据库连接方法
async def connect(self):
# 连接数据库,创建连接池
"""连接数据库"""
self.pool = await asyncpg.create_pool(self.connection_string)
# 输出连接池创建成功的信息
print("数据库连接池已创建")
# 定义异步关闭数据库连接池的方法
async def close(self):
# 关闭连接池(如已存在)
"""关闭连接"""
if self.pool:
await self.pool.close()
# 输出连接池关闭的信息
print("数据库连接池已关闭")
# 定义异步执行SQL查询的方法
async def execute_query(self, query, *args):
# 执行SQL查询,并返回结果
"""执行查询"""
# 从连接池获取连接,执行查询
async with self.pool.acquire() as connection:
result = await connection.fetch(query, *args)
return result
# 定义异步并发执行多条SQL查询的方法
async def concurrent_queries(self, queries):
# 并发执行多个查询
"""并发执行多个查询"""
# 存放任务的列表
tasks = []
# 遍历所有查询,准备每个任务
for i, (query, params) in enumerate(queries):
task = self.execute_query(query, *params)
tasks.append(task)
# 输出开始并发查询的提示信息
print("开始并发查询...")
# 并发执行所有任务,等待结果
results = await asyncio.gather(*tasks)
# 输出所有查询完成的信息
print(f"完成 {len(results)} 个查询")
# 返回所有结果
return results
# 定义一个用于模拟(不实际访问数据库)的异步数据库类
class MockAsyncDatabase:
# 模拟异步数据库说明
"""模拟的异步数据库"""
# 模拟异步执行SQL查询的方法
async def execute_query(self, query, name, delay=1):
# 输出开始执行的提示信息
"""模拟数据库查询"""
print(f"{name}: 开始执行查询: {query}")
# 异步睡眠以模拟耗时操作
await asyncio.sleep(delay) # 模拟数据库延迟
# 输出查询完成信息
print(f"{name}: 查询完成")
# 返回模拟的查询结果
return f"{name}_result"
# 模拟并发执行多条SQL查询的方法
async def concurrent_queries(self):
# 并发查询演示说明
"""并发查询演示"""
# 定义多个模拟查询
queries = [
("SELECT * FROM users", "用户查询", 0.5),
("SELECT COUNT(*) FROM orders", "订单统计", 0.3),
("UPDATE products SET stock = stock - 1", "库存更新", 0.2),
("INSERT INTO logs (message) VALUES ('test')", "日志插入", 0.4)
]
# 存放任务的列表
tasks = []
# 遍历所有查询,准备每个任务
for query, name, delay in queries:
task = self.execute_query(query, name, delay)
tasks.append(task)
# 并发执行所有任务,等待所有结果
results = await asyncio.gather(*tasks)
# 返回所有结果
return results
# 定义演示数据库操作的主异步函数
async def database():
# 数据库操作演示说明
"""数据库操作演示"""
# 创建模拟数据库对象
db = MockAsyncDatabase()
# 调用并发查询方法并获取结果
results = await db.concurrent_queries()
# 打印所有查询结果
print(f"所有查询结果: {results}")
# 执行主异步函数
asyncio.run(database())
8.错误处理和调试
8.1.异步异常处理
在异步编程中,异常处理和同步代码类似,也可以使用 try/except 语句来捕获异常。例如:
import asyncio
async def some_async_func():
# 示例异步函数,实现你自己的逻辑
await asyncio.sleep(1)
return "执行成功"
async def main():
try:
result = await some_async_func()
print(result)
except Exception as e:
print("发生异常:", e)
if __name__ == "__main__":
asyncio.run(main())
此方法适合单个协程的错误捕获。
8.1.1.asyncio.gather 的异常机制
asyncio.gather 并发执行多个协程。当其中任何一个协程抛出异常时,默认会终止其他协程并传播第一个异常。
但如果将 return_exceptions=True,则所有异常会被包装为异常对象返回,不会中断其他任务。示例:
import asyncio
async def task1():
await asyncio.sleep(1)
return '任务1完成'
async def task2():
await asyncio.sleep(1)
raise Exception('任务2出错')
async def main():
results = await asyncio.gather(task1(), task2(), return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print("任务异常:", result)
else:
print("任务成功:", result)
if __name__ == "__main__":
asyncio.run(main())
8.1.2.asyncio.wait 的异常捕获
asyncio.wait 执行的一组任务可以分为已完成 (done) 和未完成 (pending)。可以遍历 done 中的任务,通过 task.exception() 查看每个任务是否有异常:
# 导入 asyncio 库,用于异步编程
import asyncio
# 定义一个异步任务函数,接受一个参数 n
async def example_task(n):
# 异步休眠 1 秒
await asyncio.sleep(1)
# 如果 n 是偶数,返回任务完成信息
if n % 2 == 0:
return f"Task {n} completed"
# 否则抛出异常
else:
raise Exception(f"Task {n} error")
# 定义主异步函数
async def main():
# 创建 5 个异步任务,分别传入 0 到 4
tasks = [asyncio.create_task(example_task(i)) for i in range(5)]
# 等待所有任务完成,done 是已完成任务集合,pending 是未完成任务集合
done, pending = await asyncio.wait(tasks)
# 遍历所有已完成的任务
for task in done:
# 如果任务有异常
if task.exception():
# 打印捕获到的异常信息
print("捕获到异常:", task.exception())
# 否则打印任务的返回结果
else:
print("任务结果:", task.result())
# 程序入口,判断是否为主模块运行
if __name__ == '__main__':
# 运行主异步函数
asyncio.run(main())
8.1.3.小结
- 单个协程用
try/except。 - 并发推荐
gather(..., return_exceptions=True)。 - 也可用
wait配合检查每个任务的异常。 这些方法可以提高异步程序的健壮性和可调试性。
8.2.异步调试
# 导入异步IO模块
import asyncio
# 导入日志模块
import logging
# 定义异步调试函数
async def async_debugging():
"""异步调试演示"""
# 配置日志调试参数
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 定义一个异步任务,接收任务名和延迟参数
async def debug_task(name, delay):
# 日志记录任务开始
logging.debug(f"任务 {name} 开始")
# 异步休眠指定秒数
await asyncio.sleep(delay)
# 日志记录任务结束
logging.debug(f"任务 {name} 结束")
# 返回任务结果字符串
return f"{name}_result"
# 创建第一个异步任务,并设置任务名称
task1 = asyncio.create_task(debug_task("调试任务1", 1), name="debug_task_1")
# 创建第二个异步任务,并设置任务名称
task2 = asyncio.create_task(debug_task("调试任务2", 0.5), name="debug_task_2")
# 打印当前所有运行的任务信息
print("当前运行的任务:")
# 遍历所有当前的异步任务
for task in asyncio.all_tasks():
# 打印每个任务的名称和协程对象
print(f" - {task.get_name()}: {task.get_coro()}") # 修正:get_name()
# 等待所有任务完成,并收集返回结果
results = await asyncio.gather(task1, task2)
# 打印任务结果
print(f"任务结果: {results}")
# 导入os模块,用于设置环境变量
import os
# 启用异步调试模式
os.environ['PYTHONASYNCIODEBUG'] = '1'
# 运行异步调试函数
asyncio.run(async_debugging())
9.性能优化和最佳实践
9.1.异步性能测试
# 导入异步相关库
import asyncio
# 导入时间库用于计时
import time
# 导入requests库用于同步HTTP请求
import requests
# 导入aiohttp库用于异步HTTP请求
import aiohttp
# 定义性能比较器类
class PerformanceComparator:
# 类的说明文档
"""性能比较器"""
# 定义静态方法用于异步HTTP请求
@staticmethod
async def async_http_requests(urls):
# 方法的说明文档
"""异步HTTP请求"""
# 创建异步HTTP会话
async with aiohttp.ClientSession() as session:
# 初始化任务列表
tasks = []
# 遍历每一个url
for url in urls:
# 创建GET请求任务
task = session.get(url)
# 添加任务到任务列表
tasks.append(task)
# 并发执行所有任务并获取响应
responses = await asyncio.gather(*tasks)
# 返回所有响应的状态码
return [resp.status for resp in responses]
# 定义静态方法用于同步HTTP请求
@staticmethod
def sync_http_requests(urls):
# 方法的说明文档
"""同步HTTP请求"""
# 初始化结果列表
results = []
# 遍历每一个url
for url in urls:
# 发送GET请求
response = requests.get(url)
# 添加响应状态码到结果列表
results.append(response.status_code)
# 返回所有结果
return results
# 定义异步方法用于比较性能
async def compare_performance(self, urls):
# 方法的说明文档
"""比较性能"""
# 打印待测试的URL数量
print(f"测试 {len(urls)} 个URL")
# --- 同步部分 ---
# 打印同步测试标题
print("=== 同步版本 ===")
# 记录开始时间
start = time.time()
# 进行同步HTTP请求
sync_results = self.sync_http_requests(urls)
# 计算同步请求耗时
sync_time = time.time() - start
# 打印同步请求耗时
print(f"同步耗时: {sync_time:.2f}秒")
# --- 异步部分 ---
# 打印异步测试标题
print("\n=== 异步版本 ===")
# 记录开始时间
start = time.time()
# 进行异步HTTP请求
async_results = await self.async_http_requests(urls)
# 计算异步请求耗时
async_time = time.time() - start
# 打印异步请求耗时
print(f"异步耗时: {async_time:.2f}秒")
# 计算性能提升倍数
improvement = sync_time / async_time
# 打印性能提升结果
print(f"\n性能提升: {improvement:.2f}x")
# 返回同步和异步的结果
return sync_results, async_results
# 定义异步性能演示函数
async def performance():
# 方法文档说明
"""性能演示"""
# 使用httpbin.org作为测试URL,重复5次
urls = ["https://httpbin.org/delay/1"] * 5
# 创建性能比较器对象
comparator = PerformanceComparator()
# 调用性能比较方法并等待结果
await comparator.compare_performance(urls)
# 启动性能测试
asyncio.run(performance())
9.2.最佳实践
# 导入异步IO模块
import asyncio
# 定义异步最佳实践类
class AsyncBestPractices:
"""异步编程最佳实践"""
# 静态方法:正确的任务管理
@staticmethod
async def proper_task_management():
"""正确的任务管理"""
# 定义一个异步工人函数
async def worker(name):
# 异步睡眠1秒
await asyncio.sleep(1)
# 返回结果字符串
return f"{name}_done"
# 创建任务列表用于保存引用
tasks = []
# 循环创建3个任务
for i in range(3):
# 创建一个异步任务
task = asyncio.create_task(worker(f"worker_{i}"))
# 把任务添加到任务列表
tasks.append(task)
# 等待所有任务完成,并获取结果
results = await asyncio.gather(*tasks)
# 返回所有任务结果
return results
# 静态方法:避免阻塞调用
@staticmethod
async def avoid_blocking_calls():
"""避免阻塞调用"""
# 导入time模块,用于演示阻塞
import time
# 定义一个好的异步IO操作
async def good_io_operation():
# 使用异步sleep
await asyncio.sleep(0.1)
# 定义一个坏的IO操作(阻塞)
async def bad_io_operation():
# 使用同步sleep,会阻塞事件循环
time.sleep(0.1) # 这会阻塞事件循环!
# 调用好的异步IO操作
await good_io_operation()
# 静态方法:使用超时
@staticmethod
async def use_timeouts():
"""使用超时"""
# 定义一个慢操作的异步函数
async def slow_operation():
# 模拟慢操作,睡眠5秒
await asyncio.sleep(5)
# 返回“完成”
return "完成"
try:
# 使用asyncio.wait_for设置超时为2秒
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
# 如果未超时,返回结果
return result
except asyncio.TimeoutError:
# 捕获超时异常,返回“操作超时”
return "操作超时"
# 静态方法:资源清理
@staticmethod
async def resource_cleanup():
"""资源清理"""
# 定义一个资源类
class Resource:
# 定义异步清理方法
async def cleanup(self):
# 清理前异步等待0.1秒
await asyncio.sleep(0.1)
# 打印资源已清理
print("资源已清理")
# 实例化资源对象
resource = Resource()
try:
# 使用资源(假装占用0.5秒)
await asyncio.sleep(0.5)
finally:
# 最终确保资源被清理
await resource.cleanup()
# 定义主函数,演示最佳实践
async def best_practices():
"""最佳实践演示"""
# 创建最佳实践对象
practices = AsyncBestPractices()
# 打印任务管理演示
print("=== 任务管理 ===")
# 调用并等待任务管理方法
results = await practices.proper_task_management()
# 打印任务结果
print(f"任务结果: {results}")
# 打印超时控制演示
print("\n=== 超时控制 ===")
# 调用并等待超时方法
timeout_result = await practices.use_timeouts()
# 打印超时结果
print(f"超时结果: {timeout_result}")
# 打印资源清理演示
print("\n=== 资源清理 ===")
# 调用并等待资源清理方法
await practices.resource_cleanup()
# 运行主异步函数
asyncio.run(best_practices())
10.实际项目示例
10.1.异步Web爬虫
# 导入asyncio模块,用于异步编程
import asyncio
# 导入aiohttp模块,用于异步HTTP请求
import aiohttp
# 导入urljoin和urlparse方法,用于处理URL
from urllib.parse import urljoin, urlparse
# 导入time模块,用于计时
import time
# 定义异步Web爬虫类
class AsyncWebCrawler:
# 类文档字符串,说明用途
"""异步Web爬虫"""
# 初始化方法,设置最大并发数和延迟时间
def __init__(self, max_concurrent=10, delay=1):
# 最大并发任务数
self.max_concurrent = max_concurrent
# 每次请求的延迟(秒)
self.delay = delay
# 已访问的URL集合
self.visited = set()
# 创建异步信号量控制并发数
self.semaphore = asyncio.Semaphore(max_concurrent)
# HTTP会话对象
self.session = None
# 爬虫的主入口,异步方法
async def crawl(self, start_url, max_pages=20):
# 文档字符串:开始爬取
"""开始爬取"""
# 创建aiohttp的会话
self.session = aiohttp.ClientSession()
# 创建异步队列
queue = asyncio.Queue()
# 将开始URL放入队列
await queue.put(start_url)
# 创建多个worker协程任务
workers = [
asyncio.create_task(self.worker(f"worker-{i}", queue, max_pages))
for i in range(self.max_concurrent)
]
try:
# 等待队列消费完毕
await queue.join()
finally:
# 取消所有worker任务
for worker in workers:
worker.cancel()
# 关闭HTTP会话
await self.session.close()
# 输出总共访问页面数量
print(f"爬取完成! 总共访问了 {len(self.visited)} 个页面")
# 返回访问过的页面列表
return list(self.visited)
# worker工作线程,异步方法
async def worker(self, name, queue, max_pages):
# 文档字符串:爬虫工作线程
"""爬虫工作线程"""
# 当未达到最大页面数时循环工作
while len(self.visited) < max_pages:
try:
# 尝试从队列获取新的URL,设置超时时间为5秒
url = await asyncio.wait_for(queue.get(), timeout=5.0)
# 如果URL已经访问过,则标记完成后继续下一个
if url in self.visited:
queue.task_done()
continue
# 使用信号量限制并发请求
async with self.semaphore:
# 处理单个页面
await self.process_page(url, queue)
# 将当前URL添加到已访问集合
self.visited.add(url)
# 通知队列该任务完成
queue.task_done()
# 延迟,避免过快请求
await asyncio.sleep(self.delay)
# 如果队列空闲超过超时时间,退出
except asyncio.TimeoutError:
break
# 其他异常输出错误信息并继续
except Exception as e:
print(f"{name} 处理错误: {e}")
queue.task_done()
# 处理单个页面的方法
async def process_page(self, url, queue):
# 文档字符串:处理单个页面
"""处理单个页面"""
try:
# 发送HTTP GET请求,设置超时为10秒
async with self.session.get(url, timeout=10) as response:
# 如果响应状态为200
if response.status == 200:
# 获取页面内容
content = await response.text()
# 输出爬取成功消息和内容长度
print(f"成功爬取: {url} (长度: {len(content)})")
# 可以在这里解析页面并提取更多链接
# 简化版:只做演示用途
# 只在已访问数量小于10时添加新链接
if len(self.visited) < 10: # 限制链接数量
# 提取页面中的链接
new_links = self.extract_links(content, url)
# 遍历新链接,将未访问过的加入队列
for link in new_links:
if link not in self.visited:
await queue.put(link)
# 捕获并输出请求异常
except Exception as e:
print(f"爬取失败 {url}: {e}")
# 简单的链接提取方法
def extract_links(self, content, base_url):
# 文档字符串:提取链接(简化版)
"""提取链接(简化版)"""
# 实际项目中通常使用BeautifulSoup等解析HTML
# 这里仅作示例,返回模拟链接
return [
f"{base_url}/page1",
f"{base_url}/page2",
f"{base_url}/page3"
]
# 定义异步函数,用于演示Web爬虫使用
async def web_crawler():
# 文档字符串:Web爬虫演示
"""Web爬虫演示"""
# 创建爬虫对象,设置最大并发数和延迟
crawler = AsyncWebCrawler(max_concurrent=5, delay=0.5)
# 记录开始时间
start_time = time.time()
# 执行爬虫,爬取最多10个页面
results = await crawler.crawl("https://httpbin.org/html", max_pages=10)
# 记录结束时间
end_time = time.time()
# 输出爬虫运行耗时
print(f"爬虫耗时: {end_time - start_time:.2f}秒")
# 输出访问的页面列表
print(f"访问的页面: {results}")
# 运行Web爬虫演示异步函数
asyncio.run(web_crawler())
11.总结
11.1.核心概念
- async/await: 定义和调用协程
- 事件循环: 管理和调度异步任务
- 协程: 异步函数,使用async def定义
- 任务: 对协程的封装,用于并发执行
- Future: 代表异步操作的最终结果
11.2.关键函数
asyncio.run(): 运行异步程序asyncio.create_task(): 创建任务asyncio.gather(): 并发执行多个协程asyncio.sleep(): 异步等待asyncio.wait(): 等待多个任务完成
11.3.最佳实践
- 避免在异步代码中使用阻塞操作
- 使用异步上下文管理器管理资源
- 合理控制并发数量
- 正确处理异常和超时
- 使用适当的调试工具

