
How can I do dask_ml preprocessing on a Dask distributed cluster? My dataset is about 200 GB, and every time I categorize the dataset to prepare it for OneHotEncoding, it looks like Dask is ignoring the client and trying to load the dataset into the local machine's memory. Maybe I'm missing something:

from dask_ml.preprocessing import Categorizer, DummyEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
import pandas as pd
import dask.dataframe as dd

df = dd.read_csv('s3://some-bucket/files*.csv', dtype={'column': 'category'})

pipe = make_pipeline(
    Categorizer(),
    DummyEncoder(),
    LogisticRegression(solver='lbfgs')
)

pipe.fit(df, y)
wml

1 Answer


Two immediate things to address:

  • You have not instantiated a distributed scheduler (via a Client) in your code.
  • You should probably use the LogisticRegression implementation from dask-ml rather than the one from scikit-learn.
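As a minimal sketch of the first point (the scheduler address below is a placeholder, and a local in-process cluster stands in for a real one): once a Client exists, Dask collection computations are routed to its scheduler instead of the default local threaded scheduler.

```python
from dask.distributed import Client
import dask.array as da

# Connect to an existing cluster by scheduler address
# (placeholder address -- replace with your own scheduler):
# client = Client("tcp://scheduler-host:8786")

# Or start a local in-process cluster for testing:
client = Client(processes=False)

# With a client registered, .compute() runs on the distributed scheduler
x = da.ones((1000, 1000), chunks=(100, 100))
result = x.sum().compute()
print(result)  # 1000000.0

client.close()
```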

Working Code Example

Below is a minimal code example that works.

Note that the preprocessing transformers accept only Dask DataFrames, while the LogisticRegression estimator accepts only Dask arrays. You can either split the pipeline or use a custom FunctionTransformer (from this answer) to convert between the two. See this open Dask issue for more context.

from dask_ml.preprocessing import Categorizer, DummyEncoder
from dask_ml.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

# generate a small demo dataset of Dask arrays
from dask_ml.datasets import make_classification
X, y = make_classification(chunks=50)

# define custom transformers to include in pipeline
def trans_array(array):
    return dd.from_array(array)
transform_array = FunctionTransformer(trans_array)

def trans_df(dataframe):
    return dataframe.to_dask_array(lengths=True)
transform_df = FunctionTransformer(trans_df)

pipe = make_pipeline(
    transform_array,
    Categorizer(),
    DummyEncoder(),
    transform_df,
    LogisticRegression(solver='lbfgs')
)

pipe.fit(X, y)

rrpelgrim