For as long as I have worked in Python land, I have never had to touch the async part of the language. I know that the asyncio library has gotten a lot of love in the past few years. Recently I came across an opportunity to do a lot of IO-bound, non-CPU-bound work in Python, so I decided to take a deep dive into the asyncio library and see what it has to offer.

In part 1 of this series (I originally just wanted to write one post, but realized the scope was way too big), we’ll cover:

  • How async code interfaces with synchronous code in Python
  • How to convert synchronous code to asynchronous code, including how to avoid blocking the event loop by using a custom ThreadPoolExecutor
  • How to use asyncio to run multiple tasks concurrently

Basic example, async hello world

import asyncio

async def hello_world():
    await asyncio.sleep(1)
    print("Hello world")

asyncio.run(hello_world())

>>> Hello world
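
To make the boundary between sync and async code a bit more concrete, here is a minimal sketch. The call_from_sync helper is a name I made up for illustration, it is not part of asyncio. It shows that calling an async function only creates a coroutine object, and that asyncio.run is what actually drives it from synchronous code:

```python
import asyncio


async def hello_world() -> str:
    await asyncio.sleep(0.01)  # yields control back to the event loop
    return "Hello world"


def call_from_sync() -> str:
    # Calling the async function only builds a coroutine object; nothing runs yet.
    coro = hello_world()
    assert asyncio.iscoroutine(coro)
    # asyncio.run starts an event loop, drives the coroutine to completion,
    # and hands the result back to synchronous code.
    return asyncio.run(coro)


if __name__ == "__main__":
    print(call_from_sync())
```

Forgetting the await (or the asyncio.run) is an easy mistake to make: the coroutine object is created and then silently thrown away.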

Running two async functions concurrently

import asyncio

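async def foo():
    while True:
        await asyncio.sleep(1)
        print("foo")

async def bar():
    while True:
        await asyncio.sleep(1)
        print("bar")

async def main():
    # gather has to be called while the event loop is running
    await asyncio.gather(foo(), bar())

asyncio.run(main())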

What if I have existing synchronous methods?

We can wrap a synchronous function in an async function. An example implementation would be a decorator (I love decorators, btw):

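import asyncio
from concurrent.futures import Executor
from functools import partial, wraps
from typing import Callable, Optional

def async_wrap(
    loop: Optional[asyncio.AbstractEventLoop] = None, executor: Optional[Executor] = None
) -> Callable:
    def _async_wrap(func: Callable) -> Callable:
        @wraps(func)
        async def run(*args, loop=loop, executor=executor, **kwargs):
            if loop is None:
                # we are inside a coroutine, so a loop is guaranteed to be running
                loop = asyncio.get_running_loop()
            # run_in_executor does not forward keyword arguments, so bind them here
            pfunc = partial(func, *args, **kwargs)
            # executor=None means the loop's default ThreadPoolExecutor
            return await loop.run_in_executor(executor, pfunc)

        return run

    return _async_wrap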

The above decorator is a higher-order decorator (it takes arguments and then generates another decorator). Example usage:

import asyncio
import time

@async_wrap()
def foo():
    while True:
        time.sleep(1)
        print("foo from sync")

async def bar():
    while True:
        await asyncio.sleep(1)
        print("bar from async")

async def main():
    await asyncio.gather(foo(), bar())

asyncio.run(main())

The above code behaves exactly the same, from the user’s point of view, as the previous example. The only difference is that foo is now a synchronous function. This is useful when you have a synchronous function that you want to run concurrently with an async function.
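
As an aside, if you are on Python 3.9 or newer, the standard library ships asyncio.to_thread, which does roughly what our decorator does for a single call. A small sketch, assuming a hypothetical blocking_io function that stands in for real blocking work:

```python
import asyncio
import threading
import time


def blocking_io(delay: float) -> int:
    # Hypothetical stand-in for any synchronous, blocking call.
    time.sleep(delay)
    return threading.get_ident()


async def main() -> bool:
    # asyncio.to_thread (Python 3.9+) runs the function in the loop's default
    # thread pool and returns an awaitable, much like async_wrap() does.
    worker_tid = await asyncio.to_thread(blocking_io, 0.01)
    # True when the blocking call ran on a thread other than the loop's thread.
    return worker_tid != threading.get_ident()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The decorator is still nice when you want to mark a function as "always offloaded" once, or when you need to pick a specific executor.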

Wait, but how? Doesn’t async / event-loop-based concurrency require awaitable locations to be able to context switch? Let’s add the thread ID and the process ID to the print statements to see what’s going on.

import time, os, asyncio, threading

@async_wrap()
def foo():
    while True:
        time.sleep(1)
        # get event loop id
        loop = asyncio.get_event_loop()
        print(f"foo from sync pid: {os.getpid()} tid: {threading.get_ident()}, loop id: {id(loop)}")

async def bar():
    while True:
        await asyncio.sleep(1)
        # get event loop id
        loop = asyncio.get_event_loop()
        print(f"bar from async pid: {os.getpid()} tid: {threading.get_ident()}, loop id: {id(loop)}")

asyncio.run(asyncio.gather(foo(), bar()))

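Side note: the above code doesn’t actually work. asyncio.gather is called before any event loop is running, and asyncio.run expects a coroutine rather than the future that gather returns. If we run it, we get something like this: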

../../miniconda3/lib/python3.8/asyncio/tasks.py:813: in gather
    fut = ensure_future(arg, loop=loop)
../../miniconda3/lib/python3.8/asyncio/tasks.py:660: in ensure_future
    loop = events.get_event_loop()
../../miniconda3/lib/python3.8/asyncio/events.py:639: in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
E   RuntimeError: There is no current event loop in thread 'MainThread'.

After a lot of debugging, it looks like the standard asyncio methods for retrieving the running loop don’t work inside a decorated synchronous function, because that function runs on an executor worker thread that has no event loop of its own. I got the following to work:

def test_wrapped_async_ids() -> None:
    @async_wrap()
    def foo():
        while True:
            time.sleep(1)
            # runs on an executor worker thread, so there is no event loop to query here
            print(f"\nfoo from pid: {os.getpid()} tid: {threading.get_ident()}\n")

    async def bar():
        while True:
            await asyncio.sleep(1)
            # runs on the event loop's own thread
            print(f"\nbar from pid: {os.getpid()} tid: {threading.get_ident()}\n")

    async def main():
        print("main")
        await asyncio.gather(foo(), bar())
    
    asyncio.run(main())

Anyway, if you run the above function, you will get something like this:

bar from pid: 21883 tid: 140157781362496

foo from pid: 21883 tid: 140157642589952



bar from pid: 21883 tid: 140157781362496

foo from pid: 21883 tid: 140157642589952



foo from pid: 21883 tid: 140157642589952


bar from pid: 21883 tid: 140157781362496

The observation here is that both tasks are running in the same Python process, but on different threads.
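
Since foo runs on a worker thread, it cannot look up the event loop by itself. If the synchronous function does need to talk back to the loop, one option is to hand the loop over explicitly and use asyncio.run_coroutine_threadsafe. This is only a sketch, and report and worker are made-up names:

```python
import asyncio


async def report(msg: str) -> str:
    # Runs on the event loop thread.
    return f"loop got: {msg}"


def worker(loop: asyncio.AbstractEventLoop):
    # On an executor thread there is no running loop to look up.
    try:
        asyncio.get_running_loop()
        found_loop = True
    except RuntimeError:
        found_loop = False
    # Schedule a coroutine on the loop we were handed and block this
    # worker thread (not the loop) until its result is ready.
    future = asyncio.run_coroutine_threadsafe(report("hi from worker"), loop)
    return found_loop, future.result(timeout=5)


async def main():
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, worker, loop)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The important detail is that future.result() blocks only the worker thread. The event loop stays free, which is what lets it run report in the meantime.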

What if we stop using the async_wrap decorator and call gather on two async functions?

...
    async def foo():
        while True:
            await asyncio.sleep(1)
            # get event loop id
            print(f"\nfoo from pid: {os.getpid()} tid: {threading.get_ident()}\n")
...

Output of this code is:

bar from pid: 22395 tid: 140233902278464


foo from pid: 22395 tid: 140233902278464


bar from pid: 22395 tid: 140233902278464


foo from pid: 22395 tid: 140233902278464


bar from pid: 22395 tid: 140233902278464

As we can see, the tids and pids are all the same, yet the prints from both async functions are interleaved: the event loop switches between them at each await.
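
To see the switching on a single thread without waiting on timers, here is a small sketch where two coroutines append to a shared list and yield with asyncio.sleep(0). The worker helper and the log list are names I picked for illustration:

```python
import asyncio


async def worker(name: str, log: list) -> None:
    for i in range(2):
        log.append(f"{name} {i}")
        # Each await is a point where the loop may switch to another task.
        await asyncio.sleep(0)


async def main() -> list:
    log: list = []
    await asyncio.gather(worker("foo", log), worker("bar", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The entries from foo and bar alternate, even though everything runs on one thread. Nothing is preempted: a coroutine keeps the thread until it reaches an await.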

It looks like the number of threads in use scales with the number of concurrent tasks submitted. Strictly speaking, this would be the default executor behind run_in_executor spawning worker threads on demand, not asyncio.gather itself. Let’s try to prove this.

def test_wrapped_concurrent_thread_limit() -> None:
    @async_wrap()
    def foo():
        time.sleep(0.1)
        return threading.get_ident()

    async def bar():
        await asyncio.sleep(0.1)
        return threading.get_ident()

    async def main_wrapped():
        for n_concurrent_tasks in range(1, 20, 4):
            print(f"n_concurrent_tasks: {n_concurrent_tasks}")
            res = await asyncio.gather(*[foo() for _ in range(n_concurrent_tasks)])
            num_threads_used = len(set(res))
            print(f"Dispatched {n_concurrent_tasks} tasks, used {num_threads_used} threads")
            # holds only while n_concurrent_tasks stays within the default executor's max_workers
            assert num_threads_used == n_concurrent_tasks

    async def main_async():
        for n_concurrent_tasks in range(1, 20, 4):
            print(f"n_concurrent_tasks: {n_concurrent_tasks}")
            res = await asyncio.gather(*[bar() for _ in range(n_concurrent_tasks)])
            num_threads_used = len(set(res))
            print(f"Dispatched {n_concurrent_tasks} tasks, used {num_threads_used} threads")
            assert num_threads_used == 1
    
    asyncio.run(main_wrapped())
    asyncio.run(main_async())

If we run this unit test, we’ll get something like this:

tests/test_async_wrapper.py::test_wrapped_concurrent_thread_limit n_concurrent_tasks: 1
Dispatched 1 tasks, used 1 threads
n_concurrent_tasks: 5
Dispatched 5 tasks, used 5 threads
n_concurrent_tasks: 9
Dispatched 9 tasks, used 9 threads
n_concurrent_tasks: 13
Dispatched 13 tasks, used 13 threads
n_concurrent_tasks: 17
Dispatched 17 tasks, used 17 threads
n_concurrent_tasks: 1
Dispatched 1 tasks, used 1 threads
n_concurrent_tasks: 5
Dispatched 5 tasks, used 1 threads
n_concurrent_tasks: 9
Dispatched 9 tasks, used 1 threads
n_concurrent_tasks: 13
Dispatched 13 tasks, used 1 threads
n_concurrent_tasks: 17
Dispatched 17 tasks, used 1 threads
PASSED

As we can see, when we use the async_wrap decorator, each concurrent task runs on its own thread, but when we use the async sleep, all tasks run on the same thread.

In other words, coroutines all run on the event loop’s single thread, while synchronous functions handed to run_in_executor run on worker threads of the loop’s default ThreadPoolExecutor, which adds threads on demand up to its max_workers limit (min(32, os.cpu_count() + 4) on Python 3.8).
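
This is also why blocking calls should not sit directly inside a coroutine. The sketch below is my own illustration (count_ticks, blocks_loop and offloaded are hypothetical names): a background ticker counts how often the event loop gets a turn while a 0.2 second sleep runs either directly in a coroutine or on a worker thread.

```python
import asyncio
import time


async def count_ticks(job) -> int:
    """Run job() while a background ticker counts how often the loop gets a turn."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await job()
    task.cancel()
    # Wait for the cancellation to finish so no task is left dangling.
    await asyncio.gather(task, return_exceptions=True)
    return ticks


async def blocks_loop() -> None:
    # A plain time.sleep inside a coroutine freezes the whole event loop.
    time.sleep(0.2)


async def offloaded() -> None:
    # The same call on a worker thread leaves the loop free to run the ticker.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.2)


async def main() -> tuple:
    return await count_ticks(blocks_loop), await count_ticks(offloaded)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the blocking case the ticker never gets to run at all, while in the offloaded case it ticks away happily. The exact tick count depends on the machine.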

Is there a way to limit the number of threads an event loop can use?

One way is to pass a custom executor explicitly through the executor argument of our decorator.

from concurrent.futures import ThreadPoolExecutor

def test_async_limited_threads() -> None:
    # executor with only 2 threads
    executor = ThreadPoolExecutor(max_workers=2)

    @async_wrap(executor=executor)
    def foo():
        time.sleep(0.1)
        return threading.get_ident()
    
    async def main_wrapped():
        for n_concurrent_tasks in range(1, 20, 4):
            print(f"n_concurrent_tasks: {n_concurrent_tasks}")
            res = await asyncio.gather(*[foo() for _ in range(n_concurrent_tasks)])
            num_threads_used = len(set(res))
            print(f"Dispatched {n_concurrent_tasks} tasks, used {num_threads_used} threads")
            assert num_threads_used <= 2

    asyncio.run(main_wrapped())

Output:

tests/test_async_threads.py::test_async_limited_threads n_concurrent_tasks: 1
Dispatched 1 tasks, used 1 threads
n_concurrent_tasks: 5
Dispatched 5 tasks, used 2 threads
n_concurrent_tasks: 9
Dispatched 9 tasks, used 2 threads
n_concurrent_tasks: 13
Dispatched 13 tasks, used 2 threads
n_concurrent_tasks: 17
Dispatched 17 tasks, used 2 threads

As we can see, we are using at most 2 threads for all tasks.
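
Another option, if you would rather cap the threads for every run_in_executor(None, ...) call on a loop instead of per decorator, is loop.set_default_executor. A minimal sketch, where blocking is a hypothetical stand-in for real work:

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking() -> int:
    # Hypothetical stand-in for a blocking call; reports which thread ran it.
    time.sleep(0.05)
    return threading.get_ident()


async def main() -> int:
    loop = asyncio.get_running_loop()
    # Every run_in_executor(None, ...) call on this loop now shares 2 threads.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
    tids = await asyncio.gather(
        *[loop.run_in_executor(None, blocking) for _ in range(8)]
    )
    # Number of distinct worker threads that handled the 8 calls.
    return len(set(tids))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this in place, async_wrap() without an executor argument would pick up the capped pool as well, since it passes None through to run_in_executor.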

Part 1 summary

  • The asyncio event loop can run tasks concurrently
  • run_in_executor lets the event loop hand work to different executors, which gives you the ability to run things on different threads or in different process pools
  • Because of the GIL, Python threads only run Python code on one core at a time, so they cannot speed up compute-bound tasks like an expensive calculation or a neural network training step, but they are handy when there are a lot of IO-bound tasks
  • We can wrap existing synchronous code with decorator utilities to make it play nice with the async framework