Python 异步编程完全指南:从入门到实战

异步编程是现代 Python 开发的必备技能,尤其在处理 I/O 密集型任务时,能显著提升程序性能。


一、为什么需要异步编程?

1.1 同步编程的痛点

import time

def fetch_data(url, delay=2):
    """Simulate a blocking network request.

    Args:
        url: Label of the resource being "requested"; it is only echoed back.
        delay: Seconds to block before returning. Defaults to 2, the request
            latency this example assumes.

    Returns:
        The string ``"Data from {url}"``.
    """
    time.sleep(delay)  # blocks the calling thread for the whole delay
    return f"Data from {url}"

# Synchronous execution: the three requests run one after another, so the
# total is about 6 seconds (3 requests x 2 seconds each).
start = time.time()
result1, result2, result3 = [
    fetch_data(endpoint)
    for endpoint in ("api/users", "api/posts", "api/comments")
]
print(f"同步耗时: {time.time() - start:.2f}秒")  # ~6 s

1.2 异步编程的优势

import asyncio
import time

async def fetch_data(url, delay=2):
    """Simulate a network request without blocking the event loop.

    Args:
        url: Label of the resource being "requested"; it is only echoed back.
        delay: Seconds to wait before returning. Defaults to 2, the request
            latency this example assumes.

    Returns:
        The string ``"Data from {url}"``.
    """
    # Awaiting asyncio.sleep() suspends only this coroutine; other
    # coroutines keep running on the event loop in the meantime.
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    """Run three simulated requests concurrently and print the elapsed time.

    The three ``fetch_data`` coroutines are awaited together through
    ``asyncio.gather``, so their waits overlap and the total is roughly the
    duration of a single request (~2 seconds) instead of the sum.
    """
    started_at = time.time()
    endpoints = ("api/users", "api/posts", "api/comments")
    # Concurrent execution: gather() awaits all three coroutines together.
    results = await asyncio.gather(
        *(fetch_data(endpoint) for endpoint in endpoints)
    )
    elapsed = time.time() - started_at
    print(f"异步耗时: {elapsed:.2f}秒")  # ~2 s

asyncio.run(main())

二、核心概念解析

2.1 协程 (Coroutine)

协程是异步编程的基本单位,使用 async def 定义:

async def my_coroutine():
    """Print a start message, pause for one second, print again and finish.

    Returns:
        The string ``"完成"``.
    """
    print("开始执行")
    # Suspension point: control goes back to the event loop for one second.
    await asyncio.sleep(1)
    print("继续执行")
    outcome = "完成"
    return outcome

关键点:

  • async def 定义协程函数
  • await 暂停协程执行,等待异步操作完成
  • 调用协程函数只会返回一个协程对象,函数体并不会立即执行;必须通过 await、asyncio.create_task() 或 asyncio.run() 交给事件循环调度后才会运行

2.2 事件循环 (Event Loop)

事件循环是异步编程的核心,负责调度和执行协程:

import asyncio

async def say_hello():
    """Print "Hello", wait one second without blocking, then print "World"."""
    greeting, subject = "Hello", "World"
    print(greeting)
    await asyncio.sleep(1)
    print(subject)

# Option 1 (Python 3.7+): asyncio.run() creates a fresh event loop, runs the
# coroutine to completion and closes the loop afterwards.
asyncio.run(say_hello())

# Option 2: manage the event loop by hand.
# The loop is created explicitly with new_event_loop(): once asyncio.run()
# has returned there is no current event loop left in this thread, so
# asyncio.get_event_loop() would raise RuntimeError at this point, and
# calling it without a running loop is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(say_hello())
finally:
    loop.close()  # release the loop even if the coroutine raised

2.3 Task 和 Future

import asyncio

async def fetch_data(delay):
    """Wait ``delay`` seconds without blocking and return a labelled string.

    Args:
        delay: Number of seconds to sleep; it is also embedded in the result.

    Returns:
        A string that mentions the delay that was applied.
    """
    await asyncio.sleep(delay)
    message = f"数据(延迟{delay}秒)"
    return message

