
I am trying to make a progress bar using tqdm for an asyncio function. I've tried following the guidance at:

tqdm for asyncio and asyncio with tqdm and tqdm and coroutines

Here's my code, which is running in a jupyter notebook:

import pandas as pd
from pandas_datareader import data as pdr
import asyncio
# import tqdm
from tqdm.asyncio import tqdm


async def get_prices(index, row):
    try:
        prices = pdr.get_data_yahoo(row['Symbol'], row['Start'], row['End'])
    except Exception as e:
        print('Error', e, row['Symbol'], row['Start'])  # the frame has no 'Date' column
        return
    prices['Symbol'] = row['Symbol']
    prices['Key'] = index
    return prices

async def get_stock_data(df):
    # data = await asyncio.gather(*[get_prices(index, row) for index, row in df.iterrows()])
    # data = [await f for f in tqdm(asyncio.as_completed([get_prices(index, row) for index, row in df.iterrows()]), total=len(df))]
    flist = [get_prices(index, row) for index, row in df.iterrows()]
    data = [await f for f in tqdm.as_completed(flist, total=len(df))]
    return data


stocks = ['IBM', 'AAPL', 'C', 'ACTG', 'ACVA', 'ACWI', 'ACWX', 'ACXP', 'ADAG', 'ADAL', 'IBET', 'IBEX', 'IBKR', 'IBOC', 'IBRX', 'IBTB', 'IBTD', 'IBTE', 'IBTF', 'IBTG', 'IBTH', 'IBTI', 'IBTJ', 'IBTK', 'IBTL', 'IBTM', 'IBTX', 'ICAD', 'ICCC', 'ICCH', 'ICCM', 'ICFI', 'ICHR', 'ICLK', 'ICLN', 'ICLR', 'ICMB', 'ICPT', 'ICUI', 'ICVX', 'IDAI', 'IDBA', 'IDCC', 'IDEX', 'IDLB', 'IDN', 'IDRA', 'IDXX', 'IDYA', 'IEA', 'IEAWW', 'IEF', 'IEI', 'IEP', 'IESC', 'IEUS', 'IFBD', 'IFGL', 'IFRX', 'IFV', 'IGAC', 'IGACU', 'IGACW', 'IGF', 'IGIB', 'IGIC', 'IGICW', 'IGMS', 'IGNY', 'IGNYU', 'IGNYW', 'IGOV', 'IGSB', 'IGTA', 'IGTAR', 'IGTAU', 'IGTAW', 'IHRT', 'IHYF', 'III', 'IIII', 'IIIIU', 'IIIIW', 'IIIV', 'IINN', 'IINNW', 'IIVI', 'IIVIP', 'IJT', 'IKNA', 'IKT', 'ILAG', 'ILMN', 'ILPT', 'IMAB', 'IMAC', 'IMACW', 'IMAQ', 'IMAQR', 'IMAQU', 'IMAQW', 'IMBI', 'IMBIL', 'IMCC', 'IMCR', 'IMCV', 'IMGN', 'IMGO', 'IMKTA', 'IMMP', 'IMMR', 'IMMX', 'IMNM', 'IMOS', 'IMPL', 'IMPP', 'IMPPP', 'IMRA', 'IMRN', 'IMRX', 'IMTE', 'IMTX', 'IMTXW', 'IMUX', 'IMV', 'IMVT', 'IMXI', 'INAB', 'INBK', 'INBKZ', 'INBX', 'INCR', 'INCY', 'INDB', 'INDI', 'INDIW', 'INDP', 'INDT', 'INDY', 'INFI', 'INFN', 'INGN', 'INKA', 'INKAU', 'INKAW', 'INKT', 'INM', 'INMB', 'INMD', 'INNV', 'INO', 'INOD', 'INPX', 'INSE', 'INSG', 'INSM', 'INTA', 'INTC', 'INTE', 'INTEU', 'INTEW', 'INTG', 'INTR', 'INTU', 'INTZ', 'INVA', 'INVE', 'INVO', 'INVZ', 'INVZW', 'INZY', 'IOAC', 'IOACU', 'IOACW', 'IOBT', 'IONM', 'IONR', 'IONS', 'IOSP', 'IOVA', 'IPA', 'IPAR', 'IPAX', 'IPAXU', 'IPAXW', 'IPDN', 'IPGP', 'IPHA', 'IPKW', 'IPSC', 'IPVI', 'IPVIU', 'IPVIW', 'IPW', 'IPWR', 'IPX', 'IQ', 'IQMD', 'IQMDU', 'IQMDW', 'IRAA', 'IRAAU', 'IRAAW', 'IRBT', 'IRDM', 'IREN', 'IRIX', 'IRMD', 'IROQ', 'IRTC', 'IRWD', 'ISAA', 'ISDX', 'ISEE']  
df_stocks = pd.DataFrame(stocks, columns=['Symbol'])
df_stocks['Start'] = '9/1/2022'
df_stocks['End'] = '9/11/2022'

data = pd.concat([d for d in await get_stock_data(df_stocks)])
data.dropna(inplace=True)
data.to_csv('../output/stockprices.csv', sep='\t')
data

The code works, but the progress bar does not. I get this output which does not change:

0%|          | 0/214 [00:00<?, ?it/s]

I have also tried from tqdm.autonotebook import tqdm, but that gives the same result.

I am sure I'm doing something boneheaded, but am unable to solve this on my own.

dsagman
  • Would you mind rewriting code as self-contained, minimal reproducible example? It would help people here figure out issue faster! Especially clarifying names like `pdr` and stock data which never been defined in given code. – jupiterbjy Sep 13 '22 at 10:31
  • Updated. Apologies, should have done that. Thanks for pointing out. – dsagman Sep 13 '22 at 15:10

1 Answer

TL;DR

The problem is that the async function get_prices() never awaits anything, so although it is declared with async def, it runs synchronously and blocks the event loop. You will want to understand what a coroutine is before reading the rest of this.

Since pandas-datareader does not provide any asynchronous functions, you are better off either offloading the pdr.get_data_xxx calls to threads, or giving up concurrency and using the synchronous tqdm.


Explanation

Problem-wise, what you wrote can be reduced to the following:

import asyncio
import random
import time
from tqdm.asyncio import tqdm


async def fake_async_task():
    time.sleep(0.5 + random.random())  # <- notice there is no await & awaitable!


async def main():
    tasks = [fake_async_task() for _ in range(10)]
    _ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]


# asyncio.run(main()) --> when not in Jupyter
await main() # --> in Jupyter

If you run this, you'll notice that the progress bar also appears to jump from 0% to 100% just before the end, and that total execution takes much longer than 0.5 + α seconds.

  0%|          | 0/10 [00:00<?, ?it/s]

The reason behind this is fairly involved. Passing a fake async function (an async def with no await inside) is outside the documented usage, so explaining it requires looking inside the library code.

To oversimplify the flow, at the cost of accuracy and proper terminology:

  1. We feed 10 fake_async_task() coroutines to tqdm.asyncio.tqdm.as_completed.
  2. tqdm.asyncio.tqdm.as_completed is merely a wrapper around asyncio.as_completed, so tqdm passes all the given awaitables to it and then waits for results.
  3. asyncio.as_completed schedules all the given awaitables for execution, and then provides an awaitable named _wait_for_one for retrieving the results.
  4. The first scheduled fake_async_task() starts running and continues until the next await it encounters.
  5. But we never wrote an await, so the coroutine runs from start to end without ever yielding control.
  6. The same thing happens for the other 9 scheduled fake_async_task() coroutines, while _wait_for_one is still patiently waiting for its turn.
  7. When it is finally _wait_for_one's turn, all tasks are already done, so the results are yielded faster than the eye can see, and so are the progress bar's updates.
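The steps above can be observed with a small sketch that uses only the standard library. It is an illustration under assumptions, not a copy of the library internals: blocking_task and the events list are hypothetical names, and plain asyncio.as_completed stands in for tqdm.as_completed, which wraps it.

```python
import asyncio
import time

events = []  # records the order in which things happen


async def blocking_task(i):
    events.append(f"start {i}")
    time.sleep(0.05)  # blocking call: never yields to the event loop
    events.append(f"end {i}")
    return i


async def main():
    coros = [blocking_task(i) for i in range(3)]
    for future in asyncio.as_completed(coros):
        result = await future
        events.append(f"yielded {result}")


asyncio.run(main())  # in Jupyter: await main()
print(events)
```

In the printed list, every "start" is immediately followed by its own "end", and no "yielded" entry appears until all three tasks have finished. The order in which the three tasks start is not guaranteed.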

That is why the total execution time was additive: the code never actually achieved any concurrency.

Running functions like fake_async_task() is simply not something the authors of tqdm or asyncio had in mind. People would usually write code like this instead:

import asyncio
import random
from tqdm.asyncio import tqdm


async def task():
    await asyncio.sleep(0.5 + random.random()) # <- await + something that's awaitable
    # the random delay keeps the tasks from all finishing at the same moment,
    # otherwise the progress bar would seemingly jump from 0 to 100 again.


async def main():
    tasks = [task() for _ in range(10)]
    _ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]


# asyncio.run(main())
await main()

This now prints the progress (or rather, updates slowly enough for us to see it) just as we wanted. Total execution time is also about 0.5 + α seconds, which shows proper concurrency.

 30%|███       | 3/10 [00:00<00:01,  3.98it/s]
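To check the timing claims for both versions, here is a rough sketch that measures them side by side. The names blocking_task, cooperative_task and timed are made up for this illustration, the delay is shortened so it finishes quickly, and asyncio.as_completed is used directly so that it runs without tqdm installed.

```python
import asyncio
import time

DELAY = 0.05  # short delay so the sketch finishes quickly
N = 5


async def blocking_task():
    time.sleep(DELAY)  # blocks the event loop for the full delay


async def cooperative_task():
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting


async def timed(task_factory):
    """Run N tasks through as_completed and return the elapsed seconds."""
    start = time.perf_counter()
    for future in asyncio.as_completed([task_factory() for _ in range(N)]):
        await future
    return time.perf_counter() - start


# In Jupyter, replace asyncio.run(...) with: await timed(...)
blocking_elapsed = asyncio.run(timed(blocking_task))
cooperative_elapsed = asyncio.run(timed(cooperative_task))
print(f"blocking:    {blocking_elapsed:.2f}s")
print(f"cooperative: {cooperative_elapsed:.2f}s")
```

The blocking version should take roughly N times the delay, while the cooperative version should take roughly one delay, since all of its sleeps overlap.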

Alternatives

If the function you want to use has no async variant but is I/O-bound rather than CPU-bound, you can offload it to another thread with asyncio.to_thread (available since Python 3.9) to achieve concurrency while still using the asynchronous APIs.

import asyncio
import random
import time
from tqdm.asyncio import tqdm


def io_intensive_sync_task():
    time.sleep(0.5 + random.random())

async def main():
    tasks = [asyncio.to_thread(io_intensive_sync_task) for _ in range(10)]
    _ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]
    # total can be omitted; tqdm falls back to len(tasks) when it is not provided


# asyncio.run(main())
await main()

This runs just like the previous example.
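Applied to the code in the question, the same idea might look like the sketch below. It is only an assumption about how the pieces could fit together: fetch_prices_blocking is a hypothetical stand-in for the blocking pdr.get_data_yahoo call, rows are plain dicts instead of DataFrame rows, and asyncio.as_completed with a printed counter stands in for tqdm.as_completed so the sketch runs with the standard library alone (Python 3.9+).

```python
import asyncio
import time


def fetch_prices_blocking(symbol, start, end):
    """Hypothetical stand-in for a blocking call such as pdr.get_data_yahoo."""
    time.sleep(0.1)  # simulates network latency
    return {"Symbol": symbol, "Start": start, "End": end}


async def get_prices(index, row):
    try:
        # Run the blocking call in a worker thread so the event loop stays free.
        prices = await asyncio.to_thread(
            fetch_prices_blocking, row["Symbol"], row["Start"], row["End"]
        )
    except Exception as e:
        print("Error", e, row["Symbol"])
        return None
    prices["Key"] = index
    return prices


async def get_stock_data(rows):
    coros = [get_prices(index, row) for index, row in enumerate(rows)]
    results = []
    # With tqdm installed, tqdm.as_completed(coros) could replace this call.
    for future in asyncio.as_completed(coros):
        results.append(await future)
        print(f"{len(results)}/{len(rows)} done")
    return results


rows = [
    {"Symbol": s, "Start": "9/1/2022", "End": "9/11/2022"}
    for s in ["IBM", "AAPL", "C", "INTC"]
]
# In Jupyter: data = await get_stock_data(rows)
data = asyncio.run(get_stock_data(rows))
```

Because each fetch now waits in its own thread, the counter advances as individual results arrive instead of jumping to the end. Results come back in completion order, not in the order of rows, which is why the index is stored under "Key".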

jupiterbjy
  • Incredible helpful and clear, and helps make clear what is going on with concurrency in Python. Thank you! – dsagman Sep 14 '22 at 23:36
  • Quick note, when running in Jupyter last line should be `await main()`, per https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no – dsagman Sep 20 '22 at 01:21
  • @dsagman Updated accordingly, nice catch! – jupiterbjy Sep 20 '22 at 02:59