
I am trying to write a PySpark UDF that rate-limits API calls with asyncio and a semaphore, but I cannot get the asynchronous calls to work.

Using the example below, how can I make PySpark's withColumn accept the output of an asynchronous UDF?

from typing import Iterator
import pandas as pd
import asyncio
import aiohttp

from pyspark.sql.functions import pandas_udf, lit, col

# simple DataFrame setup; `spark` and `display` come from the notebook environment
pdf = pd.DataFrame([1, 2, 3], columns=["id"])
df = spark.createDataFrame(pdf)

async def call_api_limit_wrap(session, value, limit):
    # hold the semaphore for the whole request so at most `limit`
    # calls are in flight at any one time
    async with limit:
        await asyncio.sleep(3)
        pokemon_url = f'https://pokeapi.co/api/v2/pokemon/{value}'
        async with session.get(pokemon_url) as resp:
            pokemon = await resp.json()
            print(pokemon['name'])
            return pokemon['name']

@pandas_udf("string")
async def call_api_parallel(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    limit = asyncio.Semaphore(1)  # allow only one call at a time
    async with aiohttp.ClientSession() as session:
        coroutines = [call_api_limit_wrap(session, id, limit) for id in iterator]
        yield await asyncio.gather(*coroutines)

# results as df
df = df.withColumn("name", call_api_parallel(df["id"]))
display(df)

Expected Output:

id | name
---+----------
 1 | bulbasaur
 2 | ivysaur
 3 | venusaur

Error:

PythonException: 'TypeError: 'async_generator' object is not iterable'. Full traceback below
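
From reading around, I suspect the fix is to keep the pandas_udf itself synchronous and drive the event loop inside it with asyncio.run, so Spark only ever sees plain pd.Series values. Here is a minimal sketch of what I have in mind (untested at scale; fetch_names, fetch_one, and max_concurrent are names I made up, the rest mirrors the code above):

from typing import Iterator
import asyncio
import aiohttp
import pandas as pd
from pyspark.sql.functions import pandas_udf

async def fetch_names(values, max_concurrent=1):
    # create the semaphore inside the coroutine so it binds to the
    # event loop that asyncio.run creates for this batch
    limit = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        async def fetch_one(value):
            async with limit:
                url = f'https://pokeapi.co/api/v2/pokemon/{value}'
                async with session.get(url) as resp:
                    data = await resp.json()
                    return data['name']
        return await asyncio.gather(*(fetch_one(v) for v in values))

@pandas_udf("string")
def call_api_parallel(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in iterator:
        # asyncio.run drives all requests for this batch to completion;
        # the UDF stays a plain synchronous generator, which is what
        # Spark expects, so no async_generator ever reaches withColumn
        names = asyncio.run(fetch_names(batch.tolist()))
        yield pd.Series(names)

Is this the right pattern, or is there a supported way to hand an async UDF to withColumn directly?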

Guides I tried:

