Pandas offers efficient data manipulation and analysis for Python, leveraging the speed of the underlying NumPy library. But how does it behave with asyncio? I could not find much written about it, so let's work through a scenario:

  • We have an enormous dataset.
  • We need to call an API with a throughput of 10 calls at once.

Our simple example is a pandas.DataFrame of 100 rows of lorem text:

import lorem
import pandas as pd

df = pd.DataFrame({"Text": [lorem.text() for _ in range(100)]})
>>> df.head()
0  Labore quisquam neque adipisci labore non quae...
1  Aliquam etincidunt dolore dolore voluptatem. A...
2  Aliquam consectetur dolor dolorem dolorem ipsu...
3  Labore non aliquam numquam sed. Eius neque con...
4  Voluptatem ipsum modi amet tempora tempora eti...


Suppose we want to send every row to an API, and each call takes about 0.1 seconds. We'll simulate it with a method that reverses the text and returns the first three characters of the result:

import asyncio
import time

def my_api_call(text: str):
    time.sleep(0.1)  # simulate network latency
    return text[::-1][:3]

async def async_my_api_call(text: str):
    await asyncio.sleep(0.1)  # simulate network latency
    return text[::-1][:3]

For 100 rows, the synchronous my_api_call takes about 10 seconds:

df['new'] = [my_api_call(row) for row in df['Text']]
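To check that timing yourself, here is a minimal, self-contained sketch (scaled down to 10 calls so it finishes quickly; the `texts` sample is a stand-in for the DataFrame column) that measures the sequential version with time.perf_counter:

```python
import time

def my_api_call(text: str) -> str:
    time.sleep(0.1)  # stand-in for network latency
    return text[::-1][:3]

texts = ["lorem ipsum"] * 10  # scaled down from 100 rows

start = time.perf_counter()
results = [my_api_call(t) for t in texts]
elapsed = time.perf_counter() - start
print(f"{elapsed:.2f}s")  # ~1s: 10 sequential calls at 0.1s each
```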

For 100 rows, the asynchronous async_my_api_call takes about 112 ms:

df['new'] = await asyncio.gather(*(async_my_api_call(row) for row in df['Text']))

That is about 100x faster.
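A note on running this: the bare await above works in Jupyter or IPython, which provide a running event loop. In a plain script you would wrap the gather in a coroutine and hand it to asyncio.run — a minimal sketch, using a plain list of strings in place of the DataFrame column:

```python
import asyncio

async def async_my_api_call(text: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for network latency
    return text[::-1][:3]

async def main() -> list:
    texts = ["lorem ipsum"] * 100
    # gather schedules all 100 coroutines concurrently on one event loop
    return await asyncio.gather(*(async_my_api_call(t) for t in texts))

results = asyncio.run(main())
print(len(results))
```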


Let's assume the API can only handle 10 requests at once. We can enforce that limit with a semaphore:

async def async_my_api_call(semaphore: asyncio.Semaphore, text: str):
    async with semaphore:
        await asyncio.sleep(0.1)
        return text[::-1][:3]

semaphore = asyncio.Semaphore(10)
df["new"] = await asyncio.gather(*(async_my_api_call(semaphore, row) for row in df["Text"]))

This takes about 1 second: 100 rows, 10 at a time, at 0.1 s each.
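To convince ourselves the semaphore really caps concurrency, a small self-contained sketch (with a shortened delay, and an in-flight/peak counter that is not part of the original code) can track how many calls run at once:

```python
import asyncio

in_flight = 0
peak = 0

async def capped_call(sem: asyncio.Semaphore, text: str) -> str:
    global in_flight, peak
    async with sem:  # at most 10 coroutines pass this point at a time
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # shortened delay for a quick check
        in_flight -= 1
        return text[::-1][:3]

async def main() -> list:
    sem = asyncio.Semaphore(10)
    return await asyncio.gather(*(capped_call(sem, "lorem ipsum") for _ in range(100)))

results = asyncio.run(main())
print(peak)  # never exceeds 10
```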


We can add some parallelism with Dask, a library that parallelizes computations on pandas data structures for large-scale data processing. We transform our DataFrame into a Dask DataFrame with 4 partitions:

import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=4)

And we call the synchronous API:

ddf["new"] = ddf.map_partitions(
    lambda partition: partition["Text"].apply(my_api_call), meta=(None, str)
)
ddf.compute()  # we need to compute, because Dask evaluates lazily

This takes about 2.5 seconds, 4 times faster than with a single partition.
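The speedup comes from Dask's default threaded scheduler: time.sleep releases the GIL, so the four partitions genuinely overlap. The same effect can be mimicked without Dask using a plain thread pool — a rough sketch of the idea (shortened delay, not the Dask setup above):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def my_api_call(text: str) -> str:
    time.sleep(0.01)  # shortened stand-in for the 0.1s call
    return text[::-1][:3]

texts = ["lorem ipsum"] * 100
partitions = [texts[i::4] for i in range(4)]  # split into 4 "partitions"

def process(partition):
    return [my_api_call(t) for t in partition]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # each worker handles one partition sequentially, like map_partitions
    results = [r for part in pool.map(process, partitions) for r in part]
elapsed = time.perf_counter() - start
print(f"{elapsed:.2f}s")
```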

Dask Queue