Python 异步编程完全指南:从入门到实战
Python 异步编程完全指南:从入门到实战
异步编程是现代 Python 开发的必备技能,尤其在处理 I/O 密集型任务时,能显著提升程序性能。
一、为什么需要异步编程?
1.1 同步编程的痛点
import time
def fetch_data(url):
"""模拟网络请求"""
time.sleep(2) # 假设请求需要2秒
return f"Data from {url}"
# 同步执行:总共需要 6 秒
start = time.time()
result1 = fetch_data("api/users")
result2 = fetch_data("api/posts")
result3 = fetch_data("api/comments")
print(f"同步耗时: {time.time() - start:.2f}秒") # ~6秒
1.2 异步编程的优势
import asyncio
import time
async def fetch_data(url):
"""异步模拟网络请求"""
await asyncio.sleep(2) # 异步等待
return f"Data from {url}"
async def main():
start = time.time()
# 并发执行:总共只需约 2 秒
results = await asyncio.gather(
fetch_data("api/users"),
fetch_data("api/posts"),
fetch_data("api/comments")
)
print(f"异步耗时: {time.time() - start:.2f}秒") # ~2秒
asyncio.run(main())
二、核心概念解析
2.1 协程 (Coroutine)
协程是异步编程的基本单位,使用 async def 定义:
async def my_coroutine():
print("开始执行")
await asyncio.sleep(1) # 挂起点
print("继续执行")
return "完成"
关键点:
async def定义协程函数await暂停协程执行,等待异步操作完成- 协程不会自动执行,需要被调度
2.2 事件循环 (Event Loop)
事件循环是异步编程的核心,负责调度和执行协程:
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 方式1:Python 3.7+
asyncio.run(say_hello())
# 方式2:手动获取事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(say_hello())
loop.close()
2.3 Task 和 Future
import asyncio
async def fetch_data(delay):
await asyncio.sleep(delay)
return f"数据(延迟{delay}秒)"
async def main():
# 创建 Task
task1 = asyncio.create_task(fetch_data(2))
task2 = asyncio.create_task(fetch_data(1))
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(results)
asyncio.run(main())
三、实战案例
3.1 异步 HTTP 请求
import asyncio
import aiohttp
async def fetch_url(session, url):
"""异步获取 URL 内容"""
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers"
]
async with aiohttp.ClientSession() as session:
# 并发请求所有 URL
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, result in zip(urls, results):
print(f"{url}: {len(result)} 字节")
asyncio.run(main())
3.2 异步数据库操作
import asyncio
import aiosqlite
async def init_db():
"""初始化数据库"""
async with aiosqlite.connect("example.db") as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
""")
await db.commit()
async def insert_user(db, name, email):
"""异步插入用户"""
await db.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
(name, email)
)
async def main():
async with aiosqlite.connect("example.db") as db:
# 并发插入多个用户
users = [
("张三", "zhangsan@example.com"),
("李四", "lisi@example.com"),
("王五", "wangwu@example.com")
]
tasks = [insert_user(db, name, email) for name, email in users]
await asyncio.gather(*tasks)
await db.commit()
# 查询验证
async with db.execute("SELECT * FROM users") as cursor:
async for row in cursor:
print(row)
asyncio.run(init_db())
asyncio.run(main())
3.3 异步生产者-消费者模式
import asyncio
import random
async def producer(queue, name, count):
"""生产者:生成数据"""
for i in range(count):
item = f"{name}-item-{i}"
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(item)
print(f"[生产者 {name}] 生产: {item}")
async def consumer(queue, name):
"""消费者:处理数据"""
while True:
item = await queue.get()
if item is None: # 毒丸模式,用于停止消费者
break
await asyncio.sleep(random.uniform(0.2, 0.8))
print(f"[消费者 {name}] 处理: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# 创建生产者和消费者
producers = [
asyncio.create_task(producer(queue, "P1", 5)),
asyncio.create_task(producer(queue, "P2", 5))
]
consumers = [
asyncio.create_task(consumer(queue, "C1")),
asyncio.create_task(consumer(queue, "C2"))
]
# 等待生产者完成
await asyncio.gather(*producers)
# 发送停止信号
for _ in consumers:
await queue.put(None)
# 等待消费者完成
await asyncio.gather(*consumers)
asyncio.run(main())
3.4 异步 Web 爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncCrawler:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = []
async def fetch_page(self, session, url):
"""获取页面内容"""
async with self.semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return await resp.text()
except Exception as e:
print(f"请求失败 {url}: {e}")
return None
async def parse_page(self, html, url):
"""解析页面"""
if html is None:
return None
soup = BeautifulSoup(html, 'html.parser')
return {
"url": url,
"title": soup.title.string if soup.title else "无标题",
"links": len(soup.find_all('a'))
}
async def crawl(self, urls):
"""爬取多个页面"""
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
tasks.append(self._process_url(session, url))
self.results = await asyncio.gather(*tasks)
return [r for r in self.results if r is not None]
async def _process_url(self, session, url):
html = await self.fetch_page(session, url)
return await self.parse_page(html, url)
# 使用示例
async def main():
crawler = AsyncCrawler(max_concurrent=3)
urls = [
"https://example.com",
"https://httpbin.org",
"https://python.org"
]
results = await crawler.crawl(urls)
for result in results:
print(f"URL: {result['url']}")
print(f"标题: {result['title']}")
print(f"链接数: {result['links']}")
print("---")
asyncio.run(main())
四、最佳实践
4.1 正确使用 asyncio.gather vs asyncio.wait
# gather: 按顺序返回结果
results = await asyncio.gather(task1, task2, task3)
# wait: 返回完成和未完成的任务集合
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED # 第一个完成就返回
)
4.2 异常处理
import asyncio
async def risky_operation():
await asyncio.sleep(1)
raise ValueError("模拟错误")
async def main():
# 方式1:gather 中捕获异常
try:
await asyncio.gather(risky_operation())
except ValueError as e:
print(f"捕获异常: {e}")
# 方式2:使用 return_exceptions=True
results = await asyncio.gather(
risky_operation(),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"任务失败: {result}")
asyncio.run(main())
4.3 超时控制
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 方式1:asyncio.wait_for
result = await asyncio.wait_for(slow_operation(), timeout=3)
except asyncio.TimeoutError:
print("操作超时")
# 方式2:asyncio.timeout (Python 3.11+)
try:
async with asyncio.timeout(3):
result = await slow_operation()
except TimeoutError:
print("操作超时")
asyncio.run(main())
4.4 避免常见陷阱
# ❌ 错误:在协程中使用阻塞调用
async def bad_example():
import time
time.sleep(5) # 会阻塞整个事件循环!
return "done"
# ✅ 正确:使用异步版本
async def good_example():
await asyncio.sleep(5) # 正确的异步等待
return "done"
# ✅ 如果必须使用阻塞调用,使用 run_in_executor
async def better_example():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # 使用默认执行器
blocking_function,
arg1, arg2
)
return result
4.5 资源管理
import asyncio
class AsyncResourceManager:
def __init__(self):
self.resources = []
async def __aenter__(self):
print("获取资源")
self.resource = await self._acquire_resource()
return self.resource
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await self._release_resource(self.resource)
async def _acquire_resource(self):
await asyncio.sleep(0.1) # 模拟获取资源
return "resource"
async def _release_resource(self, resource):
await asyncio.sleep(0.1) # 模拟释放资源
# 使用异步上下文管理器
async def main():
async with AsyncResourceManager() as resource:
print(f"使用资源: {resource}")
asyncio.run(main())
五、性能对比
| 场景 | 同步耗时 | 异步耗时 | 提升倍数 |
|---|---|---|---|
| 10个HTTP请求 | ~10秒 | ~1秒 | 10x |
| 100个数据库查询 | ~20秒 | ~2秒 | 10x |
| 文件批量处理 | ~15秒 | ~3秒 | 5x |
六、总结
异步编程的核心要点:
- 使用
async def定义协程 - 使用
await等待异步操作 - 使用
asyncio.gather并发执行多个任务 - 避免在协程中使用阻塞调用
- 合理使用信号量控制并发数
- 正确处理异常和超时
异步编程不是万能的,适合 I/O 密集型场景(网络请求、文件操作、数据库查询)。对于 CPU 密集型任务,考虑使用多进程。
相关资源:
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 枫月Blog
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果