Let's build an asyncio runtime from scratch in Python
asyncio in Python is a library for writing concurrent code using the async and await syntax. It is built around an event loop: a single-threaded loop that interleaves tasks cooperatively. Inspired by a similar post by Jacob, we will explore how asyncio works from scratch by implementing our own event loop runtime with Python generators.
Coroutine basics in Python
Before we dive into the implementation, here is a quick primer on how generators work in Python. Everything in this section is vanilla Python and does not touch the async keyword at all. David Beazley has a great talk on this called "A Curious Course on Coroutines and Concurrency", which I highly recommend.
yield in Python generators / coroutines
Most people are familiar with generators that yield something we can then iterate over. For example:
import os
import re
from typing import Iterable

def iterable_dirs(regex: str) -> Iterable[str]:
    """Iterates over directories that match the regex"""
    for root, dirs, files in os.walk('.'):
        for dir in dirs:
            if re.match(regex, dir):
                yield os.path.join(root, dir)

# to use this
for dir in iterable_dirs(r'^[a-z]'):
    print(dir)
send and yield to communicate with a coroutine
However, yield is a two-way channel: the caller can also push values into the generator using its send method. This is how we can implement a simple coroutine in Python:
def coroutine():
    print("coroutine started")
    while True:
        value = yield
        print(f"received value: {value}")

# to use this
c = coroutine()
next(c)
c.send(1)
c.send(2)

# we would see
# > coroutine started
# > received value: 1
# > received value: 2
Note that the first call to next(c) is needed to "prime" the coroutine: it advances the generator to its first yield. Until then, the coroutine has not started executing, and sending a non-None value raises a TypeError. Once the coroutine is primed, subsequent send calls work as expected.
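To make that concrete, here is what happens if we skip the priming call (a small aside of my own, using the coroutine defined above):

c = coroutine()
c.send(1)
# TypeError: can't send non-None value to a just-started generator
# Note: c.send(None) is equivalent to next(c) and would also prime the coroutine.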
yield from to delegate to another coroutine
We can also use yield from to delegate to another coroutine. This is useful when we want to hand work off to a sub-coroutine: values sent by the caller flow straight through to it, and its return value comes back as the result of the yield from expression. For example:
def coroutine():
    print("coroutine started")
    while True:
        value = yield from sub_coroutine()
        print(f"received value: {value}")

def sub_coroutine():
    print("sub coroutine started")
    value = yield
    print(f"sub coroutine received value: {value}")
    return value  # the return value becomes the result of yield from

# to use this
c = coroutine()
next(c)
c.send(1)
c.send(2)

# we would see
# > coroutine started
# > sub coroutine started
# > sub coroutine received value: 1
# > received value: 1
# > sub coroutine started   (the while loop delegates to a fresh sub_coroutine)
# > sub coroutine received value: 2
# > received value: 2
# > sub coroutine started
Returning values from a coroutine in Python
Typically values are "yielded" to the caller via the yield keyword, but a generator can also return a value with the return keyword, just like a regular function. The difference is that the return value is delivered to the caller wrapped in a StopIteration exception. For example:
def corou():
    print("coroutine started")
    while (value := (yield)):
        print(f"received value: {value}")
    return "done"
If we run this coroutine, we would see:
c = corou()
next(c)
c.send(1)
c.send(2)
c.send(None)
# we would see
# > coroutine started
# > received value: 1
# > received value: 2
# > StopIteration: done
The lifetime of variables declared in a coroutine is tied to the lifetime of the coroutine itself. As a result, as control bounces back and forth between the caller and the coroutine, local variables are preserved across suspensions. This is how we can implement a simple state machine using coroutines, as sketched below.
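As a small illustration (my own example, not from the post), here is a tiny running-total state machine whose only state is a local variable inside the coroutine:

def running_total():
    """A tiny state machine: the running total survives each suspension."""
    total = 0
    while True:
        amount = yield total  # hand the current total back, wait for the next amount
        total += amount

acc = running_total()
next(acc)            # prime it; yields 0
print(acc.send(5))   # 5
print(acc.send(3))   # 8
print(acc.send(10))  # 18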
Implementing an Asyncio Runtime
Cooperative multitasking with generators and coroutines
At the heart of the asyncio runtime is an event loop. In CPython, the performance-critical pieces of asyncio are accelerated in C, but here we can implement everything in plain Python. Each task on the event loop is something that can be suspended and resumed, which fits the generator paradigm quite nicely: we can use the next() mechanism to wake a task up, run its code until it hits some blocking point, and have it yield to return control back to the loop so the loop can work on something else.
A simple example of the above concept can be illustrated below.
def task1():
    while True:
        print("task1")
        yield

def task2():
    while True:
        print("task2")
        yield

event_loop = [task1(), task2()]
while True:
    for task in event_loop:
        next(task)
Here we have a static event loop with only two tasks (not very useful in real life), but you can see how we can run the tasks concurrently. Each next(task) call runs a task until its next yield statement, then the loop moves on to the next task. This is the basic idea behind the asyncio event loop.
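One thing the loop above cannot handle is a task that finishes: next() would raise StopIteration and crash the loop. A minimal extension of my own (still not how asyncio does it) is to catch StopIteration and drop the finished task from the rotation:

def countdown(name, n):
    for i in range(n, 0, -1):
        print(f"{name}: {i}")
        yield

tasks = [countdown("a", 2), countdown("b", 3)]
while tasks:
    # iterate over a copy so we can remove finished tasks as we go
    for task in list(tasks):
        try:
            next(task)
        except StopIteration:
            tasks.remove(task)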
Dealing with blocking IO - asyncio.sleep example
Now let's see if we can implement asyncio.sleep with coroutines. The idea is that each time the loop wakes up the task that is sleeping, we check whether the requested time has elapsed; if not, we suspend the task again and give control back to the loop.
import time

def sleep_async(seconds):
    start = time.time()
    while True:
        if time.time() - start >= seconds:
            return
        yield

def task1():
    while True:
        print("task1")
        yield from sleep_async(1)

def task2():
    while True:
        print("task2")
        yield from sleep_async(2)

event_loop = [task1(), task2()]
while True:
    for task in event_loop:
        next(task)
You can probably guess what this is doing: the yield from statement here plays the role of await in asyncio, suspending the task until the sleep is done. The sleep_async function is a simple coroutine that checks whether the elapsed time has reached the requested duration; if not, it yields control back to the event loop. Conceptually, asyncio.sleep does the same thing (though the real implementation asks the loop to wake the task after a deadline rather than polling).
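For comparison, this is roughly what the same two tasks look like written against the real asyncio API (my own snippet, just to show the equivalent shape):

import asyncio

async def task1():
    while True:
        print("task1")
        await asyncio.sleep(1)

async def task2():
    while True:
        print("task2")
        await asyncio.sleep(2)

async def main():
    # run both tasks concurrently on asyncio's event loop
    await asyncio.gather(task1(), task2())

# asyncio.run(main())  # runs forever, so left commented out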
Apply syntactic sugar with async and await
What does it mean to put the async keyword in front of a function in Python? It turns out that an object is awaitable if it implements the __await__ method. This is how we will implement the sleep function with the async keyword.
Here is an example of how __await__ is implemented on asyncio.Future:
def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self
    if not self.done():
        raise RuntimeError("Event loop stopped before done")
    return self.result()
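To see the same mechanism outside of Future, here is a minimal custom awaitable of my own: any object whose __await__ method returns a generator can be awaited, and a bare yield simply hands control back to the loop for one iteration.

import asyncio

class Ready:
    """A toy awaitable: suspends once, then resolves to a value."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield              # give control back to the event loop once
        return self.value  # delivered as the result of awaiting Ready(...)

async def main():
    print(await Ready(42))  # prints 42

asyncio.run(main())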
Since we cannot give __await__ to a function, we need to create a class to encapsulate the state of the coroutine. Let's create a simple Task wrapper for this:
from queue import Queue
from typing import Generator

event_loop = Queue()

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.finished = False
        self.res = None

    def done(self):
        return self.finished

    def result(self):
        return self.res

    def __await__(self):
        while not self.done():
            yield self
        return self.result()

    @staticmethod
    def new(coro: Generator) -> 'Task':
        """Create a new task from a coroutine and schedule it on the event loop"""
        task = Task(coro)
        event_loop.put(task)
        return task
When we call await on an instance of Task, it will keep yielding until someone else sets the finished state to True.
from queue import Queue

event_loop = Queue()

def run(main: Generator) -> None:
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        try:
            # wakes up the task and runs until the next yield
            task.coro.send(None)
        except StopIteration as e:
            task.finished = True
            task.res = e.value
        else:
            event_loop.put(task)
This code looks more complex now, so let's break it down:
- We have a Queue to hold the tasks (this is our simple event loop).
- We have a run function that takes a generator and puts it in the queue.
- We then run the event loop until the queue is empty.
- We get a task from the queue and run it until its next yield statement.
- If the task is done, we set the finished flag to True; we also extract the value carried by the StopIteration exception and store it as the task's result.
- If the task is not done, we put it back in the queue so it can be run again.

A quick end-to-end demo of this loop is sketched below.
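This is my own sanity check (not from the post), assuming the Task, event_loop, and run definitions above; it shows a coroutine awaiting a Task on our loop:

async def hello():
    return "hello"

async def demo():
    t = Task.new(hello())  # schedule hello() on our queue
    print(await t)         # suspends until hello() finishes, then prints "hello"

run(demo())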
With this run method, we can now run our tasks with the await keyword. For example:
import time

def _sleep(seconds: float):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield

async def sleep(seconds: float):
    task = Task.new(_sleep(seconds))
    return await task
Putting it all together
Putting it all together, we now have the following:
from queue import Queue
from typing import Generator
import time

event_loop = Queue()

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.finished = False
        self.res = None

    def __repr__(self) -> str:
        return f"{self.coro}, finished {self.finished}, res : {self.res}"

    def done(self):
        return self.finished

    def result(self):
        return self.res

    def __await__(self):
        while not self.done():
            yield self
        return self.result()

    @staticmethod
    def new(coro) -> 'Task':
        """Create a new task from a coroutine"""
        task = Task(coro)
        event_loop.put(task)
        return task

def run(main: Generator) -> None:
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        # print(f"Running task {task}")
        try:
            # wakes up the task and runs until the next yield
            task.coro.send(None)
        except StopIteration as e:
            task.finished = True
            task.res = e.value
        else:
            event_loop.put(task)

def _sleep(seconds: float):
    start_time = time.time()
    while (time.time() - start_time) < seconds:
        yield
    return time.time() - start_time

async def sleep(seconds: float):
    task = Task.new(_sleep(seconds))
    return await task

async def task1():
    for _ in range(2):
        print('Task 1')
        await sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await sleep(2)

async def main():
    one = Task.new(task1())
    two = Task.new(task2())
    await one
    await two
    print("done")

if __name__ == '__main__':
    run(main())
If we run this code, we will see the following:
~/Tmp » python coro.py
Task 1
Task 2
Task 1
Task 2
Task 2
done
You will notice there is a commented-out print in the run loop; if we uncomment it, we can see the context switches on the event loop in action:
Running task <coroutine object task2 at 0x102ab82b0>, finished False, res : None
Running task <coroutine object main at 0x102827920>, finished False, res : None
Running task <generator object _sleep at 0x10284af80>, finished False, res : None
Running task <coroutine object task2 at 0x102ab82b0>, finished False, res : None
Running task <coroutine object main at 0x102827920>, finished False, res : None
Running task <generator object _sleep at 0x10284af80>, finished False, res : None
Running task <coroutine object task2 at 0x102ab82b0>, finished False, res : None
Running task <coroutine object main at 0x102827920>, finished False, res : None
If you got this far, I hope you enjoyed the post. Though the runtime is simple (we didn't even touch epoll / io_uring or any kind of OS-level IO event integration), building it taught me a lot about how asyncio works behind the scenes. It is obviously nowhere near as efficient as the real asyncio implementation and would not survive any production load, but the mental model is there. I hope you enjoyed reading this post as much as I enjoyed writing it.