🔒 为什么协程需要同步机制?
在异步编程中,虽然协程是单线程运行的,但当多个协程访问共享资源时,仍然可能产生竞争条件。这是因为协程的执行可能在任意await点挂起,切换到其他协程执行。
import asyncio
shared_counter = 0
async def increment_counter():
global shared_counter
# 读取当前值
current = shared_counter
# 模拟IO操作
await asyncio.sleep(0.01)
# 修改值
shared_counter = current + 1
async def main():
global shared_counter
# 创建100个协程同时增加计数器
tasks = [asyncio.create_task(increment_counter()) for _ in range(100)]
await asyncio.gather(*tasks)
print(f"Final counter value: {shared_counter}")
asyncio.run(main())
# 预期输出100,实际输出可能小于100
上述代码展示了在异步环境中共享资源的竞争条件问题。由于协程在await asyncio.sleep()
时会挂起,导致多个协程读取到相同的值,最终计数结果会小于预期值。
注意: 虽然asyncio是单线程的,但协程的挂起/恢复机制意味着共享状态的修改仍然需要同步控制,以避免数据不一致问题。
🔐 Lock(锁)的使用
Lock是最基本的同步原语,用于确保一次只有一个协程可以访问共享资源。
基本用法
import asyncio
shared_counter = 0
lock = asyncio.Lock()
async def safe_increment():
global shared_counter
async with lock: # 获取锁
current = shared_counter
await asyncio.sleep(0.01)
shared_counter = current + 1
# 离开async with块时自动释放锁
async def main():
tasks = [asyncio.create_task(safe_increment()) for _ in range(100)]
await asyncio.gather(*tasks)
print(f"Final counter value: {shared_counter}") # 总是输出100
asyncio.run(main())
手动获取/释放锁
async def manual_lock_example():
await lock.acquire()
try:
# 临界区代码
print("Accessing shared resource")
await asyncio.sleep(0.1)
finally:
lock.release()
重要: 总是使用try/finally确保锁被释放,避免死锁。使用async with语句是更安全、更简洁的方式。
🚦 Semaphore(信号量)的使用
信号量用于控制同时访问共享资源的协程数量,常用于限制并发连接数。
import asyncio
# 限制最多3个并发请求
sem = asyncio.Semaphore(3)
async def limited_task(task_id):
async with sem:
print(f"Task {task_id} started")
await asyncio.sleep(1) # 模拟IO操作
print(f"Task {task_id} finished")
async def main():
tasks = [asyncio.create_task(limited_task(i)) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
运行上述代码,你会发现最多只有3个任务同时运行,其他任务等待信号量释放。
应用场景: 限制数据库连接池大小、控制API请求频率、限制同时打开的文件数等。
🚩 Event(事件)的使用
事件用于协程之间的通知机制,一个协程等待事件发生,其他协程触发事件。
import asyncio
# 创建事件对象
event = asyncio.Event()
async def waiter():
print("Waiter: waiting for event")
await event.wait() # 等待事件被设置
print("Waiter: event triggered!")
async def setter():
print("Setter: sleeping for 2 seconds")
await asyncio.sleep(2)
print("Setter: setting event")
event.set() # 触发事件
async def main():
await asyncio.gather(waiter(), setter())
asyncio.run(main())
输出顺序:
- Waiter: waiting for event
- Setter: sleeping for 2 seconds
- Setter: setting event
- Waiter: event triggered!
提示: 事件被触发后,所有等待的协程会同时被唤醒。如果需要清除事件状态,可以使用event.clear()
。
🔄 Condition(条件变量)的使用
条件变量用于更复杂的协调场景,协程可以等待特定条件成立,并在条件改变时通知其他协程。
import asyncio
import random
queue = []
MAX_ITEMS = 5
cond = asyncio.Condition()
async def producer(id):
for _ in range(3):
await asyncio.sleep(random.uniform(0.1, 0.5))
async with cond:
# 如果队列满,等待
while len(queue) >= MAX_ITEMS:
print(f"Producer {id}: queue full, waiting")
await cond.wait()
item = f"item-{id}-{random.randint(1, 100)}"
queue.append(item)
print(f"Producer {id}: produced {item}")
# 通知消费者
cond.notify_all()
async def consumer(id):
for _ in range(4):
await asyncio.sleep(random.uniform(0.1, 0.7))
async with cond:
# 如果队列空,等待
while not queue:
print(f"Consumer {id}: queue empty, waiting")
await cond.wait()
item = queue.pop(0)
print(f"Consumer {id}: consumed {item}")
# 通知生产者
cond.notify_all()
async def main():
producers = [asyncio.create_task(producer(i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(i)) for i in range(3)]
await asyncio.gather(*producers, *consumers)
asyncio.run(main())
此代码展示了经典的生产者-消费者问题,条件变量确保:
- 队列满时生产者等待
- 队列空时消费者等待
- 添加/移除元素时通知等待的协程
📦 Queue(队列)的使用
asyncio队列是线程安全/协程安全的队列实现,特别适合生产者-消费者模式。
import asyncio
import random
async def producer(queue, id):
for i in range(5):
item = f"item-{id}-{i}"
await asyncio.sleep(random.uniform(0.1, 0.3))
await queue.put(item)
print(f"Producer {id}: produced {item}")
async def consumer(queue, id):
while True:
item = await queue.get()
await asyncio.sleep(random.uniform(0.1, 0.5))
print(f"Consumer {id}: consumed {item}")
queue.task_done() # 标记任务完成
async def main():
queue = asyncio.Queue(maxsize=3)
# 创建生产者和消费者
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# 等待生产者完成
await asyncio.gather(*producers)
# 等待队列中的所有任务被处理
await queue.join()
# 取消消费者任务
for c in consumers:
c.cancel()
asyncio.run(main())
asyncio.Queue提供以下方法:
put(item)
: 添加元素,队列满时阻塞get()
: 获取元素,队列空时阻塞task_done()
: 标记任务处理完成join()
: 阻塞直到所有任务完成
📊 同步原语对比
原语 | 用途 | 适用场景 | 注意事项 |
---|---|---|---|
Lock | 互斥访问共享资源 | 简单临界区保护 | 避免嵌套使用,防止死锁 |
Semaphore | 限制并发数量 | 资源池管理,限流 | 确保最终释放所有许可 |
Event | 一次性事件通知 | 启动/停止信号,初始化完成 | 触发后所有等待者被唤醒 |
Condition | 复杂条件等待 | 生产者-消费者,状态变化 | 配合Lock使用,避免虚假唤醒 |
Queue | 协程间安全数据传输 | 生产者-消费者,任务分发 | 注意队列大小和阻塞行为 |
✅ 最佳实践与常见陷阱
最佳实践
- 优先使用高级同步结构(如Queue)而非低级原语
- 使用
async with
管理锁资源,确保异常时也能释放 - 限制锁的持有时间,避免阻塞事件循环
- 使用
asyncio.gather(*, return_exceptions=True)
避免一个协程失败影响整体 - 为协程设置合理的超时时间
常见陷阱
死锁: 多个协程相互等待对方释放资源。避免嵌套获取多个锁,或按固定顺序获取锁。
饥饿: 某些协程长时间无法获取资源。使用公平队列或超时机制避免。
事件循环阻塞: 在协程中执行CPU密集型操作会阻塞整个事件循环。使用loop.run_in_executor()
将CPU密集型任务移到线程池执行。
🎯 总结
Python的asyncio提供了丰富的同步原语来协调协程:
- 使用Lock保护共享资源的互斥访问
- 使用Semaphore控制并发数量
- 使用Event进行简单的事件通知
- 使用Condition实现复杂的条件等待
- 使用Queue实现生产者-消费者模式
正确使用这些同步机制,可以构建高效、安全的异步应用程序。记住:尽管asyncio是单线程的,但共享状态的修改仍然需要同步控制!
发表评论