
I have run my script on an instance with 18 GB of RAM, 4 CPUs, and 20 GB of disk in both use cases.

  1. My first use case (read line by line):

Read the file line by line and, every 500,000 lines, have the script clean the data (add columns, convert to specific types), convert the chunk to a particular file type (Parquet), and load it to S3, all done with the pandas library. I want to clarify that each batch of 500,000 lines is processed concurrently by a child process (I am using multiprocessing).

The core of my script:

    import concurrent.futures
    import logging

    _logger = logging.getLogger(__name__)

    # s3r, cfg, params, fs_config, layer, s3_key_input, build_custom_scheme and
    # async_func are defined elsewhere in the script.
    s3_object = s3r.Object(bucket_name=cfg['BUCKETS']['INPUT_PATH'], key=s3_key_input)
    chunk = ""
    i = 0
    err = 0
    fs = []
    columns_name_list, sep = build_custom_scheme(s3r, params, fs_config, cfg)

    # The separator may arrive escaped from the configuration.
    if sep == "\\t":
        sep = "\t"
    dataset_config = {"columns": columns_name_list, "separator": sep}

    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:

        for ln in s3_object.get()['Body'].iter_lines():
            try:
                i += 1
                chunk += str(ln, 'utf-8') + '\n'
            except Exception:
                err += 1
                _logger.warning("Number of encoding errors: {0}".format(err))

            # Every 500,000 lines, hand the accumulated chunk off to a child process.
            if i % 500000 == 0:
                _logger.info("Processing {0} records".format(i))
                fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
                chunk = ""

        # Submit whatever is left after the last full chunk.
        if chunk:
            _logger.info("Processing {0} records".format(i))
            fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))

        for n, f in enumerate(concurrent.futures.as_completed(fs)):
            # f.result() re-raises any exception from the child process.
            print(f"Process {n} - result {f.result()}")

  2. Case when reading the whole file with pandas (no multiprocessing):

    df = pd.concat(tchunk for tchunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=5000, index_col=False))

The last case is too slow when reading very large files, while the first use case handles files of up to 5 GB in an average time of 3 minutes. So, should I just use the first approach, or should I use the second one with another library such as dask?

  • I would definitely try doing this with dask. something like `df = dask.dataframe.read_csv('s3://bucket/path_to_key.csv', na_filter=False, names = columns_name, sep=separator, header = None, chunksize=5000, index_col=False)` – Michael Delgado Apr 19 '22 at 00:22
  • one thing to be careful of with pandas is type conversions and whether pandas is using the C or python engine. to speed up pd.read_csv (and same thing with dask), use the `engine='c'` argument, provide explicit dtypes, and make sure you're handling all NaN flags correctly (e.g. so that a float column does not get converted to string/object because of a stray `#n.a.` or something). see https://stackoverflow.com/questions/25508510/fastest-way-to-parse-large-csv-files-in-pandas/69153989#69153989 – Michael Delgado Apr 19 '22 at 00:26
  • thanks for your comment! I actually have a file with specific types for each column, so I feel calm about that conversion step. One last question: if dask is faster than pandas, should we go straight to using dask (with engine='c') without thinking twice? – masterdevsshm83_ Apr 19 '22 at 02:45
  • Sorry what’s the question? – Michael Delgado Apr 19 '22 at 03:32
  • sorry for my English, friend. My question was: should I use dask instead of pandas when dealing with very large files? – masterdevsshm83_ Apr 19 '22 at 04:17
  • oh got it! it does depend on the size and what you're trying to do. in general, the bigger the job the more you'll benefit from using dask, but you will have to rewrite/rethink your workflow a bit. parallel processing does involve overhead, so not all jobs will be faster with dask. dask is definitely a powerful tool worth learning in my opinion though. play around with it and you'll get a feel for what works well. – Michael Delgado Apr 19 '22 at 05:15

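Following up on the dask suggestion in the comments above, a minimal sketch of that route (note that `dask.dataframe.read_csv` partitions by `blocksize` rather than `chunksize`; the S3 paths below are placeholders, `columns_name` and `separator` come from the question's setup, and explicit per-column dtypes would be better than a blanket `str`):

    import dask.dataframe as dd

    df = dd.read_csv(
        "s3://bucket/path_to_key.csv",   # placeholder input path
        sep=separator,
        names=columns_name,
        header=None,
        dtype=str,              # ideally a dict of explicit per-column dtypes
        na_filter=False,
        engine="c",
        blocksize="64MB",       # dask splits the file by bytes, not by rows
    )

    # ... add columns / cast types here; operations stay lazy until written ...

    # Each partition is written as a separate Parquet file, in parallel.
    df.to_parquet("s3://bucket/output/", write_index=False)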

1 Answer


Do you run into the same issues running the second case like this?

df = pd.DataFrame()
for chunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=500000, index_col=False):
    df = pd.concat([df, chunk])

It'll take some setup to implement, but you could also try pyspark.pandas, which gives you the benefits of pyspark without having to learn something new:

import pyspark.pandas as ps

df = ps.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, index_col=False)
BeRT2me
  • Yes, it's still too slow. What happens is that in each iteration there is more to process, and if the file is very large it takes more time. What I'm doing in use case 1 is processing each chunk of data as an "async process" every 500,000 lines – masterdevsshm83_ Apr 18 '22 at 23:32
  • With this method you could also process each chunk individually instead of concatenating them all together. Also, the result of `pd.read_csv(file, chunksize=x)` is an iterator, so there's probably a way to pass off each chunk to a different process, or use multiprocessing for each manageable chunk. – BeRT2me Apr 18 '22 at 23:37
  • I get your point, but pandas' chunksize brings me fractional or corrupt data; for example, an incomplete row could come through – masterdevsshm83_ Apr 18 '22 at 23:50
  • How so? `chunksize` specifies the number of lines, so every row will always be complete. – BeRT2me Apr 18 '22 at 23:58
  • thanks, that's clearer to me; I was misunderstanding the chunksize parameter. I would have to validate which path is more efficient, iter_lines or read_csv with chunksize. It is clear to me now that the best way to process a large file is to use multiprocessing, because the child processes are independent (each works on its own data) – masterdevsshm83_ Apr 19 '22 at 02:16
  • `Dask` or some other tool may also be an option, but personally I'd use `pyspark.pandas`. This way you don't have to learn something new if you're already familiar with `pandas`. – BeRT2me Apr 19 '22 at 21:46
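
To illustrate the suggestion in the comments of handing each `read_csv` chunk to its own process, a rough sketch (assuming `tmp`, `columns_name` and `separator` from the answer above; `process_chunk` is only a placeholder for the cleaning/Parquet/upload step):

    import concurrent.futures

    import pandas as pd


    def process_chunk(df, part):
        # Placeholder: clean the chunk and write it out (e.g. Parquet on S3).
        return part, len(df)


    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        reader = pd.read_csv(
            tmp,
            dtype=str,
            na_filter=False,
            names=columns_name,
            sep=separator,
            header=None,
            index_col=False,
            chunksize=500000,   # each chunk is a complete set of rows
        )
        futures = [
            # Each chunk DataFrame is pickled and sent to a child process,
            # so there is some serialization overhead per chunk.
            executor.submit(process_chunk, chunk_df, part)
            for part, chunk_df in enumerate(reader)
        ]
        for f in concurrent.futures.as_completed(futures):
            print(f.result())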