I am trying to use `dask.delayed` to compute a large matrix for use in a later calculation. I only ever run the code on a single local machine. When I use the `dask` single-machine scheduler it works fine, but it is a little slow. To access more options and performance monitors to improve the code, I would like to use `dask.distributed` on a single machine. However, running the same code with a `dask.distributed` client slowly eats up all available memory and crashes without achieving anything.

Is there a different way of setting up the problem that would allow the `dask.distributed` client to complete with better memory efficiency?
- I read through the `dask.delayed` Best Practices guide and believe we are using it correctly.
- I have run it on a local Win 10 PC (64 GB RAM) and an Azure Win Server 2012 VM (256 GB RAM), with the same result.
- I have tried setting chunks manually.
- I have tried using `stack.rechunk` to optimise the chunk sizes, including auto chunking by rows and columns (chunking by rows seems to run much faster in the `dask` scheduler).
- I have tried both `compute()` and `persist()` (same outcome).
- I have tried starting the `dask.distributed` client with both the threads and processes schedulers, and tweaking the number of workers. `threads` consumes even more RAM, faster, before dying.
- I have tried setting up `dask.distributed` with a memory limit, `cluster = distributed.LocalCluster(memory_limit=8e9)`, as per this answer, but the memory limit is ignored.
- If I reduce the size of the problem (`nX` and `nY` below), the `dask.distributed` client does complete the task, but it still requires vastly more time and memory than the `dask` scheduler.
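For completeness, this is roughly how I set up the memory-limited cluster (a sketch at a small worker count; keyword names as I understand them from the `distributed` docs, worker/thread counts chosen arbitrarily for illustration):

```python
import distributed

# Sketch of the memory-limited cluster start-up (8e9 bytes ~ 8 GB per worker).
cluster = distributed.LocalCluster(n_workers=2, threads_per_worker=1,
                                   memory_limit=8e9, processes=False)
client = distributed.Client(cluster)

# Confirm the workers actually started
n_workers = len(client.scheduler_info()["workers"])

client.close()
cluster.close()
```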
This example reproduces the problem:
```python
import dask
import distributed
import numpy as np
import dask.array as da

def calcRow(X, Y):
    Tx = np.transpose(X * (X + Y))  # Simplified work
    return Tx

# Specify size of the (nY x nX) matrix
nX = 1000000  # Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000

# Fill with random data
x = np.random.rand(nX, 1)
y = np.random.rand(nY, 1)

# Set up the dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work.
client = distributed.Client()
client

# Build the matrix
row = dask.delayed(calcRow, pure=True)          # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)]  # Loop over all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(1, nX))
            for makeRow in makeRows]            # Wrap each row as a dask array
stack = da.vstack(buildMat)
my_matrix = stack.compute()                     # Calculate the matrix entries
```
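As a sanity check on the maths, the simplified `calcRow` above can be verified at small sizes against a plain NumPy broadcasting version (the real `calcRow` is not reducible like this; sizes here are arbitrary small values):

```python
import numpy as np
import dask
import dask.array as da

def calcRow(X, Y):
    return np.transpose(X * (X + Y))  # simplified work, one (1, nX) row

nX, nY = 1000, 50
x = np.random.rand(nX, 1)
y = np.random.rand(nY, 1)

# Delayed build, as in the question, on the plain dask scheduler
row = dask.delayed(calcRow, pure=True)
buildMat = [da.from_delayed(row(x, y[ii]), dtype=float, shape=(1, nX))
            for ii in range(nY)]
ref = da.vstack(buildMat).compute()

# Broadcasting equivalent: M[i, j] = x[j] * (x[j] + y[i])
mat = x.ravel() * (x.ravel() + y)  # (nY, nX) via NumPy broadcasting
```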
In reality my problems are much larger, and `calcRow` is itself a large, slow, complex calculation, but the shape and matrix-build steps are the same.

I understand that best practice is to `scatter` the data into memory before calling `compute`, but I don't have a function to scatter, just a `delayed` array.
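For what it's worth, as I understand the `distributed` docs, `scatter` works on data rather than functions, so one variant I sketched passes the shared `x` array in as a scattered future (small, arbitrary sizes; cluster options chosen only to keep the sketch lightweight):

```python
import numpy as np
import dask
import dask.array as da
from distributed import Client

def calcRow(X, Y):
    return np.transpose(X * (X + Y))  # simplified work, one (1, nX) row

client = Client(processes=False, n_workers=1, threads_per_worker=2)

nX, nY = 1000, 20
x = np.random.rand(nX, 1)
y = np.random.rand(nY, 1)

xf = client.scatter(x)  # ship the shared x to the cluster once; returns a Future
row = dask.delayed(calcRow, pure=True)
buildMat = [da.from_delayed(row(xf, y[ii]), dtype=float, shape=(1, nX))
            for ii in range(nY)]
my_matrix = da.vstack(buildMat).compute()

client.close()
```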
If I comment out the two `dask.distributed` client lines, the above example runs in 60 seconds using a maximum of 0.25 GB of RAM. With those lines in, the code climbs to full memory use (64 GB) in 3-4 minutes and keeps going until the system becomes unstable.
If I build the matrix in `dask`, I can then start a `dask.distributed` client and use the matrix in later `dask.distributed` calculations with no problem. It is only building the matrix that causes problems.

I almost feel that this is a bug, but can't be sure my code isn't to blame. I would really value suggestions that might get the code to run, or prove that it is a bug.
EDIT 1:

I also tried applying a decorator to `calcRow`:

```python
@dask.delayed
def calcRow(X, Y):
```

and using:

```python
makeRows = [calcRow(x, y[ii]) for ii in range(nY)]
```

but the result seems to be identical.
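A minimal check (small, arbitrary sizes, plain scheduler) that the decorator form builds and computes the same kind of row:

```python
import numpy as np
import dask

@dask.delayed
def calcRow(X, Y):
    return np.transpose(X * (X + Y))  # simplified work

x = np.random.rand(100, 1)
out = calcRow(x, np.array([0.5])).compute()  # one row
```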
EDIT 2:

If I start the `distributed.Client` with `processes=False`, it consumes all system memory even faster, but it does produce the following warning, which may be diagnostic:

```
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 40.27 GB -- Worker memory limit: 8.00 GB
```