I have run my script on an instance with 18 GB of RAM, 4 CPUs, and 20 GB of disk in both use cases.
- My use case (read line by line):
Read the file line by line and process it in chunks of 500,000 lines: the script cleans the data (adds columns, converts values to specific types), converts each chunk to a particular file type (Parquet), and uploads it to S3, all with the pandas library. To be clear, each 500,000-line chunk is processed concurrently by a child process (I am using multiprocessing).
The core of my script:
# names such as s3r, cfg, params, fs_config, layer, build_custom_scheme,
# async_func and _logger are defined elsewhere in the script
import concurrent.futures

s3_object = s3r.Object(bucket_name=cfg['BUCKETS']['INPUT_PATH'], key=s3_key_input)
chunk = ""
i = 0
err = 0
fs = []
columns_name_list, sep = build_custom_scheme(s3r, params, fs_config, cfg)
if sep == "\\t": sep = "\t"
dataset_config = {"columns": columns_name_list, "separator": sep}
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for ln in s3_object.get()['Body'].iter_lines():
        try:
            i += 1
            chunk += str(ln, 'utf-8') + '\n'
        except Exception:
            err += 1
            _logger.warning("Number of encoding errors: {0}".format(err))
        if i % 500000 == 0:
            _logger.info("Processing {0} records".format(i))
            fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
            chunk = ""
    # submit whatever is left after the last full 500,000-line chunk
    if chunk:
        _logger.info("Processing {0} records".format(i))
        fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
    for n, f in enumerate(concurrent.futures.as_completed(fs)):
        print(f"Process {n} - result {f.result()}")
- Case when reading the whole file with pandas (no multiprocessing):
df = pd.concat(pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name,
                           sep=separator, header=None, chunksize=5000, index_col=False))
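If the second case only exists to get the data into Parquet, a variant that avoids concatenating everything into one DataFrame is to write each chunk out as soon as it is read. A minimal sketch, reusing tmp, columns_name and separator from above; the chunk size is bumped to 500,000 to match the first case, and the local part-file names are hypothetical:

import pandas as pd

reader = pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name,
                     sep=separator, header=None, chunksize=500000, index_col=False)
for n, tchunk in enumerate(reader):
    # clean the chunk here, then persist it as its own Parquet part file
    tchunk.to_parquet("part-{0:05d}.parquet".format(n), index=False)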
Reading the whole file this way is too slow for very large files, while the first approach handles files of up to 5 GB in an average time of about 3 minutes. Should I stick with the first approach, or keep the second one but switch to another library such as Dask?
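What I have in mind for the Dask option is roughly the following (a sketch only, assuming dask, s3fs and pyarrow are installed; the S3 output prefix is hypothetical):

import dask.dataframe as dd

ddf = dd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name,
                  sep=separator, header=None, blocksize="64MB")
# cleaning (add columns, cast types) would be applied lazily per partition
ddf.to_parquet("s3://my-output-bucket/dataset/", write_index=False)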