
I am new to Dask, so please forgive me if this question seems silly. I am working with a Dask dataframe containing around 50 GB of string data that I need to preprocess (fast with processes) before giving it to a machine learning algorithm (fast with threads). The problem is that the dataframe operations are fast when I set up the cluster with processes but slow with threads, while the machine learning is fast with threads. I am therefore looking for a way to switch from a process-based environment to a threaded one.

Currently, I save the preprocessed data using the process-based cluster, then close it and start a new cluster with a threaded environment to apply the machine learning.

Is there an alternative way to solve this problem?

Please help me in this regard.

Vivek kala

3 Answers


It is possible to start heterogeneous workers from the command line:

# this runs in one shell process
dask-scheduler --scheduler-file scheduler.json

# this runs in another shell process/window
dask-worker --scheduler-file scheduler.json --name multi_proc --nprocs 5 --nthreads 1

# this runs in yet another shell process/window
dask-worker --scheduler-file scheduler.json --name multi_thread --nprocs 1 --nthreads 5

Then within your script/notebook you would connect to the scheduler with client = Client(scheduler_file='scheduler.json') and, at submission time, specify the name of the appropriate worker for each task, e.g.

# submit for completion only by the multi_thread worker
results_multi_thread = [client.submit(process_multi_thread, task, workers='multi_thread') for task in task_list]

# submit for completion only by the multi_proc worker
results_multi_proc = [client.submit(process_multi_proc, task, workers='multi_proc') for task in task_list]

For multiple workers of each kind you would have to specify unique names (e.g. multi_proc_1, multi_proc_2, etc.). As you can see, this is a fairly involved setup, so unless the specifics of your case require everything to happen in one go, I would stick with the solution you are using (two separate clusters): it is easier to code and maintain, and hopefully at some point there will be built-in support for heterogeneous workers.
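
For illustration, here is a minimal sketch of that pattern, assuming two process-oriented workers were started with --name multi_proc_1 and --name multi_proc_2 (the names are placeholders, and process_multi_proc/task_list are carried over from the example above):

# names of the process-oriented workers started on the command line (illustrative)
proc_workers = ['multi_proc_1', 'multi_proc_2']

# let the scheduler place each task on any of the named process workers
results_multi_proc = [
    client.submit(process_multi_proc, task, workers=proc_workers)
    for task in task_list
]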

SultanOrazbayev
  • 14,900
  • 3
  • 16
  • 46

There are a number of approaches you could take, and the best idea would be for you to measure each. You'll have a much more accurate representation of your actual workflow than anything we could guess!

You can assign abstract resources to your workers, so that you have as many workers as you need for the process-bound stage while each worker actually has many threads; with resource tagging you can keep only one thread busy per worker during that stage. The LocalCluster also allows you to scale the number of workers up and down, so you could "switch" modes when moving on to stage 2 by scaling down (all data held in the cluster will be copied to the surviving workers).
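
A minimal sketch of the resources idea, assuming LocalCluster forwards the resources keyword to its workers; the resource name proc_slot, the function preprocess and the variable parts are made up for illustration, and the worker counts are arbitrary:

from dask.distributed import Client, LocalCluster

# each worker gets several threads plus one unit of an abstract "proc_slot" resource
cluster = LocalCluster(n_workers=5, threads_per_worker=4, resources={'proc_slot': 1})
client = Client(cluster)

# stage 1: request one proc_slot per task, so each worker runs at most one of
# these process-bound tasks at a time, even though it has many threads
futures = [client.submit(preprocess, part, resources={'proc_slot': 1}) for part in parts]
results = client.gather(futures)

# stage 2: scale down for the thread-friendly ML step; data held on the cluster
# is moved to the surviving workers
cluster.scale(2)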

I think it's fairly rare that people face this "process-only"/"thread-only" pattern. Usually, a mix of threads and processes is good enough to get most workloads done well.

mdurant

In my opinion, the preprocessing part and the machine learning part can be developed separately. In your case, you can save the preprocessed data from Dask first, then launch another machine learning system for the training (e.g., TensorFlow).
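
A minimal sketch of that two-stage workflow, assuming the preprocessed data fits Parquet; the paths, the text column and the preprocess_text function are made up for illustration:

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

def preprocess_text(df):
    # hypothetical preprocessing: lower-case a text column
    df['text'] = df['text'].str.lower()
    return df

# stage 1: preprocess with a process-based cluster and persist the result to disk
with Client(LocalCluster(n_workers=8, threads_per_worker=1)):
    ddf = dd.read_parquet('raw_data/')   # hypothetical input path
    ddf = ddf.map_partitions(preprocess_text)
    ddf.to_parquet('preprocessed/')

# stage 2: read 'preprocessed/' back in the training system (TensorFlow, or a
# new threaded Dask cluster) and fit the model there
with Client(LocalCluster(n_workers=1, threads_per_worker=8)):
    ddf = dd.read_parquet('preprocessed/')
    # ... fit the model here ...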

To store the intermediate data, the traditional approach is to use HDFS or S3. But if you are worried about the I/O cost of external storage, you can try a distributed in-memory storage engine such as v6d.io; I hope the example (https://v6d.io/examples/distributed-learning.html) helps.