I am trying to learn Dask using a small example. Basically, I read in a file and calculate the row means.
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Start a SLURM-backed cluster and request 4 workers,
# each with 4 cores and 24 GB of memory
cluster = SLURMCluster(cores=4, memory='24 GB')
cluster.scale(4)
client = Client(cluster)

# Read the table and compute the mean of each row, excluding the 'chrom' column
mytbl = dd.read_csv('test.txt', sep=' ')
row_mean = mytbl.loc[:, mytbl.columns != 'chrom'].apply(np.mean, axis=1, meta=(None, 'float64'))
row_mean = row_mean.compute()
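For reference, I believe the same row means could also be computed with Dask's built-in reduction instead of applying a Python function row by row; a minimal sketch, assuming every column other than 'chrom' is numeric:

# Built-in row-wise mean (sketch; assumes all columns except 'chrom' are numeric)
value_cols = [c for c in mytbl.columns if c != 'chrom']
row_mean = mytbl[value_cols].mean(axis=1).compute()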
When I run compute, I can see in the Dask dashboard that memory usage increases very quickly and all the CPUs are in use. But then the memory growth stops, and I see this error:
distributed.utils - ERROR - "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
Traceback (most recent call last):
  File "~/miniconda3/lib/python3.8/site-packages/distributed/utils.py", line 668, in log_errors
    yield
  File "~/miniconda3/lib/python3.8/site-packages/distributed/scheduler.py", line 3785, in add_worker
    typename=types[key],
KeyError: "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "~/miniconda3/lib/python3.8/site-packages/distributed/core.py", line 501, in handle_comm
    result = await result
  File "~/miniconda3/lib/python3.8/site-packages/distributed/scheduler.py", line 3785, in add_worker
    typename=types[key],
KeyError: "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
But the workers are still using CPU, and memory sometimes increases or decreases. I tested this on a smaller dataset, and everything worked fine.
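If it is memory related, I assume I could reduce the partition size so that each task holds less data at a time, e.g. via the blocksize argument of read_csv (the value below is just a guess, not something I have tuned):

# Read with smaller partitions so each task holds less data at once
# (the 64MB value is just a guess)
mytbl = dd.read_csv('test.txt', sep=' ', blocksize='64MB')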
So is this error simply a memory issue?
Thanks!