I have a list of dictionaries that represents a CSV file, and I would like to write it to S3, but I am getting a memory error. Here is my code:
import csv
import io
import boto3

s3 = boto3.client("s3")  # S3 client used for the upload below

dicts = []  # populated with about 1,000,000 dictionaries representing a CSV
f = io.StringIO()
writer = csv.DictWriter(f, fieldnames=dicts[0].keys())
writer.writeheader()
for k in dicts:
    writer.writerow(k)
print("Writing to S3...")
response = s3.upload_fileobj(Bucket='mybucket', Key=f"key.csv", Fileobj=f.getvalue())
f.close()
However, when I run this I get the following error:
[ERROR] MemoryErrorTraceback (most recent call last):
File "/var/task/lambda_function.py", line 85, in lambda_handler
response = s3.upload_fileobj(Bucket='mybucket', Key=f"key.csv", Fileobj=f.getvalue())
How can I go about writing this to S3 in a more memory-efficient way? The CSV is about 400 MB and has around 1,000,000 rows.
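As far as I can tell, part of the problem is that f.getvalue() returns a full copy of the buffer contents, so at the point of the upload call the data is in memory several times over: the list of dicts, the StringIO buffer, and the copied string. A minimal illustration of the copy (the sizes here are just for demonstration):

import io
import sys

buf = io.StringIO()
buf.write("x" * 1_000_000)

body = buf.getvalue()        # returns a brand-new str, separate from the buffer
print(sys.getsizeof(body))   # ~1 MB for the copy, on top of the buffer itself
buf.close()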
EDIT:
I have the maximum amount of memory available; here is the report from Lambda:
REPORT RequestId: c8f651cf-9869-4217-921f-52edcf577234
Duration: 123484.03 ms
Billed Duration: 123485 ms
Memory Size: 10240 MB
Max Memory Used: 10043 MB
Init Duration: 453.23 ms
I have run a memory profiler, and unsurprisingly the vast majority of the memory is used writing to f and by f.getvalue().
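The check I ran used a memory profiler; a rough tracemalloc equivalent would look something like this (illustrative only, not my exact code):

import tracemalloc

tracemalloc.start()
# ... build the StringIO and call f.getvalue() as in the snippet above ...
for stat in tracemalloc.take_snapshot().statistics("lineno")[:5]:
    print(stat)  # the writer.writerow(...) and f.getvalue() lines dominate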
EDIT:
Here is the full lambda function code:
for i in event['files']:
    try:
        file = s3.get_object(Bucket="incomingbucket", Key=i)
        print(file)
    except Exception as e:
        print(e, i)
    file_id = str(uuid.uuid4())
    jsonRootLs = i.split(".")
    if len(jsonRootLs) > 1:
        jsonRoot = '.'.join(j for j in jsonRootLs[0:len(jsonRootLs)-1])
        jsonFileName = f"{jsonRoot}.json"
    else:
        jsonRoot = jsonRootLs[0]
        jsonFileName = f"{jsonRoot}.json"
    mapper = s3.get_object(Key=jsonFileName, Bucket='slm-addressfile-incoming')
    mapperJSON = json.loads(mapper['Body'].read().decode('utf-8'))
    dicts = modelerFile(file, mapperJSON)
    for j in dicts:
        j['mail_filename'] = i
        j['file_id'] = file_id
    dictsToSend.extend(dicts)
    print("Records added to list")
    f = io.StringIO()
    writer = csv.DictWriter(f, fieldnames=dicts[0].keys())
    writer.writeheader()
    for k in dicts:
        writer.writerow(k)
    print("Writing to S3...")
    response = s3.upload_fileobj(Bucket='slm-test-bucket-transactional', Key=f"{jsonRoot}.csv", Fileobj=f.getvalue())
    f.close()

# Function to re-map columns
def customFile(file, mapperjson):
    NCOAFields = mapperjson['mappings']
    lines1 = []
    for line in file['Body'].iter_lines():
        lines1.append(line.decode('utf-8', errors='ignore'))
    fieldnames = lines1[0].replace('"','').split(',')
    jlist1 = (dict(row) for row in csv.DictReader(lines1[1:], fieldnames))
    dicts = []
    for i in jlist1:
        d = {}
        metadata = {}
        for k, v in i.items():
            if k in NCOAFields:
                d[NCOAFields[k]] = v
            else:
                metadata[k] = v
        if len(metadata) > 0:
            d['metadata'] = metadata
        d['individual_id'] = str(uuid.uuid4())
        dicts.append(d)
    del jlist1
    return dicts
Basically, it reads a CSV from S3 along with a JSON mapping file that renames the columns to our destination schema.
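For reference, once loaded the mapping file looks roughly like this (the column names here are made up); anything listed under mappings gets renamed, and any column that is not listed ends up in the row's metadata dict:

mapperJSON = {
    "mappings": {
        # incoming CSV header -> destination column name (illustrative)
        "First Name": "first_name",
        "Last Name": "last_name",
        "Zip": "postal_code",
    }
}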