async def main():
    """Schedule two fetches as Tasks, wait for both and print their results."""
    # create_task() wraps each coroutine in a Task scheduled on the running loop.
    slow_fetch = asyncio.create_task(fetch_data(2))
    fast_fetch = asyncio.create_task(fetch_data(1))

    # Wait until both tasks have finished, then show the collected results.
    gathered = await asyncio.gather(slow_fetch, fast_fetch)
    print(gathered)

asyncio.run(main())

三、实战案例

3.1 异步 HTTP 请求

import asyncio
import aiohttp

async def fetch_url(session, url):
    """Request ``url`` through ``session`` and return the response body as text.

    Args:
        session: Object whose ``get(url)`` yields an async context manager
            for the response (an ``aiohttp.ClientSession`` in this example).
        url: Address to request.

    Returns:
        The value of ``await response.text()``.
    """
    request = session.get(url)
    async with request as response:
        body = await response.text()
        return body

async def main():
    """Fetch three httpbin endpoints concurrently and print each body's length.

    All requests share one ``aiohttp.ClientSession`` and are awaited together
    with ``asyncio.gather``; results come back in the same order as ``urls``.
    """
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/headers",
    ]

    async with aiohttp.ClientSession() as session:
        # Issue every request through the shared session and await them together.
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for url, result in zip(urls, results):
            # fetch_url() returns decoded text, so len() counts characters,
            # not bytes; the label says so.
            print(f"{url}: {len(result)} 字符")

asyncio.run(main())

3.2 异步数据库操作

import asyncio
import aiosqlite

async def init_db():
    """Create the ``users`` table in ``example.db`` if it does not exist yet."""
    create_users_table = """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT
            )
        """
    async with aiosqlite.connect("example.db") as db:
        await db.execute(create_users_table)
        await db.commit()

async def insert_user(db, name, email):
    """Insert one row into ``users`` through ``db``.

    The values are bound as SQL parameters. This function does not commit;
    that is left to the caller.
    """
    statement = "INSERT INTO users (name, email) VALUES (?, ?)"
    parameters = (name, email)
    await db.execute(statement, parameters)

async def main():
    """Insert three sample users, commit, then print every row of ``users``."""
    users = [
        ("张三", "zhangsan@example.com"),
        ("李四", "lisi@example.com"),
        ("王五", "wangwu@example.com"),
    ]

    async with aiosqlite.connect("example.db") as db:
        # Issue all inserts through the same connection and await them together.
        await asyncio.gather(
            *(insert_user(db, name, email) for name, email in users)
        )
        await db.commit()

        # Read the table back to verify the inserts.
        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(row)

asyncio.run(init_db())
asyncio.run(main())

3.3 异步生产者-消费者模式

import asyncio
import random

async def producer(queue, name, count):
    """Put ``count`` items named ``{name}-item-{i}`` onto ``queue``.

    Before each put the producer sleeps for a random 0.1-0.5 seconds, and it
    prints a line after each item has been queued.
    """
    pending_items = (f"{name}-item-{index}" for index in range(count))
    for item in pending_items:
        pause = random.uniform(0.1, 0.5)
        await asyncio.sleep(pause)
        await queue.put(item)
        print(f"[生产者 {name}] 生产: {item}")

