Pandas is great for Python because it offers efficient data manipulation and analysis, leveraging the speed of the underlying NumPy library. But how does it behave with asyncio? I could not find much about it, so let's work through a common scenario:
- We have an enormous dataset.
- We need to call an API that can only handle a throughput of 10 calls at once.
Our simple example pandas.DataFrame consists of 100 rows of lorem text:
import lorem
import pandas as pd
df = pd.DataFrame({"Text": [lorem.text() for _ in range(100)]})
>>> df.head()
Text
0 Labore quisquam neque adipisci labore non quae...
1 Aliquam etincidunt dolore dolore voluptatem. A...
2 Aliquam consectetur dolor dolorem dolorem ipsu...
3 Labore non aliquam numquam sed. Eius neque con...
4 Voluptatem ipsum modi amet tempora tempora eti...
Asyncio
Suppose we want to send every row to an API, and each call takes about 0.1 seconds. As a stand-in for that API, consider this method, which reverses the text and returns the final three letters:
import asyncio
import time

def my_api_call(text: str):
    time.sleep(0.1)
    return text[::-1][:3]

async def async_my_api_call(text: str):
    await asyncio.sleep(0.1)
    return text[::-1][:3]
For 100 rows, the synchronous my_api_call takes 10 s:
df['new'] = [my_api_call(row) for row in df['Text']]
For 100 rows, the asynchronous async_my_api_call takes about 112 ms:
df['new'] = await asyncio.gather(*(async_my_api_call(row) for row in df['Text']))
That is roughly 100x faster.
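One caveat about that bare await: it works in a Jupyter notebook, which already runs an event loop, but at the top level of a plain script it is a SyntaxError. There, the gather has to be wrapped in a coroutine and driven with asyncio.run. A minimal, self-contained sketch; the two-row dataframe is just for illustration:

```python
import asyncio

import pandas as pd

async def async_my_api_call(text: str) -> str:
    # Simulated API: wait 0.1 s, then return the first three
    # characters of the reversed text.
    await asyncio.sleep(0.1)
    return text[::-1][:3]

async def main(df: pd.DataFrame) -> pd.DataFrame:
    # Schedule one coroutine per row and run them concurrently.
    df["new"] = await asyncio.gather(*(async_my_api_call(row) for row in df["Text"]))
    return df

df = pd.DataFrame({"Text": ["hello world", "lorem ipsum"]})
df = asyncio.run(main(df))
```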
Semaphore
Let's assume the API can only handle 10 requests at once. We can limit the concurrency with a semaphore:
async def async_my_api_call(semaphore: asyncio.Semaphore, text: str):
    async with semaphore:
        await asyncio.sleep(0.1)
        return text[::-1][:3]
semaphore = asyncio.Semaphore(10)
df["new"] = await asyncio.gather(*(async_my_api_call(semaphore, row) for row in df["Text"]))
This finishes in about 1 second (100 rows, 10 at a time, 0.1 s each).
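That figure follows from the arithmetic: 100 tasks share 10 semaphore slots, so they run in 10 waves of 0.1 s each. A self-contained sketch that measures it, using dummy strings instead of the lorem dataframe:

```python
import asyncio
import time

async def async_my_api_call(semaphore: asyncio.Semaphore, text: str) -> str:
    async with semaphore:
        # Only 10 coroutines can be inside this block at the same time.
        await asyncio.sleep(0.1)
        return text[::-1][:3]

async def main(texts: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(10)
    return await asyncio.gather(*(async_my_api_call(semaphore, t) for t in texts))

texts = [f"row {i}" for i in range(100)]
start = time.perf_counter()
results = asyncio.run(main(texts))
elapsed = time.perf_counter() - start  # expect roughly 1 second: 10 waves of 0.1 s
```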
Dask
We can add some parallelism with Dask, a parallel computing library that scales Pandas data structures to large datasets. We transform our dataframe into a Dask dataframe with 4 partitions:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=4)
And we call the synchronous API:
ddf["new"] = ddf.map_partitions(
    lambda partition: partition["Text"].apply(my_api_call), meta=(None, str)
)
df = ddf.compute()  # we need to compute, because map_partitions is lazy
This takes 2.5 seconds, 4 times faster than with 1 partition.
Dask Queue
https://stackoverflow.com/questions/48667371/semaphores-in-dask-distributed
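The thread linked above is about enforcing a rate limit across a whole Dask cluster, where a plain asyncio.Semaphore no longer helps because tasks run on different workers. One option it points toward is dask.distributed's Semaphore, whose leases are coordinated by the scheduler. A sketch under the assumption that dask.distributed is installed; the name "api-limit" and the in-process client are illustrative:

```python
from dask.distributed import Client, Semaphore

def my_api_call(text: str) -> str:
    # Stand-in for the rate-limited API: reverse the text, keep three letters.
    return text[::-1][:3]

def limited_call(text: str, sem: Semaphore) -> str:
    # The scheduler hands out at most `max_leases` leases cluster-wide,
    # so no more than 10 tasks sit inside this block at once.
    with sem:
        return my_api_call(text)

client = Client(processes=False)  # small in-process cluster, enough for a sketch
sem = Semaphore(max_leases=10, name="api-limit")
futures = client.map(limited_call, [f"row {i}" for i in range(100)], sem=sem)
results = client.gather(futures)
client.close()
```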