You can run multiple async tasks concurrently in Python using asyncio, which is part of the standard library. It lets you create and manage asynchronous tasks, such as coroutines, and run them concurrently on a single event loop.

Take the following async function, which counts up to the length of the given name, one step per second, and then returns the name. Names longer than 8 characters are rejected with a ValueError.

import asyncio
from typing import Awaitable


async def count_word(name: str) -> str:
    if len(name) > 8:
        raise ValueError(f"{name} is too long...")
    for ii in range(len(name)):
        print(name, ii)
        await asyncio.sleep(1)
    return name

Now, running this task twice:

await count_word("first"), await count_word("second")
first 0
first 1
first 2
first 3
first 4
second 0
second 1
second 2
second 3
second 4
second 5

('first', 'second')

This does not run the two tasks concurrently: each await waits for its coroutine to complete before the next one starts, so the whole expression takes about 11 seconds (5 + 6). If you want to run these tasks concurrently, use asyncio.gather():

await asyncio.gather(
    *[
        count_word("first"),
        count_word("second"),
        count_word("third"),
        count_word("twenty second"),
    ],
    return_exceptions=True
)
first 0
second 0
third 0
first 1
second 1
third 1
first 2
second 2
third 2
first 3
second 3
third 3
first 4
second 4
third 4
second 5

['first', 'second', 'third', ValueError('twenty second is too long...')]

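This call takes about 6 seconds, because the longest word that passes the length check, "second", has 6 letters. Some notes:

  • asyncio.gather returns the results in the same order as the awaitables passed in, regardless of which one finishes first, which is great.
  • with return_exceptions=True, an exception is returned in the result list instead of being raised from the gather call, so you still get every other result and can handle the failures however you like.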
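
To make both notes concrete, here is a minimal, self-contained sketch that times the sequential and the gathered version, then splits the gathered results into successes and failures. It assumes a plain script rather than a notebook (hence asyncio.run) and Python 3.9+. The step parameter, which shortens the sleep to 0.1 s per letter, and the helper names run_sequential, run_concurrent and main are made up for illustration; the print inside count_word is left out to keep the output short.

```python
import asyncio
import time


async def count_word(name: str, step: float = 0.1) -> str:
    # Same logic as in the post, minus the print. `step` is an illustrative
    # knob that replaces the fixed 1-second sleep.
    if len(name) > 8:
        raise ValueError(f"{name} is too long...")
    for _ in range(len(name)):
        await asyncio.sleep(step)
    return name


async def run_sequential(names: list[str]) -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each await finishes before the next coroutine is even created.
    results = [await count_word(name) for name in names]
    return results, time.perf_counter() - start


async def run_concurrent(names: list[str]) -> tuple[list, float]:
    start = time.perf_counter()
    results = await asyncio.gather(
        *(count_word(name) for name in names),
        return_exceptions=True,  # exceptions come back as list items
    )
    return results, time.perf_counter() - start


async def main() -> dict:
    _, sequential_time = await run_sequential(["first", "second"])
    results, concurrent_time = await run_concurrent(
        ["first", "second", "third", "twenty second"]
    )
    # Results keep the input order, so successes and failures can be
    # separated with a simple type check.
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return {
        "sequential_time": sequential_time,
        "concurrent_time": concurrent_time,
        "succeeded": succeeded,
        "failed": failed,
    }


if __name__ == "__main__":
    outcome = asyncio.run(main())
    print(outcome["succeeded"], outcome["failed"])
    print(f"sequential: {outcome['sequential_time']:.2f}s, "
          f"concurrent: {outcome['concurrent_time']:.2f}s")
```

The sequential run sleeps for the sum of the word lengths, while the gathered run only sleeps for as long as its longest valid word, even though it handles more words.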

Limit the rate of concurrency

If you want to cap the number of tasks that run at the same time, you can use a semaphore from the asyncio library. A semaphore is a synchronization primitive that lets only a fixed number of tasks hold it at once; any further task waits until a slot is released. Here’s how you can use a semaphore to limit concurrency to two tasks:

async def count_word(semaphore: asyncio.Semaphore, name: str):
    async with semaphore:
        if len(name) > 8:
            raise ValueError(f"{name} is too long...")
        for ii in range(len(name)):
            print(name, ii)
            await asyncio.sleep(1)
        return name


semaphore = asyncio.Semaphore(2)
await asyncio.gather(
    *[
        count_word(semaphore, "first"),
        count_word(semaphore, "second"),
        count_word(semaphore, "third"),
    ],
    return_exceptions=True,
)
first 0
second 0
first 1
second 1
first 2
second 2
first 3
second 3
first 4
second 4
second 5
third 0
third 1
third 2
third 3
third 4

['first', 'second', 'third']

The third word only starts once the first one has finished and released its slot in the semaphore!
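
One way to check that the limit really holds is to count how many tasks are inside the semaphore at any moment. The sketch below is an illustration under a few assumptions: it runs as a plain script with asyncio.run, uses a short 0.05 s sleep, and the running and peak counters and the limit parameter are names introduced here, not part of asyncio.

```python
import asyncio


async def main(limit: int = 2) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(limit)
    running = 0  # tasks currently inside the semaphore
    peak = 0     # highest value `running` ever reached

    async def count_word(name: str) -> str:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            try:
                for _ in range(len(name)):
                    await asyncio.sleep(0.05)
                return name
            finally:
                # Runs before the semaphore is released on exit.
                running -= 1

    results = await asyncio.gather(
        *(count_word(name) for name in ["first", "second", "third"])
    )
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(results, "peak concurrency:", peak)
```

With three words and a limit of two, the peak never exceeds two; raising the limit to three lets all of them run at once.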

Async LLM summarization

In my previous post I applied a MapReduce method to summarize a 36-page PDF. Split into 6 chunks, the summary took about 30 seconds to generate. I assume this can be sped up further using async.

First, rewrite the summarize_text function as a coroutine and use the async openai.ChatCompletion.acreate method (this is the interface of the openai package before version 1.0):

async def summarize_text(semaphore: asyncio.Semaphore, content: str) -> str:
    async with semaphore:
        _prompt = (
            f"""Write a concise summary of the following:\n\n"{content}"\n\nCONCISE SUMMARY:"""
        )
        completion = await openai.ChatCompletion.acreate(
            model="gpt-3.5-turbo",
            temperature=0,
            messages=[{"role": "user", "content": _prompt}],
        )
        return completion.choices[0].message.content

Next, gather the async tasks and run:

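import numpy as np

# tokens and encoding are assumed to be defined as in the previous post
semaphore = asyncio.Semaphore(10)
results = await asyncio.gather(
    *[
        summarize_text(semaphore, encoding.decode(token_chunk))
        for token_chunk in np.array_split(tokens, 6)
    ],
    return_exceptions=True,
)
# with return_exceptions=True a failed call comes back as an exception object,
# which would break str.join, so keep only the successful summaries
summaries = [r for r in results if isinstance(r, str)]
await summarize_text(semaphore, "\n\n".join(summaries))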

This brings the time to generate the summary down to under 10 seconds!
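
The overall pattern (summarize the chunks concurrently, then summarize the joined summaries) can be tried without an API key. The sketch below swaps the OpenAI call for a hypothetical stand-in, fake_summarize, which only sleeps briefly and keeps the first five words of its input. The names split_into_chunks and map_reduce_summary are also illustrative, and the text is split on words rather than tokens to stay dependency-free, so treat it as a shape of the approach rather than the code from the previous post.

```python
import asyncio


async def fake_summarize(semaphore: asyncio.Semaphore, content: str) -> str:
    # Hypothetical stand-in for the real API call: waits briefly, then
    # "summarizes" by keeping the first five words.
    async with semaphore:
        await asyncio.sleep(0.05)
        return " ".join(content.split()[:5])


def split_into_chunks(text: str, n_chunks: int) -> list[str]:
    # Split on whitespace into at most n_chunks pieces of roughly equal size.
    words = text.split()
    size = max(1, -(-len(words) // n_chunks))  # ceiling division
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


async def map_reduce_summary(text: str, n_chunks: int = 6, limit: int = 10) -> str:
    semaphore = asyncio.Semaphore(limit)
    # Map step: summarize every chunk concurrently, at most `limit` at a time.
    results = await asyncio.gather(
        *(fake_summarize(semaphore, chunk)
          for chunk in split_into_chunks(text, n_chunks)),
        return_exceptions=True,
    )
    # Failed chunks come back as exception objects; keep only the strings.
    summaries = [r for r in results if isinstance(r, str)]
    # Reduce step: summarize the joined chunk summaries.
    return await fake_summarize(semaphore, "\n\n".join(summaries))


if __name__ == "__main__":
    sample = " ".join(f"w{i}" for i in range(60))
    print(asyncio.run(map_reduce_summary(sample)))
```

Because the map step is bounded by the slowest chunk rather than the sum of all chunks, the total time is roughly one map call plus the final reduce call, as long as the semaphore limit is at least the number of chunks.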