async def consumer(queue, name):
    """Take items from ``queue`` and process them until a ``None`` sentinel arrives.

    Args:
        queue: ``asyncio.Queue`` to read from.
        name: Label used in the printed progress lines.

    Every ``get()`` is matched by exactly one ``task_done()``, including the
    ``None`` sentinel and items whose processing raises, so ``queue.join()``
    can be used to wait for the queue to drain without hanging.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:  # poison pill: tells this consumer to stop
                break
            # Random pause (0.2-0.8 s) standing in for real processing work.
            await asyncio.sleep(random.uniform(0.2, 0.8))
            print(f"[消费者 {name}] 处理: {item}")
        finally:
            # Runs on normal completion, on `break` and on errors alike.
            queue.task_done()

async def main():
    """Run two producers and two consumers over one bounded queue.

    The producers are awaited first; afterwards one ``None`` sentinel per
    consumer is queued so that every consumer stops, and the consumers are
    awaited last.
    """
    queue = asyncio.Queue(maxsize=10)

    # Start the producers, then the consumers, as Tasks on the running loop.
    producers = [
        asyncio.create_task(producer(queue, label, 5))
        for label in ("P1", "P2")
    ]
    consumers = [
        asyncio.create_task(consumer(queue, label))
        for label in ("C1", "C2")
    ]

    # Wait until every producer has queued all of its items.
    await asyncio.gather(*producers)

    # Send the stop signal: one sentinel for each consumer.
    for _ in range(len(consumers)):
        await queue.put(None)

    # Wait until every consumer has seen its sentinel and returned.
    await asyncio.gather(*consumers)

asyncio.run(main())

3.4 异步 Web 爬虫

import asyncio
import aiohttp
from bs4 import BeautifulSoup

class AsyncCrawler:
    """Fetch and summarise web pages concurrently with a cap on parallel requests.

    Attributes:
        semaphore: ``asyncio.Semaphore`` that limits how many requests are in
            flight at the same time.
        results: Raw per-URL results of the most recent ``crawl`` call,
            including ``None`` entries for pages that could not be fetched.
    """

    def __init__(self, max_concurrent=5):
        """Create a crawler that allows at most ``max_concurrent`` requests at once."""
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def fetch_page(self, session, url):
        """Return the body of ``url`` as text, or ``None`` if the request failed.

        The request is made while holding the semaphore and uses a total
        timeout of 10 seconds.
        """
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    return await resp.text()
            except Exception as e:
                # Deliberately broad: one failing page is reported and
                # skipped so that it cannot abort the whole crawl.
                print(f"请求失败 {url}: {e}")
                return None

    async def parse_page(self, html, url):
        """Summarise one page as a dict with its url, title and link count.

        Returns ``None`` when ``html`` is ``None`` (the fetch failed).
        """
        if html is None:
            return None

        soup = BeautifulSoup(html, 'html.parser')
        # `.string` is None when <title> exists but is empty or contains
        # nested tags, so the placeholder must cover that case as well as a
        # missing <title> element.
        title = soup.title.string if soup.title else None
        return {
            "url": url,
            "title": title if title else "无标题",
            "links": len(soup.find_all('a')),
        }

    async def crawl(self, urls):
        """Fetch and parse every URL in ``urls`` concurrently.

        Returns:
            The page summaries for the URLs that succeeded, in input order.
            The unfiltered list (with ``None`` for failures) is kept in
            ``self.results``.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self._process_url(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
            return [r for r in self.results if r is not None]

    async def _process_url(self, session, url):
        """Fetch one URL and hand the HTML (or ``None``) to ``parse_page``."""
        html = await self.fetch_page(session, url)
        return await self.parse_page(html, url)

# Usage example
async def main():
    """Crawl three sample sites, at most three at a time, and print a summary of each."""
    urls = [
        "https://example.com",
        "https://httpbin.org",
        "https://python.org",
    ]
    crawler = AsyncCrawler(max_concurrent=3)

    for page in await crawler.crawl(urls):
        print(f"URL: {page['url']}")
        print(f"标题: {page['title']}")
        print(f"链接数: {page['links']}")
        print("---")

asyncio.run(main())

四、最佳实践

4.1 正确使用 asyncio.gather vs asyncio.wait

# gather: returns the results in order
# NOTE(review): task1..task3 are not defined in this fragment — presumably
# awaitables/Tasks created earlier; confirm. The bare `await` also means the
# fragment has to live inside a coroutine.
results = await asyncio.gather(task1, task2, task3)

# wait: returns two collections, the finished tasks and the unfinished ones
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED  # return as soon as the first one finishes
)

4.2 异常处理

import asyncio

async def risky_operation():
    """Wait one second, then fail with ``ValueError``.

    Raises:
        ValueError: Always, after the one-second pause.
    """
    await asyncio.sleep(1)
    failure = ValueError("模拟错误")
    raise failure

async def main():
    """Show two ways of dealing with an exception raised inside ``gather``."""
    # Approach 1: let gather() raise and catch the exception at the call site.
    try:
        await asyncio.gather(risky_operation())
    except ValueError as e:
        print(f"捕获异常: {e}")

    # Approach 2: return_exceptions=True hands exceptions back as results.
    outcomes = await asyncio.gather(risky_operation(), return_exceptions=True)
    failures = [outcome for outcome in outcomes if isinstance(outcome, Exception)]
    for failure in failures:
        print(f"任务失败: {failure}")

