
I am trying to use dask.delayed to compute a large matrix for use in a later calculation. I only ever run the code on a single local machine. With the default dask single-machine scheduler it works fine, but is a little slow. To get access to more options and to the performance monitors, I would like to use dask.distributed on a single machine. However, running the same code with a dask.distributed client slowly eats up all available memory and crashes without producing a result.

Is there a different way of setting up the problem that would allow the dask.distributed client to complete with better memory efficiency?

  • I read through the dask.delayed Best Practices guide and think we are using it right.
  • I have run it on a local Win 10 PC (64GB RAM) and an Azure Win Server 2012 VM (256 GB), with the same result.
  • I have tried setting chunks manually.
  • I have tried using stack.rechunk to optimise the chunk sizes, including auto chunking by rows and by columns (chunking by rows seems to run much faster with the default dask scheduler).
  • I have tried using compute() and persist() (same outcome).
  • I have tried starting the dask.distributed client with both threads and processes, and tweaking the number of workers. With threads, RAM use grows even faster before the run dies.
  • I have tried setting up dask.distributed with a memory limit, `cluster = distributed.LocalCluster(memory_limit = 8e9)`, as per this answer, but the memory limit is ignored.
  • If I reduce the size of the problem (nX and nY below) the dask.distributed client does complete the task, however it still requires vastly more time and memory than the dask scheduler.

This example reproduces the problem:

import dask
import distributed
import numpy as np
import dask.array as da

def calcRow(X,Y):
    Tx = np.transpose(X * (X + Y)) # Simplified work
    return (Tx)

# Specify size of (nY x nX) matrix
nX = 1000000 #  Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000

# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)

# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work 
client = distributed.Client()
client

# Build the matrix
row = dask.delayed(calcRow, pure=True)   # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(1,nX))  # calcRow returns one (1 x nX) row
            for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries

In reality my problems are much larger, and calcRow is a large, slow, complex calculation itself, but the shape and matrix build steps are the same.

I understand best practice is to scatter the data into memory before calling compute, but I don't have a function to scatter, just a delayed array.
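For reference, one way this could be approached is sketched below. It wraps the large input `x` in `dask.delayed` once, so that every row task refers to the same graph node instead of each call receiving the raw NumPy array. This is only a sketch under assumptions, not a confirmed fix: the sizes are shrunk so it runs quickly, it uses the default scheduler rather than a distributed client, and the names `calc_row` and `x_delayed` are illustrative.

```python
import dask
import dask.array as da
import numpy as np


def calc_row(X, Y):
    # Same simplified work as calcRow: returns a single (1 x nX) row
    return np.transpose(X * (X + Y))


# Deliberately small sizes so the sketch runs in well under a second
nX, nY = 1000, 5
rng = np.random.default_rng(0)
x = rng.random((nX, 1))
y = rng.random((nY, 1))

# Wrap the large input once; all row tasks then share this one graph node
x_delayed = dask.delayed(x)

row = dask.delayed(calc_row, pure=True)
rows = [row(x_delayed, y[ii]) for ii in range(nY)]

# Each delayed row is declared with its actual shape, (1, nX)
blocks = [da.from_delayed(r, dtype=float, shape=(1, nX)) for r in rows]
stack = da.vstack(blocks)

my_matrix = stack.compute()
print(my_matrix.shape)
```

With a distributed client, `client.scatter(x, broadcast=True)` returns a future that could be passed to `row` in place of `x_delayed`; whether either variant resolves the memory growth at full size is untested here.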

If I comment out the 2 dask.distributed client lines, the above example runs in 60 secs using a max of 0.25 GB of RAM. But with those lines in, the code climbs to full memory use (64GB) in 3-4 minutes and keeps going until the system becomes unstable.
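As a rough sanity check on the sizes involved, the arithmetic below works out the footprint of the input `x` and of the full float64 matrix for the `nX` and `nY` used in the example. It assumes 8 bytes per element and ignores any overhead from dask or the workers. The last figure is a hypothetical worst case in which `x` is copied once per row task, not a measured behaviour.

```python
# Sizes taken from the example above
nX = 1_000_000
nY = 5_000
BYTES_PER_FLOAT64 = 8

# One copy of the input column vector x
x_bytes = nX * BYTES_PER_FLOAT64

# The full (nY x nX) result that stack.compute() returns as one NumPy array
matrix_bytes = nX * nY * BYTES_PER_FLOAT64

# Hypothetical worst case: x duplicated once for each of the nY row tasks
copies_bytes = nY * x_bytes

print(f"x:               {x_bytes / 1e6:.0f} MB")
print(f"full matrix:     {matrix_bytes / 1e9:.0f} GB")
print(f"nY copies of x:  {copies_bytes / 1e9:.0f} GB")
```

Because `compute()` gathers the result into a single in-memory array, the full matrix figure is a lower bound on what the calling process needs to hold at the end.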

If I build the matrix with the default dask scheduler, I can then start a dask.distributed client and use the matrix in later dask.distributed calculations with no problem. It is only building the matrix that causes problems.

I almost feel that this is a bug, but I can't be sure my code isn't to blame. I would really value suggestions that might get the code to run, or that would confirm a bug.

EDIT 1: I also tried applying a decorator to calcRow:

@dask.delayed
def calcRow(X,Y):

and using:

makeRows = [calcRow(x, y[ii]) for ii in range(nY)]

but that seems to behave identically.

EDIT 2: If I start the distributed.Client with processes=False, it consumes all system memory even faster, but it does emit the following warning, which may be diagnostic:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 40.27 GB -- Worker memory limit: 8.00 GB

Nick W.
  • What are you using your matrix for? If it is matrix-vector or matrix-matrix multiplication you might want to delay the compute until then, since you would never need to have the full matrix in memory. – Juan Carlos Ramirez Jun 27 '19 at 00:09
  • That is definitely an option for some of the problems @JuanCarlosRamirez . We can call `results = da.dot(stack, x).compute()` and be done. But because of the size of most problems and the complexity of later compute steps (a matrix inversion) we save the matrix to disk using zarr `G = da.to_zarr(stack, path_to_zarr)` either for use in later calculations in this run, or for later runs. That call seems to trigger an implicit `compute()` call, which ends up with the same problem described above. – Nick W. Jun 27 '19 at 04:51
