2

I'm running a hour long computation that fetches an external API, process it and save to a dataframe. The API is using Python's request library.

By tweaking the request lib, I managed to fend off problems related to retries and reading errors, but not all possible problems are handled, of course.

Everytime the API fails, my computation just stops, and I lose one hour worth of work.

I'm calling dask like this:

dd = daskDataFrame.from_pandas(result, npartitions=20)
future = dd.compute()

Is there any way to restart Dask from the point it if failed?

By reading the documentation, there now the Client.retry() function: https://distributed.dask.org/en/latest/api.html#distributed.Client.retry

By I don't know how to use it in my code.

Is the retry function the solution? if yes, how to use it?

I also found this correlated question in SO:

Retries in dask.compute() are unclear

But I don't know if I need to implement the suggested code in the answer, or just call my compute() function with the retries parameter.

Draconar
  • 1,147
  • 1
  • 17
  • 36

1 Answers1

1

By running .compute on the dask dataframe you are converting it into a pandas dataframe in memory. If you want a future object, then you can run:

future = client.compute(dd)
# or
future = client.persist(dd)

Should one of the above fail, you will see error in the status property of the future:

print(future.status)
# will print error if the dataframe could not be computed

It's not clear what is the best way for your use case, but one option is to have a loop that checks status and if it's an error, then retries/restarts the computation, something like this:

from dask.distributed import wait
while future.status!="finished":
    wait(future)
    if future.status == 'error':
        future.retry()
SultanOrazbayev
  • 14,900
  • 3
  • 16
  • 46
  • what is the difference between compute() and persist()? – Draconar Jun 13 '21 at 17:10
  • 1
    in this context, not much of a difference, but in general `persist` allows you to compute an object and make sure it remains in the worker memory (so can also be distributed across multiple workers), see https://distributed.dask.org/en/latest/api.html#distributed.Client.persist – SultanOrazbayev Jun 13 '21 at 17:21