Coroutines with async and await

Sweat your CPUs!

GothPy 2017-08-23

Magnus Lyckå

IRQ

GoCD notifications weren't enough...

I got a little idea...

Idea...

Standard library smtpd

This module offers several classes to implement SMTP (email) servers.

The aiosmtpd package is a recommended replacement for this module. It is based on asyncio and provides a more straightforward API. smtpd should be considered deprecated.

Asyncio???

mail2alert - simplified sequence diagram

+--------+   +----------+   +--------+   +-----------+
|Notifier|   |Mail2alert|   |REST API|   |Mail Server|
+---+----+   +----+-----+   +----+---+   +------+----+
    |             |              |              |
   +++ SEND      +++             |              |
   | +---------->| | GET        +++             |
   | |           | +----------->| |             |
   | |           | |  200 {...} | |             |
   | |           | |<-----------+ |             |
   | |           | |            +-+             |
   | |           | | SEND                      +++
   | |           | |-------------------------->| |
   | |           | |                       ACK | + - - -
   | |       ACK | |<--------------------------| |
   | |<----------+ |                           +-+
   +-+           +-+

Concurrent execution of multiple tasks

Coroutine benefits compared with...

Processes

Much less overhead. Always switch context at optimal time.

Threads

Less overhead. Easier to debug. Always switch context at optimal time.

Callbacks

Source code easier to read. Flows like non-concurrent code.

But it can only utilize one CPU core!
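A minimal sketch of what "concurrent, but on one core" looks like in practice. The names worker and main are made up for illustration, and asyncio.run() (Python 3.7+) is used as a shortcut for creating and closing the loop, which is an assumption on top of the loop.run_until_complete() style used in these slides.

```python
import asyncio
import threading
import time


async def worker(name, delay):
    # Each await is an explicit point where the event loop may switch tasks.
    await asyncio.sleep(delay)
    return name, threading.current_thread().name


async def main():
    # Three workers wait concurrently instead of one after another.
    return await asyncio.gather(
        worker("A", 0.2),
        worker("B", 0.2),
        worker("C", 0.2),
    )


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

names = [name for name, _ in results]
thread_names = {thread for _, thread in results}
print(names, thread_names, round(elapsed, 2))
```

The three sleeps overlap, so the total time is close to 0.2 s rather than 0.6 s, and every worker reports the same thread name.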

Functions, Generators, Coroutines

def my_function(x):
    return x + 1


def my_generator(x):
    for i in range(x):
        yield i


async def my_coroutine(x):
    loop = asyncio.get_event_loop()
    t0 = loop.time()
    await asyncio.sleep(x)
    t1 = loop.time()
    print(t0, t1)

Python Function

>>> def my_function(x):
...     return x + 1
...
>>> my_function
<function my_function at 0x7f2e4b07eea0>
>>> my_function(3)
4
>>>

Python Generator

>>> def my_generator(x):
...     for i in range(x):
...         yield i
...
>>> my_generator
<function my_generator at 0x7f2e49a09840>
>>> g = my_generator(2)
>>> g
<generator object my_generator at 0x7f2e460bf2b0>
>>> next(g)
0
>>> next(g)
1
>>> next(g)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>>

Python 3.5+ coroutine

>>> import asyncio
>>> async def my_coroutine(x):
...     loop = asyncio.get_event_loop()
...     t0 = loop.time()
...     await asyncio.sleep(x)
...     t1 = loop.time()
...     print(t0, t1)
...
>>> my_coroutine
<function my_coroutine at 0x7f2e49a09840>
>>> c = my_coroutine(3)
>>> c
<coroutine object my_coroutine at 0x7f2e460bf2b0>
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(c)
94327.881889242 94330.884326
>>>

Python 3.4 coroutine

>>> import asyncio
>>> @asyncio.coroutine
... def my_coroutine(x):
...     loop = asyncio.get_event_loop()
...     t0 = loop.time()
...     yield from asyncio.sleep(x)
...     t1 = loop.time()
...     print(t0, t1)
...
>>> my_coroutine
<function my_coroutine at 0x7f2e459519d8>
>>> c = my_coroutine(4)
>>> c
<generator object my_coroutine at 0x7f2e460bf3b8>
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(c)
95398.736966465 95402.738235799
>>>

Don't use this!

Timeline

Asyncio concepts

Event Loops

  • The central execution device
  • Register, execute & cancel delayed calls
  • Create client and server transports
  • Launch subprocesses
  • Delegate costly function calls to threadpools
  • Several implementations:
      • SelectorEventLoop - Default, limited to sockets on Windows
      • ProactorEventLoop - Windows only, uses IOCP
      • uvloop - 3rd party, based on libuv
      • tokio - 3rd party, based on the Rust event loop tokio-rs
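A small sketch of "register and execute delayed calls", assuming Python 3.7+ for asyncio.run() and asyncio.get_running_loop(). The list name order and the delays are arbitrary choices for the illustration.

```python
import asyncio

order = []


async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    # Registered first, but runs after 0.10 s.
    loop.call_later(0.10, order.append, "later")
    # Runs on the next iteration of the event loop.
    loop.call_soon(order.append, "soon")
    # Resolve the future once both calls above have had time to run.
    loop.call_later(0.20, done.set_result, None)

    await done


asyncio.run(main())
print(order)  # ['soon', 'later']
```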

Event Loop objects

loop = asyncio.get_event_loop()

loop.create_task( coroutine )

loop.run_until_complete( coroutine or task )

loop.run_forever()

loop.call_*( function, *args)

loop.time()

loop.stop()

loop.close()

...

Event Loop Hello World

import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

uvloop

https://github.com/MagicStack/uvloop

Transports & Protocols

Borrowed from Twisted

Transports

E.g. TCP, UDP, Pipes

Protocols

E.g. HTTP, echo

You're likely to stick to standard transports, but to subclass asyncio.Protocol unless you just use HTTP etc. There are examples in the docs.
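As a rough sketch of subclassing asyncio.Protocol on top of the standard TCP transport: an echo protocol plus a throwaway client that talks to it. EchoProtocol and main are illustrative names, port 0 asks the OS for any free port, and asyncio.run() assumes Python 3.7+.

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    """Protocol: decides what to do with bytes. The transport moves them."""

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Echo whatever arrives straight back to the peer.
        self.transport.write(data)


async def main():
    loop = asyncio.get_running_loop()
    # Standard TCP transport, custom protocol.
    server = await loop.create_server(EchoProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # A client using the higher-level streams API.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping")
    await writer.drain()
    reply = await reader.readexactly(4)

    writer.close()
    await writer.wait_closed()
    server.close()
    return reply


reply = asyncio.run(main())
print(reply)
```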

Futures

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

Tasks

"Subclass of Future. Wrapper around coroutine to schedule it for execution.

A task is responsible for executing a coroutine object in an event loop.

If the wrapped coroutine yields from a future, the task suspends the execution of the wrapped coroutine and waits for the completion of the future.

When the future is done, the execution of the wrapped coroutine restarts with the result or the exception of the future."
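A minimal sketch of the quoted description, assuming Python 3.7+ for asyncio.run(). The coroutine compute is a made-up example: the task wraps it, is itself a Future, and is only done once the coroutine has finished.

```python
import asyncio


async def compute(x):
    await asyncio.sleep(0.01)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()
    # Wrap the coroutine in a task. It is now scheduled on the loop.
    task = loop.create_task(compute(21))

    is_future = isinstance(task, asyncio.Future)
    done_before = task.done()   # Not started yet, so not done.
    result = await task         # Suspend until the task completes.
    return is_future, done_before, task.done(), result


summary = asyncio.run(main())
print(summary)  # (True, False, True, 42)
```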

Handle

class asyncio.Handle

A callback wrapper object returned by loop.call_soon(), loop.call_soon_threadsafe(), loop.call_later(), and loop.call_at().

cancel()

Cancel the call. If the callback is already canceled or executed, this method has no effect.
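A short sketch of cancelling through a handle, here with loop.call_later() so there is time to cancel before the callback fires. The labels "kept" and "dropped" are arbitrary. Handle.cancelled() and asyncio.run() assume Python 3.7+.

```python
import asyncio

calls = []


async def main():
    loop = asyncio.get_running_loop()
    kept = loop.call_later(0.05, calls.append, "kept")
    dropped = loop.call_later(0.05, calls.append, "dropped")

    # Cancel one of the two scheduled calls before it runs.
    dropped.cancel()

    # Give the remaining callback time to fire.
    await asyncio.sleep(0.15)
    return kept.cancelled(), dropped.cancelled()


flags = asyncio.run(main())
print(calls, flags)  # ['kept'] (False, True)
```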

Event Loop Hello World

import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
handle = loop.call_soon(hello_world, loop)

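# We could cancel the call before it runs. Left commented out here:
# if cancelled, hello_world() never calls loop.stop() and
# run_forever() blocks indefinitely.
# handle.cancel()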

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

Async generators and comprehension

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


result = [i async for i in aiter() if i % 2]


result = [await fun() for fun in funcs if await condition()]
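A runnable sketch putting the three fragments together, assuming Python 3.7+ for asyncio.run() (async generators and async comprehensions themselves need 3.6+). The helper double is invented for the example.

```python
import asyncio


async def ticker(delay, to):
    # Async generator: it can both yield values and await.
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    # Async comprehension over an async generator, with a filter.
    odds = [i async for i in ticker(0.01, 6) if i % 2]
    # Ordinary iteration, but awaiting inside the comprehension.
    doubled = [await double(i) for i in odds]
    return odds, doubled


odds, doubled = asyncio.run(main())
print(odds, doubled)  # [1, 3, 5] [2, 6, 10]
```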

Async for???

async for i in f():
    ....

VS

for i in await f():
    ....
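One way to read the difference, sketched with two made-up functions: stream is an async generator that produces items one at a time, while fetch_all is a coroutine that returns a complete list after a single wait. asyncio.run() assumes Python 3.7+.

```python
import asyncio


async def stream(n):
    # Async generator: may suspend between every item.
    for i in range(n):
        await asyncio.sleep(0.01)
        yield i


async def fetch_all(n):
    # Coroutine: suspends once, then returns the whole list.
    await asyncio.sleep(0.01)
    return list(range(n))


async def main():
    streamed = []
    async for i in stream(3):       # Awaits before each item.
        streamed.append(i)

    batched = []
    for i in await fetch_all(3):    # Awaits once, then a plain for loop.
        batched.append(i)

    return streamed, batched


streamed, batched = asyncio.run(main())
print(streamed, batched)  # [0, 1, 2] [0, 1, 2]
```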

Synchronization primitives

Locks
  • Lock
  • Event
  • Condition
Semaphores
  • Semaphore
  • BoundedSemaphore

Very similar to those in the threading module, but since there is no preemptive scheduling, they are needed less often.
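A sketch of one case where a primitive is still useful: a Semaphore capping how many workers run at once. The names worker and state are illustrative, and asyncio.run() assumes Python 3.7+. Note that the counter updates need no lock, since nothing can interrupt them between two awaits.

```python
import asyncio


async def worker(sem, state):
    async with sem:  # At most two workers get past this point at a time.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)
        state["active"] -= 1


async def main():
    sem = asyncio.Semaphore(2)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(6)))
    return state


state = asyncio.run(main())
print(state)  # {'active': 0, 'peak': 2}
```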

Threadpool interface

If you can't avoid blocking I/O, you can hand over work to a concurrent.futures.ThreadPoolExecutor or a concurrent.futures.ProcessPoolExecutor.

loop.run_in_executor(executor, func, *args)
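A minimal sketch of handing a blocking call to the default executor (passing None as the executor) while the loop keeps serving another coroutine. blocking_io and heartbeat are made-up names, time.sleep() stands in for any blocking I/O, and asyncio.run() assumes Python 3.7+.

```python
import asyncio
import time


def blocking_io(seconds):
    # Ordinary blocking function: it would freeze the loop if called directly.
    time.sleep(seconds)
    return "blocking call finished"


async def heartbeat(ticks):
    # Keeps running while the blocking call sits in a worker thread.
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.perf_counter())


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 0.2),
        heartbeat(ticks),
    )
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```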

Asynchronous Context Managers

A context manager which is able to suspend execution in its enter and exit methods.

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')

...

async def commit(session, data):
    ...

    async with session.transaction():
        ...
        await session.update(data)
        ...
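A runnable variant of the class above, as a sketch: log is a stand-in coroutine that records messages in a list (the slides do not define it), and asyncio.run() assumes Python 3.7+.

```python
import asyncio

events = []


async def log(message):
    # Hypothetical async logger: yields to the loop, then records the message.
    await asyncio.sleep(0)
    events.append(message)


class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')
        return False  # Do not swallow exceptions.


async def main():
    async with AsyncContextManager():
        await log('inside context')


asyncio.run(main())
print(events)
```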

Don't use blocking I/O!

Use async replacements!

Split up all long loops!

Or use the threadpool etc
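One possible way to split up a long loop, sketched under assumptions: await asyncio.sleep(0) hands control back to the event loop without adding a delay. The function names and the interval of 1000 iterations are arbitrary, and asyncio.run() assumes Python 3.7+.

```python
import asyncio


async def long_loop(n, log):
    total = 0
    for i in range(n):
        total += i
        if i % 1000 == 0:
            # Give other tasks a chance to run.
            await asyncio.sleep(0)
    log.append("loop done")
    return total


async def quick(log):
    log.append("quick ran")


async def main():
    log = []
    total, _ = await asyncio.gather(long_loop(5000, log), quick(log))
    return total, log


total, log = asyncio.run(main())
print(total, log)  # 12497500 ['quick ran', 'loop done']
```

Without the sleep(0), quick would not get to run until the whole loop had finished.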

Too confusing?

"Man that thing is complex and it keeps getting more complex. I do not have the mental capacity to casually work with asyncio."

-- Armin Ronacher

http://lucumr.pocoo.org/2016/10/30/i-dont-understand-asyncio/

Why is he mixing multi-threading with asyncio?

Callback soup considered harmful

Your async/await functions are dumplings of local structure floating on top of callback soup, and this has far-reaching implications for the simplicity and correctness of your code.

-- Nathaniel J. Smith

https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/

Minimal knowledge...

asyncio.get_event_loop()

You know this by now...

loop.create_task(coroutine)

Schedule the execution of a coroutine object.

Wrap it in a task object and return that task.

loop.run_until_complete(coroutine)

Pass in a coroutine or a future (task).

loop.run_forever()

After you've created tasks...

asyncio.gather(coroutines_or_futures, ...)

Return a future aggregating results from the given coroutine objects or futures.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))
loop.close()
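A short sketch of the "aggregating results" part: gather() returns the results in the order the awaitables were passed in, not the order they finished. The coroutine delayed is invented for the example, and asyncio.run() assumes Python 3.7+.

```python
import asyncio


async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value


async def main():
    # "fast" finishes first, but the result list follows argument order.
    return await asyncio.gather(
        delayed("slow", 0.05),
        delayed("fast", 0.01),
    )


results = asyncio.run(main())
print(results)  # ['slow', 'fast']
```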

loop.run_in_executor(executor, function, *args)

Call a function in an Executor (pool of threads or pool of processes). By default, an event loop uses a thread pool executor (ThreadPoolExecutor).

Returns an awaitable asyncio.Future.

Some code examples...

Some networking libraries

Not only networking...

Testing with asyncio

Debugging with asyncio

if args.verbose:
    logging.getLogger('asyncio').setLevel(logging.DEBUG)

    # Enable debugging
    event_loop.set_debug(True)

    # Make the threshold for "slow" tasks very very small for
    # illustration. The default is 0.1, or 100 milliseconds.
    event_loop.slow_callback_duration = 0.001

    # Report all mistakes managing asynchronous resources.
    warnings.simplefilter('always', ResourceWarning)

...

$ export PYTHONASYNCIODEBUG=1

References
