本文是 https://pymotw.com/3/asyncio/synchronization.html 的学习笔记,强烈推荐直接阅读原文。
虽然 asyncio 应用一般都是单线程程序,但是它们仍然是并发程序。 为了支持安全的并发,asyncio 实现了一些 threading 和 multiprocessing 模块中比较常用的类似的同步原语。
Lock¶
Lock 可以用来安全的访问共享资源。
# asyncio_lock.py import asyncio import functools def unlock(lock): print('callback releasing lock') lock.release() async def coro1(lock): print('coro1 wating for the lock') with await lock: print('coro1 acquired lock') print('coro1 released lock') async def coro2(lock): print('coro2 wating for the lock') await lock try: print('coro2 acquired lock') finally: print('coro2 released lock') lock.release() async def main(loop): lock = asyncio.Lock() print('acquiring the lock before starting coroutines') await lock.acquire() print('lock acquired: {}'.format(lock.locked())) loop.call_later(0.1, functools.partial(unlock, lock)) print('waiting for coroutines') await asyncio.wait([coro1(lock), coro2(lock)]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
可以使用 await 来获取锁,使用 release() 方法来释放锁。也可以用 with await 上下文语句来获取和释放锁。
$ python3.6 asyncio_lock.py acquiring the lock before starting coroutines lock acquired: True waiting for coroutines coro1 wating for the lock coro2 wating for the lock callback releasing lock coro1 acquired lock coro1 released lock coro2 acquired lock coro2 released lock
Event¶
asyncio.Event 类似 threading.Event 用来允许多个消费者等待某个事情发生,不用通过监听一个特殊的值的来说实现类似通知的功能。
# asyncio_event.py import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def coro1(event): print('coro1 waiting for event') await event.wait() print('coro1 triggered') async def coro2(event): print('coro2 waiting for event') await event.wait() print('coro2 triggered') async def main(loop): event = asyncio.Event() print('event start state: {}'.format(event.is_set())) loop.call_later( 0.1, functools.partial(set_event, event) ) await asyncio.wait([coro1(event), coro2(event)]) print('event end state: {}'.format(event.is_set())) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
$ python3.6 asyncio_event.py event start state: False coro1 waiting for event coro2 waiting for event setting event in callback coro1 triggered coro2 triggered event end state: True
Condition¶
Condition 的效果类似 Event,不同的是它不是唤醒所有等待中的 coroutine, 而是通过 notify() 唤醒指定数量的待唤醒 coroutine。
# asyncio_condition.py import asyncio async def consumer(condition, n): with await condition: print('consumer {} is waiting'.format(n)) await condition.wait() print('consumer {} triggered'.format(n)) print('ending consumer {}'.format(n)) async def manipulate_condition(condition): print('starting manipulate_condition') await asyncio.sleep(0.1) for i in range(1, 3): with await condition: print('notifying {} consumers'.format(i)) condition.notify(n=i) await asyncio.sleep(0.1) with await condition: print('notifying remaining consumers') condition.notify_all() print('ending manipulate_condition') async def main(loop): condition = asyncio.Condition() consumers = [ consumer(condition, i) for i in range(5) ] loop.create_task(manipulate_condition(condition)) await asyncio.wait(consumers) event_loop = asyncio.get_event_loop() try: result = event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
在这个例子中,我们启动了五个 Condition 的消费者,每个都使用 wait() 方法来等待它们可以继续处理的通知。manipulate_condition() 首先通知了一个消费者,然后有通知了两个消费者,最后通知剩下的所有消费者。
$ python3.6 asyncio_condition.py starting manipulate_condition consumer 0 is waiting consumer 4 is waiting consumer 1 is waiting consumer 2 is waiting consumer 3 is waiting notifying 1 consumers consumer 0 triggered ending consumer 0 notifying 2 consumers consumer 4 triggered ending consumer 4 consumer 1 triggered ending consumer 1 notifying remaining consumers ending manipulate_condition consumer 2 triggered ending consumer 2 consumer 3 triggered ending consumer 3
Queue¶
asyncio.Queue 为 coroutines 实现了一个先进先出的数据结构,类似多线程中的 queue.Queue ,多进程中的 multiprocessing.Queue
# asyncio_queue.py import asyncio async def consumer(n, q): print('consumer {}: waiting for item'.format(n)) while True: print('consumer {}: waiting for item'.format(n)) item = await q.get() print('consumer {}: has item {}'.format(n, item)) # 在这个程序中 None 是个特殊的值,表示终止信号 if item is None: q.task_done() break else: await asyncio.sleep(0.01 * item) q.task_done() print('consumer {}: ending'.format(n)) async def producer(q, num_workers): print('producer: starting') # 向队列中添加一些数据 for i in range(num_workers * 3): await q.put(i) print('producer: added task {} to the queue'.format(i)) # 通过 None 这个特殊值来通知消费者退出 print('producer: adding stop signals to the queue') for i in range(num_workers): await q.put(None) print('producer: waiting for queue to empty') await q.join() print('producer: ending') async def main(loop, num_consumers): # 创建指定大小的队列,这样的话生产者将会阻塞 # 直到有消费者获取数据 q = asyncio.Queue(maxsize=num_consumers) # 调度消费者 consumers = [ loop.create_task(consumer(i, q)) for i in range(num_consumers) ] # 调度生产者 prod = loop.create_task(producer(q, num_consumers)) # 等待所有 coroutines 都完成 await asyncio.wait(consumers + [prod]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop, 2)) finally: event_loop.close()
通过 put() 添加项或者通过 get() 移除项都是异步操作,同时有可能队列大小达到指定大小(阻塞添加操作)或者队列变空(阻塞所有获取项的调用)。
$ python3.6 asyncio_queue.py consumer 0: waiting for item consumer 0: waiting for item consumer 1: waiting for item consumer 1: waiting for item producer: starting producer: added task 0 to the queue producer: added task 1 to the queue consumer 0: has item 0 consumer 1: has item 1 producer: added task 2 to the queue producer: added task 3 to the queue consumer 0: waiting for item consumer 0: has item 2 producer: added task 4 to the queue consumer 1: waiting for item consumer 1: has item 3 producer: added task 5 to the queue producer: adding stop signals to the queue consumer 0: waiting for item consumer 0: has item 4 consumer 1: waiting for item consumer 1: has item 5 producer: waiting for queue to empty consumer 0: waiting for item consumer 0: has item None consumer 0: ending consumer 1: waiting for item consumer 1: has item None consumer 1: ending producer: ending
Comments