I am trying to write a PySpark UDF that rate-limits API calls using asyncio and a semaphore, but I cannot get the asynchronous calls to work. Given the example below, how can I make PySpark's withColumn accept the output of an asynchronous UDF?
from typing import Iterator
import pandas as pd
import asyncio
import aiohttp
from pyspark.sql.functions import pandas_udf, lit, col

# simple dataframe setup
pdf = pd.DataFrame([1, 2, 3], columns=["id"])
df = spark.createDataFrame(pdf)

async def call_api_limit_wrap(session, value, limit):
    async with limit:
        await asyncio.sleep(3)
        pokemon_url = f'https://pokeapi.co/api/v2/pokemon/{value}'
        async with session.get(pokemon_url) as resp:
            pokemon = await resp.json()
            print(pokemon['name'])
            return pokemon['name']

@pandas_udf("string")
async def call_api_parallel(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    limit = asyncio.Semaphore(1)  # 1 call at a time
    async with aiohttp.ClientSession() as session:
        coroutines = [call_api_limit_wrap(session, id, limit) for id in iterator]
        yield await asyncio.gather(*coroutines)

# results as df
df = df.withColumn("name", call_api_parallel(df["id"]))
display(df)
Expected Output:

id | name
--------------
1  | bulbasaur
2  | ivysaur
3  | venusaur
Error:
PythonException: 'TypeError: 'async_generator' object is not iterable'. Full traceback below
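My understanding so far is that pandas_udf expects a plain synchronous function, so the direction I have been exploring is to keep the UDF synchronous and start the event loop inside it with asyncio.run(). The sketch below shows just that pattern without Spark or aiohttp: fetch_name is a hypothetical stand-in for the real API call (it only sleeps and formats a string), and run_batch plays the role of the UDF body.

```python
import asyncio
import pandas as pd

async def fetch_name(value: int, limit: asyncio.Semaphore) -> str:
    # Stand-in for the aiohttp request; the semaphore caps concurrency.
    async with limit:
        await asyncio.sleep(0)  # placeholder for the real network call
        return f"pokemon-{value}"  # stand-in for resp.json()['name']

def run_batch(batch: pd.Series) -> pd.Series:
    # Synchronous wrapper: drives the event loop for one batch of values.
    async def gather_all():
        limit = asyncio.Semaphore(1)  # 1 call at a time
        coros = [fetch_name(v, limit) for v in batch]
        return await asyncio.gather(*coros)
    return pd.Series(asyncio.run(gather_all()), index=batch.index)

# In Spark, this body would sit inside a synchronous pandas_udf, e.g.:
# @pandas_udf("string")
# def call_api_parallel(batch: pd.Series) -> pd.Series:
#     return run_batch(batch)

print(run_batch(pd.Series([1, 2, 3])).tolist())
```

I am not sure whether this is the idiomatic way to combine asyncio with pandas_udf, or whether the iterator-of-Series UDF variant needs different handling.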
Guides I tried: