Dive into Python asyncio - part 2
In the second part of this deep-dive series on asyncio and async/await in Python, we will look at the following topics:
- tasks, task groups, task cancellation
- async queues
- async locks and semaphores
- async context managers
- async error handling
Task, Task Groups, Task Cancellation
Tasks are the basic unit of work in asyncio. A task is a coroutine that is scheduled to run on the event loop. Tasks are created with the asyncio.create_task() function, which takes a coroutine as an argument and returns a Task object. Task is a subclass of Future and can be used to cancel the running coroutine.
import asyncio

async def foo():
    while True:
        await asyncio.sleep(1)
        print("foo")

async def bar():
    while True:
        await asyncio.sleep(1)
        print("bar")

async def main():
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    await asyncio.sleep(5)
    task1.cancel()
    task2.cancel()

asyncio.run(main())
Note the above example behaves much like awaiting the coroutines directly; the main difference is that tasks run concurrently and can be cancelled before they are done. When a running async function is cancelled, a CancelledError exception is raised inside it. The CancelledError exception can be caught and handled in the async function.
import asyncio

async def foo():
    try:
        while True:
            await asyncio.sleep(1)
            print("foo")
    except asyncio.CancelledError:
        print("foo cancelled")

async def main():
    task1 = asyncio.create_task(foo())
    await asyncio.sleep(5)
    task1.cancel()
>>> asyncio.run(main())
foo
foo
foo
foo
foo
foo cancelled
Dispatching groups of tasks can be done by simply calling create_task in a loop.
import asyncio

async def foo(id, sleep_time=1):
    while True:
        await asyncio.sleep(sleep_time)
        print("foo with id {}".format(id))

async def main():
    tasks = [asyncio.create_task(foo(i)) for i in range(5)]
    await asyncio.sleep(5)
    for task in tasks:
        task.cancel()

asyncio.run(main())
Alternatively, we can use asyncio.gather, but then we do not hold on to the individual task handles and have to reach them via the asyncio.all_tasks() function.
import asyncio

async def foo(id, sleep_time=1):
    while True:
        await asyncio.sleep(sleep_time)
        print("foo with id {}".format(id))

async def main():
    # gather() schedules the coroutines as tasks on the event loop;
    # note we must not await it here, since foo() loops forever
    asyncio.gather(*[foo(i) for i in range(5)])
    await asyncio.sleep(5)
    # cancel all dispatched tasks (except main() itself)
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task():
            task.cancel()

asyncio.run(main())
The asyncio.create_task function uses the running event loop. The current Python API does not support passing in a custom event loop; however, create_task is also a method on the event loop, so to explicitly specify the event loop, we can do the following:
loop = asyncio.new_event_loop()
# the task will run on the event loop we just created,
# once that loop is started
task = loop.create_task(foo())
In most cases, we will not need to explicitly create a new event loop. One of the few exceptions is if we know we'll be running asyncio in a background thread. Relatedly, to keep blocking or CPU-bound calls from stalling the loop, we can create a custom executor and hand it to the loop via run_in_executor:
import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

# the __main__ guard is required for ProcessPoolExecutor on
# platforms that spawn child processes by re-importing the module
if __name__ == '__main__':
    asyncio.run(main())
When the event loop hands work to an executor, the loop does not block while the executor does the work; it keeps running other tasks, and awaiting run_in_executor suspends only the awaiting task until the result is ready. However, a CPU-bound function running in a thread pool still competes with the loop for the GIL, so for heavy CPU-bound work it is usually more efficient to use a separate process.
Async Queues
asyncio provides a queue implementation that can be used to pass work between coroutines in first-in-first-out order. The asyncio.Queue implementation is NOT thread-safe. However, this is not an issue as long as we do not have concurrent tasks executing on different threads. A typical asyncio producer/consumer pattern looks like this:
import asyncio

async def producer(queue, n):
    for x in range(n):
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(1)
        item = str(x)
        # put the item in the queue
        await queue.put(item)
    # indicate the producer is done
    await queue.put(None)

async def consumer(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done;
            # mark the sentinel as processed so queue.join() can return
            queue.task_done()
            break
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(1)
        # Notify the queue that the item has been processed
        queue.task_done()

async def main():
    # Create a queue that we will use to store our messages
    queue = asyncio.Queue()
    # schedule the consumer
    consumer_task = asyncio.create_task(consumer(queue))
    # run the producer and wait for completion
    await producer(queue, 10)
    # wait until the consumer has processed all items
    await queue.join()
    # the consumer has exited after seeing the sentinel;
    # cancelling a finished task is a harmless no-op
    consumer_task.cancel()

asyncio.run(main())
>>> producing 0/10
>>> consuming item 0...
>>> producing 1/10
>>> consuming item 1...
>>> producing 2/10
>>> consuming item 2...
>>> producing 3/10
>>> consuming item 3...
...
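The sentinel approach above works with a single consumer. Another common pattern, sketched here with made-up worker names, is several consumers sharing one queue and relying on queue.join() alone: we cancel the workers once every item has been marked done.

```python
import asyncio

async def worker(name, queue, results):
    # workers loop forever; they get cancelled once the queue is drained
    while True:
        item = await queue.get()
        results.append((name, item))
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    for x in range(6):
        queue.put_nowait(x)
    # several consumers share one queue
    workers = [asyncio.create_task(worker('w{}'.format(i), queue, results))
               for i in range(3)]
    # join() returns once every item has been marked task_done()
    await queue.join()
    for w in workers:
        w.cancel()
    return results

results = asyncio.run(main())
print('processed', len(results), 'items')
```

This avoids sentinels entirely: the queue's unfinished-task counter, not a special item, tells us when the work is complete.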
Async Locks
Locks are a parallel programming concept familiar to most C/C++ developers. In Python, we can use the asyncio.Lock class to implement a lock. The asyncio.Lock class is NOT thread-safe. Its purpose is to limit concurrent access to a shared resource within a single event loop. Typical usage example:
import asyncio

async def worker_with(lock, worker_id):
    print('worker {} is waiting for the lock'.format(worker_id))
    async with lock:
        print('worker {} has acquired the lock'.format(worker_id))
        await asyncio.sleep(1)
    print('worker {} has released the lock'.format(worker_id))

async def main():
    lock = asyncio.Lock()
    await asyncio.gather(*(worker_with(lock, i) for i in range(3)))

asyncio.run(main())
>>> worker 0 is waiting for the lock
>>> worker 1 is waiting for the lock
>>> worker 2 is waiting for the lock
>>> worker 1 has acquired the lock
>>> worker 1 has released the lock
>>> worker 2 has acquired the lock
>>> worker 2 has released the lock
>>> worker 0 has acquired the lock
>>> worker 0 has released the lock
Note that asyncio.Lock implements the __aenter__ and __aexit__ methods, so we can use it in an async with statement. The asyncio.Lock class also provides the locked() method, which can be used to check whether the lock is currently held. When an async task holds the lock, the lock remains held even across an await context switch, preventing other tasks that get scheduled in the meantime from accessing the protected resource. The resource is typically:
- a shared data structure
- a shared file
- a shared database connection
Semaphores are similar to locks, but they allow a limited number of tasks to hold them concurrently. The asyncio.Semaphore class implements a semaphore.
import asyncio

async def worker_with(semaphore, worker_id):
    print('worker {} is waiting for the semaphore'.format(worker_id))
    async with semaphore:
        print('worker {} has acquired the semaphore'.format(worker_id))
        await asyncio.sleep(1)
    print('worker {} has released the semaphore'.format(worker_id))

async def main():
    semaphore = asyncio.Semaphore(2)
    await asyncio.gather(*(worker_with(semaphore, i) for i in range(3)))

asyncio.run(main())
>>> worker 0 is waiting for the semaphore
>>> worker 1 is waiting for the semaphore
>>> worker 0 has acquired the semaphore
>>> worker 2 is waiting for the semaphore
>>> worker 1 has acquired the semaphore
>>> worker 0 has released the semaphore
>>> worker 2 has acquired the semaphore
>>> worker 1 has released the semaphore
>>> worker 2 has released the semaphore
Async Events
Analogous to condition variables in C++, asyncio.Event lets tasks that require a certain condition wait until that condition is ready. The asyncio.Event class implements the wait() and set() methods, roughly analogous to wait and notify on a C++ condition variable.
import asyncio

async def waiter(event):
    print('waiting for it...')
    await event.wait()
    print('...got it!')

async def main():
    # Create an Event.
    event = asyncio.Event()
    # Spawn Tasks that wait until 'event' is set; awaiting them
    # here before setting the event would deadlock.
    tasks = [asyncio.create_task(waiter(event)) for _ in range(3)]
    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()
    # Wait for the waiters to finish.
    await asyncio.gather(*tasks)

asyncio.run(main())
Note that an event can have multiple tasks waiting on it. When the event is set, all tasks waiting on it are woken up (analogous to cv.notify_all() in C++). Events can be re-used via the clear() method: after clear(), wait() will block again until the event is set again.
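A minimal sketch of re-using an event with clear(); the 0.1s sleep is just there to let the waiter task get scheduled:

```python
import asyncio

async def main():
    event = asyncio.Event()
    event.set()
    await event.wait()           # returns immediately: the event is set
    event.clear()                # subsequent wait() calls block again
    waiter = asyncio.create_task(event.wait())
    await asyncio.sleep(0.1)
    still_blocked = not waiter.done()   # the waiter is still pending
    event.set()                  # wakes the waiter up again
    await waiter
    return still_blocked

still_blocked = asyncio.run(main())
print('wait() blocked after clear():', still_blocked)
```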
However, an asyncio Event does not protect shared resources from concurrent access; it is only used to signal that a certain condition is ready. To combine mutual exclusion with condition signalling, we can use an asyncio.Condition object. The asyncio.Condition class implements the wait(), notify() and notify_all() methods, which are analogous to their C++ counterparts.
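A minimal producer/consumer sketch with asyncio.Condition (the data list and the 0.1s delay are illustrative):

```python
import asyncio

async def consumer(cond, data):
    async with cond:
        # wait_for() releases the underlying lock while waiting and
        # re-acquires it once notified and the predicate holds
        await cond.wait_for(lambda: len(data) > 0)
        print('consumed', data.pop())

async def producer(cond, data):
    await asyncio.sleep(0.1)
    async with cond:
        data.append(42)
        cond.notify()   # wake one waiter; notify_all() wakes every waiter

async def main():
    cond = asyncio.Condition()
    data = []
    await asyncio.gather(consumer(cond, data), producer(cond, data))
    return data

data = asyncio.run(main())
```

Unlike a bare Event, the condition's lock guarantees the predicate check and the pop happen atomically with respect to other tasks.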
Async Context Managers
Context managers are a Python feature that lets us run setup and teardown code around a block. A simple example is the asyncio.Lock class, which implements the __aenter__ and __aexit__ methods. This pattern can be very powerful:
async with RequestHandler(store_url) as handler:
    async with handler.open_read(obj_id, config=config) as reader:
        frames = await reader.read(720, count=480)
        # Do other things using reader
        ...
    # Do other things using handler
    ...
...
This is almost identical to synchronous with blocks; the small caveat is that the __aenter__ method of the async version is awaited, so if some resource is IO-bound or blocked for other reasons, the loop can do a context switch and work on other tasks until the resource is ready.
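Writing our own async context manager is straightforward with contextlib.asynccontextmanager. A sketch: the open_connection name is made up, and the sleeps stand in for real async connect/close calls.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_connection(host):
    await asyncio.sleep(0.1)         # stand-in for an async connect
    conn = {'host': host, 'open': True}
    try:
        yield conn                   # the async with block runs here
    finally:
        conn['open'] = False         # stand-in for an async close
        await asyncio.sleep(0.1)

async def main():
    async with open_connection('example.com') as conn:
        print('inside block, open =', conn['open'])
    print('after block, open =', conn['open'])
    return conn

conn = asyncio.run(main())
```

The try/finally ensures the teardown runs even if the body of the async with block raises.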
Whew, this post is getting long, let’s move onto the last topic we wanted to cover:
Async Error Handlers
asyncio provides a way to register an error handler for a whole event loop, which is useful for logging errors or for debugging. The loop's set_exception_handler method takes a callback function as an argument; the callback will be called with the loop and a context dict describing the error:
import asyncio

def exception_handler(loop: asyncio.AbstractEventLoop, context: dict):
    msg = context.get('exception', context['message'])
    print('Caught exception: {}'.format(msg))
    print('Exception context: {}'.format(context))
    print('Stop the event loop')
    loop.stop()

loop = asyncio.get_event_loop()
loop.set_exception_handler(exception_handler)
Note that there is no per-task exception handler; the loop's handler only fires for exceptions that are never retrieved. To handle a specific task's failure, await it inside a try/except (or attach a callback with add_done_callback and inspect task.exception()):
import asyncio

async def task_with_exception():
    raise Exception('This exception is expected')

async def main():
    task = asyncio.create_task(task_with_exception())
    try:
        await task
    except Exception as e:
        print('Caught exception: {}'.format(e))

asyncio.run(main())
>>> Caught exception: This exception is expected
Ok, I think that just about covers all the topics I wanted to go into for part 2 of the series.
In part 3, I will try to address some of the real-life challenges of using asyncio in a fairly complex, multiprocess code-base, including how to interface asyncio with the synchronous parts of the code-base, and how to use asyncio in a multi-process environment.