1

I have millions of files being created each hour. Each file has one line of data. These files need to be merged into a single file.

I have tried doing this in the following way:-

  1. Using aws s3 cp to download files for the hour.
  2. Use a bash command to merge the files. OR
  3. Use a python script to merge the files.

This hourly job is being run in Airflow on Kubernetes(EKS). This takes more than one hour to complete and is creating a backlog. Other problem is that it often causes the EC2 Node to stop responding due to high CPU and memory usage. What is the most efficient way of running this job?

The python script for reference:-

from os import listdir
import sys
# from tqdm import tqdm

files = listdir('./temp/')
dest = sys.argv[1]

data = []

tot_len = len(files)
percent = tot_len//100

for i, file in enumerate(files):
    if(i % percent == 0):
        print(f'{i/percent}% complete.')
    with open('./temp/'+file, 'r') as f:
        d = f.read()
        data.append(d)

result = '\n'.join(data)

with open(dest, 'w') as f:
    f.write(result)
  • There is a way check this out - https://stackoverflow.com/questions/17749058/combine-multiple-text-files-into-one-text-file-using-python#17749339 And there is another efficient and easy way, check this out - https://stackoverflow.com/questions/4827453/merge-all-files-in-a-directory-into-one-using-bash#4827464 – Cool Breeze Jun 02 '21 at 04:24
  • Show code? It's unclear what "merge" means to you, and if it just means pasting the file contents together in arbitrary order, it's hard to imagine how code _could_ be written that would require high memory use. – Tim Peters Jun 02 '21 at 04:36
  • @user84634 I started with using cat which gave error on regex expansion of too many files. So I replaced it with find command. Even after this the command takes a lot of time. I tried the simple python code of read then join but even that takes above 2 hours which is not good for an hourly task and sometimes it even causes node crashes in EKS cluster. – therapyCarrots Jun 02 '21 at 04:52
  • @TimPeters Yes, I meant pasting files together in arbitrary order. When I run 'aws s3 cp', the contents of the folder are downloaded into the pod running in the EKS cluster and when I check the memory usage it starts exceeding 1GB later into the job. (The total size of the original files ~1GB). I have also added the python code. – therapyCarrots Jun 02 '21 at 04:54
  • There is another way, you could write a script that will divide the files into 50-100 subdirectories. And then run the python/bash code asynchronously in 30-40 directories at a time. This will perform the job much faster as compared to the previous approach. – Cool Breeze Jun 02 '21 at 04:59

3 Answers3

3

A scalable and reliable method would be:

  • Configure the Amazon S3 bucket to trigger an AWS Lambda function whenever a new file arrives
  • Within the AWS Lambda function, read the contents of the file and send it to an Amazon Kinesis Firehose stream. Then, delete the input file.
  • Configure the Amazon Kinesis Firehose stream to buffer the input data and output a new file either based on time period (up to 15 minutes) or data size (up to 128MB)

See: Amazon Kinesis Data Firehose Data Delivery - Amazon Kinesis Data Firehose

This will not produce on file per hour -- the number of files will depend upon the size of the incoming data.

If you need to create hourly files, you could consider using Amazon Athena on the output files from Firehose. Athena allows you to run SQL queries on files stored in Amazon S3. So, if the input files contain a date column, it can select only the data for a specific hour. (You could code the Lambda function add a date column for this purpose.)

John Rotenstein
  • 241,921
  • 22
  • 380
  • 470
1

I expect you should very seriously consider following the ideas in the AWS-specific answer you got. I'd add this response as a comment, except there's no way to show indented code coherently in a comment.

With respect to your Python script, you're building a giant string with a number of characters equal to the total number of characters across all your input files. So of course the memory use will grow at least that large.

Much less memory-intensive is to write out a file content immediately upon reading it (note this code is untested - may have a typo I'm blind to):

with open(dest, 'w') as fout:
    for i, file in enumerate(files):
        if(i % percent == 0):
            print(f'{i/percent}% complete.')
        with open('./temp/'+file, 'r') as fin:
            fout.write(fin.read())

And one more thing to try if you pursue this: open the files in binary mode instead ('wb' and 'rb'). That may save useless layers of text-mode character decoding/encoding. I assume you just want to paste the raw bytes together.

Tim Peters
  • 67,464
  • 13
  • 126
  • 132
0

Putting this out there in case someone else needs it.

I optimized the merging code to the best of my ability but still the bottleneck was reading or downloading the s3 files which is pretty slow using even the official aws cli.

I found a library s5cmd which is pretty fast as it makes full use of multiprocessing and multithreading and it solved my problem.

Link :- https://github.com/peak/s5cmd