I'm trying to run a Dask cluster alongside a Dash app to analyze very large data sets. I'm able to run a LocalCluster successfully, and the Dask DataFrame computations complete successfully. The Dash app is started using the following gunicorn command:
Unfortunately, my issues occur when I try to move the cluster to Coiled.
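import coiled
import dask.dataframe as dd
from dask.distributed import Client

# Define a conda-based software environment
coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash"],
    },
)

# Define a cluster configuration that references that environment
coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="1 GiB",
    worker_cpu=2,
    worker_memory="1 GiB",
    software="my-conda-env"
)

cluster = coiled.Cluster(n_workers=2)
CLIENT = Client(cluster)

# Lazily read the table, publish it under a name, then drop the local reference
dd_bills_df = dd.read_sql_table(
    table, conn_string, npartitions=10, index_col='DB_BillID'
)
CLIENT.publish_dataset(bills=dd_bills_df)
del dd_bills_df

log.debug(CLIENT.list_datasets())

# Fetch the published dataset by name and run the aggregation
x = CLIENT.get_dataset('bills').persist()
log.debug(x.groupby('BillType').count().compute())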
The cluster is created, the data set is successfully published to the cluster, and the client then successfully pulls the dataset into the variable x. The problem occurs during the groupby() calculation.
[2021-12-03 17:40:30 -0600] [78928] [ERROR] Exception in worker process
Traceback (most recent call last):
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
worker.init_process()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 134, in init_process
self.load_wsgi()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 146, in load_wsgi
self.wsgi = self.app.wsgi()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/base.py", line 67, in wsgi
self.callable = self.load()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 58, in load
return self.load_wsgiapp()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 48, in load_wsgiapp
return util.import_app(self.app_uri)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/util.py", line 359, in import_app
mod = importlib.import_module(module)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 855, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/Users/leowotzak/PenHole/test-containers2/src/application.py", line 61, in <module>
log.debug(x.groupby('BillType').count().compute())
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 571, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
return self.sync(
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
return sync(
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
raise exc.with_traceback(tb)
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
result[0] = yield future
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('_read_sql_chunk-5519d13b-b80d-468e-afd5-5f072b9adbec', <WorkerState 'tls://10.4.27.61:40673', name: coiled-dask-leowotzc2-75566-worker-6a0538671d, status: closed, memory: 0, processing: 10>)
This is the log output prior to the crash:
DEBUG:application:Dask DataFrame Structure:
BillName BillType ByRequest Congress EnactedAs IntroducedAt BillNumber OfficialTitle PopularTitle ShortTitle CurrentStatus BillSubjectTopTerm URL TextURL DB_LastModDate DB_CreatedDate
npartitions=10
1.0 object object int64 object object datetime64[ns] object object object object object object object object datetime64[ns] datetime64[ns]
2739.9 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
24651.1 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
27390.0 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: from-delayed, 20 tasks
DEBUG:application:('bills',)
I have tried increasing the memory allocated to each worker and the number of partitions in the Dask DataFrame, to no avail. I'm struggling to figure out what is killing the workers. Has anyone else run into this error?
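One thing that might help narrow this down, offered as a sketch rather than a confirmed diagnosis: the failing task in the traceback is `_read_sql_chunk`, which runs on the workers, and `dd.read_sql_table` needs SQLAlchemy (plus a database driver) wherever the chunks are actually read. The conda environment above lists only `dask` and `dash`, so it may be worth checking what the workers can import. The helper name `missing_modules` and the module names passed to it are hypothetical, chosen only for illustration.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of `names` that cannot be imported in this process."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised for dotted names with a missing parent, or a broken __spec__
            found = False
        if not found:
            missing.append(name)
    return missing


# Local check; the module names here are placeholders, not a known requirement list
print(missing_modules(["sqlalchemy", "json"]))

# Hypothetical use against the cluster from the question (not executed here):
# CLIENT.run(missing_modules, ["sqlalchemy"])
# Client.run calls the function on every worker and returns a dict
# keyed by worker address.
```

If every worker reports an empty list, a missing package is probably not the cause, and the worker logs or the workers' network access to the database would be the next things to look at.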