
I am new to Python and RAPIDS, and I am trying to recreate scikit-learn KMeans on multiple GPUs (I have 2 GPUs) using Dask and RAPIDS (I am using the RAPIDS Docker image, which also ships a Jupyter Notebook).

The code I show below (I also include an example of the Iris dataset) freezes, and the Jupyter Notebook cell never finishes. I tried the %debug magic and the Dask dashboard, but I could not draw any clear conclusions (my only suspicion is the device_m_csv.iloc call, but I am not sure about it). Another possibility is that I am missing a wait(), compute(), or persist() call (honestly, I am not sure in which situations each of them should be used).

I will explain the code, for easier reading:

  • First of all, do the needed imports
  • Next comes the KMeans algorithm (delimited by: #######################...)
  • Create a CUDA cluster with 2 workers, one per GPU (I have 2 GPUs), and 1 thread per worker (I have read this is the recommended value), and start a client
  • Read the dataset from CSV, making 2 partitions (chunksize = '2kb')
  • Split the previous dataset into data (better known as X) and labels (better known as y)
  • Instantiate cu_KMeans using Dask
  • Fit the model
  • Predict values
  • Check the obtained score

Sorry for not being able to offer more details, but I could not obtain any. I will be happy to provide whatever is needed to resolve the question.

Where, or what, do you think the problem is?

Thank you very much in advance.

%%time

# Import libraries and show its versions
import numpy as np; print('NumPy Version:', np.__version__)
import pandas as pd; print('Pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
import nvstrings, nvcategory
import cupy; print('cuPY Version:', cupy.__version__)
import cudf; print('cuDF Version:', cudf.__version__)
import cuml; print('cuML Version:', cuml.__version__)
import dask; print('Dask Version:', dask.__version__)
import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
import seaborn as sns; print('SeaBorn Version:', sns.__version__)
#import time
#import warnings

from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
from dask_ml.cluster import KMeans as skmKMeans
from dask_cuda import LocalCUDACluster

from sklearn import metrics
from sklearn.cluster import KMeans as skKMeans
from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
from cuml.cluster import KMeans as cuKMeans
from cuml.dask.cluster.kmeans import KMeans as cumKMeans
from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score

# Configure matplotlib library
import matplotlib.pyplot as plt
%matplotlib inline

# Configure seaborn library
sns.set()
#sns.set(style="white", color_codes=True)
%config InlineBackend.figure_format = 'svg'

# Configure warnings
#warnings.filterwarnings("ignore")


####################################### KMEANS #############################################################
# Create local cluster
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# Identify number of workers
n_workers = len(client.has_what().keys())

# Read data in host memory
device_m_csv = dask_cudf.read_csv('./DataSet/iris.csv', header = 0, delimiter = ',', chunksize='2kB') # Get complete CSV. Chunksize is 2kb for getting 2 partitions
#x = host_data.iloc[:, [0,1,2,3]].values
device_m_data = device_m_csv.iloc[:, [0, 1, 2, 3]] # Get data columns
device_m_labels = device_m_csv.iloc[:, 4] # Get labels column

# Plot data
#sns.pairplot(device_csv.to_pandas(), hue='variety');

# Define variables
label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

# Create KMeans
cu_m_kmeans = cumKMeans(init = 'k-means||',
                        n_clusters = len(device_m_labels.unique()),
                        oversampling_factor = 40,
                        random_state = 0)
# Fit data in KMeans
cu_m_kmeans.fit(device_m_data)

# Predict data
cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

# Check score
#print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
#print('adjusted_rand_score: ', sk_adjusted_rand_score(device_m_labels, cu_m_kmeans.labels_))
#print('silhouette_score: ', sk_silhouette_score(device_m_data.to_pandas(), cu_m_kmeans_labels_predicted))

# Close local cluster
client.close()
cluster.close()

Iris dataset example:

[image: example rows of the Iris dataset]


EDIT 1

@Corey, this is my output using your code:

NumPy Version: 1.17.5
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.1
cuPY Version: 6.7.0
cuDF Version: 0.12.0
cuML Version: 0.12.0
Dask Version: 2.10.1
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.12.0
MatPlotLib Version: 3.1.3
SeaBorn Version: 0.10.0
Cluster centers:
           0         1         2         3
0  5.006000  3.428000  1.462000  0.246000
1  5.901613  2.748387  4.393548  1.433871
2  6.850000  3.073684  5.742105  2.071053
adjusted_rand_score:  0.7302382722834697
silhouette_score:  0.5528190123564102

2 Answers


I modified your reproducible example slightly and was able to produce an output on the most recent nightly of RAPIDS.

This is the output of the script.

(cuml_dev_2) cjnolet@deeplearn ~ $ python ~/kmeans_mnmg_reproduce.py 
NumPy Version: 1.18.1
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.2.post1
cuPY Version: 7.2.0
cuDF Version: 0.13.0a+3237.g61e4d9c
cuML Version: 0.13.0a+891.g4f44f7f
Dask Version: 2.11.0+28.g10db6ba
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.13.0a+3237.g61e4d9c
MatPlotLib Version: 3.2.0
SeaBorn Version: 0.10.0
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/dask/array/random.py:27: FutureWarning: dask.array.random.doc_wraps is deprecated and will be removed in a future version
  FutureWarning,
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/distributed/dashboard/core.py:79: UserWarning: 
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
bokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
Cluster centers:
           0         1         2         3
0  5.883607  2.740984  4.388525  1.434426
1  5.006000  3.428000  1.462000  0.246000
2  6.853846  3.076923  5.715385  2.053846
adjusted_rand_score:  0.7163421126838475
silhouette_score:  0.5511916046195927

And here is the modified script that produced this output:

    # Import libraries and show its versions
    import numpy as np; print('NumPy Version:', np.__version__)
    import pandas as pd; print('Pandas Version:', pd.__version__)
    import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
    import nvstrings, nvcategory
    import cupy; print('cuPY Version:', cupy.__version__)
    import cudf; print('cuDF Version:', cudf.__version__)
    import cuml; print('cuML Version:', cuml.__version__)
    import dask; print('Dask Version:', dask.__version__)
    import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
    import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
    import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
    import seaborn as sns; print('SeaBorn Version:', sns.__version__)
    #import time
    #import warnings

    from dask import delayed
    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster, wait
    from dask_ml.cluster import KMeans as skmKMeans
    from dask_cuda import LocalCUDACluster

    from sklearn import metrics
    from sklearn.cluster import KMeans as skKMeans
    from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
    from cuml.cluster import KMeans as cuKMeans
    from cuml.dask.cluster.kmeans import KMeans as cumKMeans
    from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score
    # Configure matplotlib library
    import matplotlib.pyplot as plt

    # Configure seaborn library
    sns.set()
    #sns.set(style="white", color_codes=True)
    # Configure warnings
    #warnings.filterwarnings("ignore")


    ####################################### KMEANS #############################################################
    # Create local cluster
    cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    # Identify number of workers
    n_workers = len(client.has_what().keys())

    # Read data in host memory
    from sklearn.datasets import load_iris

    loader = load_iris()

    #x = host_data.iloc[:, [0,1,2,3]].values
    device_m_data = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.data)), npartitions=2) # Get data columns
    device_m_labels = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.target)), npartitions=2)

    # Plot data
    #sns.pairplot(device_csv.to_pandas(), hue='variety');

    # Define variables
    label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

    # Create KMeans
    cu_m_kmeans = cumKMeans(init = 'k-means||',
                            n_clusters = len(np.unique(loader.target)),
                            oversampling_factor = 40,
                            random_state = 0)
    # Fit data in KMeans
    cu_m_kmeans.fit(device_m_data)

    # Predict data
    cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

    # Check score
    print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
    print('adjusted_rand_score: ', sk_adjusted_rand_score(loader.target, cu_m_kmeans_labels_predicted.values.get()))
    print('silhouette_score: ', sk_silhouette_score(device_m_data.compute().to_pandas(), cu_m_kmeans_labels_predicted))

    # Close local cluster
    client.close()
    cluster.close()

Can you please provide your output for the versions of these libraries? I would also recommend running the modified script to see whether it runs successfully for you. If not, we can dive in further to find out whether it's Docker-related, RAPIDS-version related, or something else.

If you have access to the command-prompt that's running your Jupyter notebook, it might be helpful to enable logging by passing in verbose=True when constructing the KMeans object. This can help us isolate where things are getting stuck.
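
For example, here is a minimal sketch of that constructor call with logging enabled (the verbose flag is the one mentioned above; exact logging behavior may vary by cuML version, and n_clusters=3 simply matches the three Iris classes):

    from cuml.dask.cluster.kmeans import KMeans as cumKMeans

    # Same constructor call as in the script above, with verbose=True added
    # so that per-step progress shows up in the worker logs.
    cu_m_kmeans = cumKMeans(init = 'k-means||',
                            n_clusters = 3,
                            oversampling_factor = 40,
                            random_state = 0,
                            verbose = True)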

  • Thank you, Corey, your code works! We get a different ordering of the cluster centers, but that is normal. So, as I can see, the problem was doing an `iloc` on a `dask_cudf` dataframe, right? Also, if it is not too much trouble... could you clarify for me when to use `.compute()`, `wait()` and `.persist()`? I have been reading about them but I am not sure when to use them or when not. For example, in `# Predict data` a `compute()` is used (as I see in the doc) but I do not understand why there. I am not clear about these three concepts. Thank you again. – JuMoGar Mar 07 '20 at 17:52
  • 1
    I wrote a full answer below because stack overflow doesn’t give me enough characters to explain it fully in a comment. TLDR: you want to be aware of whether operations you are performing are lazy or eager. Calling persist will materialize an lazy (delayed) execution, turning it into an eager execution (future). Wait should be done only on futures because a lazy object will not materialize until it is turned into futures. Compute causes results to materialize but also transfers them to client. – Corey J. Nolet Mar 14 '20 at 15:49
  • Thank you so much, @Corey J. Nolet for your answer. I have just read it and was very very helpful :) – JuMoGar Mar 14 '20 at 19:31

The Dask documentation is really good and extensive, though I admit that sometimes the flexibility and number of features it provides can be a little overwhelming. I think it helps to see Dask as an API for distributed computing that gives the user control over a few different layers of execution, each layer providing more fine-grained control.

compute(), wait() and persist() are concepts that come from the manner in which the tasks underlying a series of distributed computations are scheduled on a set of workers. What's common to all of these computations is an execution graph that represents remote tasks and their inter-dependencies. At some point, this execution graph gets scheduled on a set of workers. Dask provides two APIs, depending on whether the tasks underlying the graph are scheduled immediately (eagerly) or whether the computation needs to be triggered manually (lazily).

Both of these APIs build the execution graph as tasks are created that depend on the results of other tasks. The eager side uses the dask.futures API for immediate asynchronous execution, whose results you may sometimes want to wait() on before doing other operations. The dask.delayed API is used for lazy execution and requires invoking methods like compute() or persist() in order to begin computation.
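
Here is a toy sketch of the two styles, using a trivial doubling function rather than the RAPIDS objects from the question:

    from dask import delayed
    from dask.distributed import Client, wait

    def double(x):
        return x * 2

    client = Client()  # assumes a throwaway local cluster, just for illustration

    # Lazy (dask.delayed): only builds the graph; nothing runs until compute().
    lazy_total = delayed(sum)([delayed(double)(i) for i in range(4)])
    print(lazy_total.compute())    # 12 -- execution is triggered here

    # Eager (futures): client.submit() schedules the work immediately.
    futures = [client.submit(double, i) for i in range(4)]
    wait(futures)                  # block until the background tasks finish
    print(client.gather(futures))  # [0, 2, 4, 6]

    client.close()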

Most often, users of libraries like RAPIDS are more concerned with manipulating their data and aren't as concerned with how those manipulations are scheduled on the set of workers. The dask.dataframe and dask.array objects are built on top of the delayed and futures APIs. Most users interact with these data structures rather than interacting with delayed and futures objects, but it's not a bad idea to be aware of them if you should ever need to do some data transformations outside of what the distributed dataframe and array objects provide.
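
For instance, here is a hypothetical round trip between the dataframe layer and the delayed layer (add_one is just a stand-in for a custom per-partition transformation):

    import pandas as pd
    import dask.dataframe as dd
    from dask import delayed

    def add_one(df):
        # stand-in for some transformation the dataframe API doesn't cover
        return df + 1

    ddf = dd.from_pandas(pd.DataFrame({'a': range(10)}), npartitions=2)

    # Drop down to the delayed layer: one delayed pandas frame per partition.
    parts = ddf.to_delayed()
    transformed = [delayed(add_one)(part) for part in parts]

    # Lift the delayed results back up into a distributed dataframe.
    ddf2 = dd.from_delayed(transformed)
    print(ddf2.compute().head())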

dask.dataframe and dask.array both build lazy execution graphs wherever possible and provide a compute() method to materialize the graph and return the result to the client. They both also provide a persist() method to start computation asynchronously in the background, keeping the results on the workers rather than returning them. wait() is useful when you have started computation in the background and want to block until it finishes, without transferring the results to the client.
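
Putting those three together on a toy dask.dataframe (hypothetical data, not the Iris set):

    import pandas as pd
    import dask.dataframe as dd
    from dask.distributed import Client, wait

    client = Client()  # assumes a throwaway local cluster, just for illustration

    ddf = dd.from_pandas(pd.DataFrame({'a': range(100)}), npartitions=4)

    evens = ddf[ddf.a % 2 == 0]  # lazy: only extends the execution graph
    evens = evens.persist()      # eager: starts computing in the background,
                                 # keeping the results on the workers
    wait(evens)                  # block until the background work finishes

    result = evens.compute()     # materialize and transfer to the client
    print(len(result))           # 50

    client.close()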

I hope this is helpful.

  • Thank you so much, @Corey J. Nolet. Your explanation is very very helpful. I greatly appreciate the time spent writing this response. Now I understand much better the Dask concepts. Thank you so much, again :) – JuMoGar Mar 14 '20 at 19:30