5

I load 130k JSON files.

I do this with plain Python:

import os
import json
import time
import pandas as pd

path = "/my_path/"

filename_ending = '.json'


json_list = []

json_files = [file for file in os.listdir(f"{path}") if file.endswith(filename_ending)]

start = time.time()

for jf in json_files:
    with open(f"{path}/{jf}", 'r') as f:

        json_data = json.load(f)

        json_list.append(json_data)

end = time.time()

and it takes 60 seconds.

I do this with multiprocessing:

import os
import json
import pandas as pd
from multiprocessing import Pool
import time

path = "/my_path/"

filename_ending = '.json'

json_files = [file for file in os.listdir(f"{path}") if file.endswith(filename_ending)]


def read_data(name):
    with open(f"/my_path/{name}", 'r') as f:
        json_data = json.load(f)

    return json_data


if __name__ == '__main__':

    start = time.time()

    pool = Pool(processes=os.cpu_count())                       
    x = pool.map(read_data, json_files)     

    end = time.time()

and it takes 53 seconds.

I do this with Ray:

import os
import json
import pandas as pd
from multiprocessing import Pool
import time
import ray


path = "/my_path/"

filename_ending = '.json'

json_files = [file for file in os.listdir(f"{path}") if file.endswith(filename_ending)]

start = time.time()

ray.shutdown()
ray.init(num_cpus=os.cpu_count()-1)

@ray.remote    
def read_data(name):
    with open(f"/my_path/{name}", 'r') as f:
        json_data = json.load(f)

    return json_data

all_data = []
for jf in json_files:
    all_data.append(read_data.remote(jf))


final = ray.get(all_data)

end = time.time()

and it takes 146 seconds.

My question is: why does Ray take so much time?

Is it because:

1) Ray is relatively slow for a relatively small amount of data?

2) I am doing something wrong in my code?

3) Ray is not that useful?

Outcast

2 Answers

7

I have never used Ray, but I'm quite confident that my explanation should be right.

The original code does a simple JSON deserialisation. It mostly needs file I/O and only a little CPU. (JSON deserialisation is rather quick, which is one of the reasons why JSON is a popular exchange format.)

Ray has to push the data from one process to another (and over the network, if distributed across multiple machines). To do so, it performs some serialisation / deserialisation of its own (perhaps using pickle and a robust TCP protocol to push parameters and collect results), and this overhead is probably bigger than the work the actual task takes.

If you did some more calculations with the JSON data (anything more CPU-intensive), you would see a difference.
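To illustrate, here is a minimal sketch of such a CPU-bound variant using a plain multiprocessing pool; the heavy_work body is a made-up stand-in for whatever real computation you would run on each document:

import json
import os
from multiprocessing import Pool

folder_path = "/my_path/"


def heavy_work(record):
    # stand-in for a CPU-intensive computation on one parsed JSON document
    total = 0
    for _ in range(10_000):
        total += len(json.dumps(record))
    return total


def load_and_process(file_path):
    with open(file_path, 'r') as f:
        return heavy_work(json.load(f))


if __name__ == '__main__':
    files = [os.path.join(folder_path, f)
             for f in os.listdir(folder_path) if f.endswith('.json')]
    with Pool() as pool:
        results = pool.map(load_and_process, files)

With work like that per file, a process pool (or Ray) should start to pay off, because each task now does far more than a single read.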

My guess is that your example problem is too simple, and thus Ray's overhead exceeds the benefit of using multiple workers.

In other words: it costs more time / energy to distribute the tasks and collect the results than it takes to actually compute the results.
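One common way to shrink that overhead is to hand each Ray task a batch of files instead of a single file, so the per-task scheduling and serialisation cost is paid once per batch rather than once per file. A rough sketch (the batch size of 1000 is an arbitrary guess, not something I have benchmarked):

import json
import os
import ray

folder_path = "/my_path/"
json_files = [os.path.join(folder_path, f)
              for f in os.listdir(folder_path) if f.endswith('.json')]

ray.init()


@ray.remote
def read_batch(file_paths):
    # one task reads a whole batch, amortising Ray's per-task overhead
    batch = []
    for fp in file_paths:
        with open(fp, 'r') as f:
            batch.append(json.load(f))
    return batch


batch_size = 1000  # arbitrary; worth tuning
futures = [read_batch.remote(json_files[i:i + batch_size])
           for i in range(0, len(json_files), batch_size)]
json_list = [doc for batch in ray.get(futures) for doc in batch]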

gelonida
  • How do you know that Ray is doing anything network related? – AMC Nov 05 '19 at 03:15
  • 1
    I don't, know whether it's really networking if running on a single host It will if using clusters. For single host setups it could be just local TCP sockets, unix domain sockets, local pipes ore shared memory. but in most cases there is a considerable overhead for serialisation, deserialisation, exchanging the data and synchronizing. I adapted my answer – gelonida Nov 05 '19 at 09:32
  • 1
    I'm pretty sure Ray uses a Redis server for handling inter-process communication, even locally, so there's probably a non-negligible startup penalty. Combine that with the fact that the vast majority of the work here is filesystem I/O, I'm pretty confident @gelonida is correct. – bstovall Sep 24 '20 at 07:19
1

I would say that hypothesis 1) is probably the closest to the truth. Ray seems like a powerful library, but all you’re doing is reading a bunch of files. Is your code just an example for the sake of benchmarking, or part of some larger program? If it is the latter, then it might be interesting to have your benchmark code reflect that.

It's nothing huge, but I tweaked your 3 programs so they should be at least slightly more efficient.


import os
import json


folder_path = "/my_path/"
filename_ending = '.json'

json_files = (os.path.join(folder_path, fp) for fp in os.listdir(f"{folder_path}") if fp.endswith(filename_ending))


def load_json_from_file(file_path):
    with open(file_path, 'r') as file_1:
        return json.load(file_1)


json_list = [load_json_from_file(curr_fp) for curr_fp in json_files]


import os
import json
import multiprocessing as mp


folder_path = "/my_path/"
filename_ending = '.json'

json_files = (os.path.join(folder_path, fp) for fp in os.listdir(f"{folder_path}") if fp.endswith(filename_ending))


def load_json_from_file(file_path):
    with open(file_path, 'r') as file_1:
        return json.load(file_1)


with mp.Pool() as pool:       
    json_list = pool.map(load_json_from_file, json_files)  

import os
import json
import ray

folder_path = "/my_path/"
filename_ending = '.json'


@ray.remote
def load_json_from_file(file_path):
    with open(file_path, 'r') as file_1:
        return json.load(file_1)


json_files = (os.path.join(folder_path, fp) for fp in os.listdir(f"{folder_path}") if fp.endswith(filename_ending))

ray.init()

futures_list = [load_json_from_file.remote(curr_fp) for curr_fp in json_files]

json_list = ray.get(futures_list)

Let me know if you have any questions. If you can run the benchmarks again, I would love to know what difference, if any, there is.
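If you do rerun them, a tiny wrapper like this keeps the timing consistent across the three variants; the load_all_* names are just placeholders for functions wrapping each of the snippets above:

import time


def benchmark(label, fn):
    # time one loading variant with a monotonic clock and report the result
    start = time.perf_counter()
    result = fn()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.1f} s for {len(result)} documents")
    return result

# benchmark("sequential", load_all_sequential)
# benchmark("multiprocessing", load_all_multiprocessing)
# benchmark("ray", load_all_ray)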

AMC
  • Hey, thanks for your reply (upvote). I agree that hypothesis (1) is probably the closest to the truth in my case, although it remains to be proven that `Ray` is actually better with bigger data. By the way, regarding your 2nd block of code, how come you do not use `__main__`? I thought (based on its documentation) that to use `multiprocessing` you have to use `__main__`? – Outcast Nov 05 '19 at 10:06
  • @PoeteMaudit Oops, I forgot about this comment, sorry! It seems that `__main__` is absolutely necessary when using `multiprocessing` on Windows. I use a Mac, but it's probably a good idea for me to write the `__main__` part anyway (a guarded version is sketched after these comments). See [here](https://stackoverflow.com/a/20361032/11301900) for an explanation of why it is a must on Windows, and [here](https://stackoverflow.com/q/419163/11301900) for a more general discussion on the subject. – AMC Nov 12 '19 at 03:50
  • Cool, but I also got `multiprocessing` running with this: `with mp.Pool(processes=os.cpu_count()-1) as pool: output = pool.map(my_function, input)`, so with no `__main__`. – Outcast Nov 12 '19 at 16:38
  • @PoeteMaudit You aren’t using Windows? – AMC Nov 12 '19 at 16:42
  • Ah yes, good point. I am running it on a remote server, so I do not know what it is. Linux, I suppose? – Outcast Nov 12 '19 at 17:13
  • @PoeteMaudit “on a remote server” can you elaborate? What service are you using? – AMC Nov 12 '19 at 17:13
  • Microsoft and Jupyter... so it must be Windows again? In any case, it works without `__main__`. – Outcast Nov 12 '19 at 17:58
  • @PoeteMaudit Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/202217/discussion-between-alexander-cecile-and-poete-maudit). – AMC Nov 12 '19 at 17:58
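For reference, here is the multiprocessing snippet from the answer wrapped in the `__main__` guard discussed in the comments above; this is the form that also works on Windows, where the "spawn" start method re-imports the main module in every child process:

import os
import json
import multiprocessing as mp

folder_path = "/my_path/"
filename_ending = '.json'


def load_json_from_file(file_path):
    with open(file_path, 'r') as file_1:
        return json.load(file_1)


if __name__ == '__main__':
    # without this guard, each spawned worker would re-run the module-level
    # pool creation on Windows and fail instead of just importing the helpers
    json_files = [os.path.join(folder_path, fp)
                  for fp in os.listdir(folder_path)
                  if fp.endswith(filename_ending)]
    with mp.Pool() as pool:
        json_list = pool.map(load_json_from_file, json_files)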