asyncio.run(main())

4.3 超时控制

import asyncio

async def slow_operation():
    """Wait ten seconds without blocking, then return ``"完成"``."""
    duration = 10
    await asyncio.sleep(duration)
    outcome = "完成"
    return outcome

async def main():
    """Show two ways of limiting ``slow_operation`` to three seconds."""
    time_limit = 3

    # Approach 1: asyncio.wait_for
    try:
        first_result = await asyncio.wait_for(slow_operation(), timeout=time_limit)
    except asyncio.TimeoutError:
        print("操作超时")

    # Approach 2: asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(time_limit):
            second_result = await slow_operation()
    except TimeoutError:
        print("操作超时")

asyncio.run(main())

4.4 避免常见陷阱

# ❌ Wrong: a blocking call inside a coroutine
async def bad_example():
    """Anti-pattern kept on purpose: sleeps with the blocking ``time.sleep``."""
    import time
    time.sleep(5)  # blocks the entire event loop!
    return "done"

# ✅ Right: use the asynchronous counterpart
async def good_example():
    """Wait five seconds with ``asyncio.sleep`` and return ``"done"``."""
    pause_seconds = 5
    await asyncio.sleep(pause_seconds)  # awaited, so the wait is asynchronous
    status = "done"
    return status

# ✅ When a blocking call is unavoidable, hand it to an executor
async def better_example():
    """Run ``blocking_function(arg1, arg2)`` in the default executor.

    ``blocking_function``, ``arg1`` and ``arg2`` are placeholders that are not
    defined in this snippet; substitute the real blocking callable and its
    arguments.

    Returns:
        Whatever the blocking call returns.
    """
    # A coroutine always executes inside a running loop, and
    # get_running_loop() returns exactly that loop; get_event_loop() is the
    # legacy API and is not needed here.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # None selects the default executor
        blocking_function,
        arg1, arg2,
    )
    return result

4.5 资源管理

import asyncio

class AsyncResourceManager:
    """Async context manager that acquires a resource on entry and releases it on exit.

    Acquisition and release are each simulated with a 0.1-second
    ``asyncio.sleep``.
    """

    def __init__(self):
        self.resources = []

    async def _acquire_resource(self):
        """Simulate acquiring a resource and return it."""
        await asyncio.sleep(0.1)
        return "resource"

    async def _release_resource(self, resource):
        """Simulate releasing ``resource``."""
        await asyncio.sleep(0.1)

    async def __aenter__(self):
        """Acquire the resource, remember it on the instance and return it."""
        print("获取资源")
        acquired = await self._acquire_resource()
        self.resource = acquired
        return acquired

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the resource stored by ``__aenter__``."""
        print("释放资源")
        held = self.resource
        await self._release_resource(held)

# Using the asynchronous context manager
async def main():
    """Enter ``AsyncResourceManager`` and print the resource it provides."""
    manager = AsyncResourceManager()
    async with manager as resource:
        print(f"使用资源: {resource}")

asyncio.run(main())

五、性能对比

(以下数字仅为示意,实际提升取决于单次 I/O 的延迟和并发数)

场景 | 同步耗时 | 异步耗时 | 提升倍数
10个HTTP请求 | ~10秒 | ~1秒 | 10x
100个数据库查询 | ~20秒 | ~2秒 | 10x
文件批量处理 | ~15秒 | ~3秒 | 5x

六、总结

异步编程的核心要点:

  1. 使用 async def 定义协程
  2. 使用 await 等待异步操作
  3. 使用 asyncio.gather 并发执行多个任务
  4. 避免在协程中使用阻塞调用
  5. 合理使用信号量控制并发数
  6. 正确处理异常和超时

异步编程不是万能的,它适合 I/O 密集型场景(网络请求、数据库查询,以及借助线程池或 aiofiles 等库完成的文件操作,因为 asyncio 本身不提供原生的异步文件 I/O)。对于 CPU 密集型任务,协程都运行在同一个线程里,无法带来并行加速,应考虑使用多进程。


相关资源: