I have a Python program that connects to a PostgreSQL database. The database holds quite a lot of data (around 1.2 billion rows). Luckily, I don't have to analyse all of those rows at the same time.
Those 1.2 billion rows are spread across several tables (around 30). Currently I am accessing a table called table_3, from which I want to fetch all the rows that have a specific "did" value (as the column is called).
I have counted the rows using a SQL command:
SELECT count(*) FROM table_3 WHERE did='356002062376054';
which returns roughly 157 million rows.
I will perform some "analysis" on all of these rows (extracting two specific values and doing some calculations on them), write the results to a dictionary, and then save them back to PostgreSQL in a different table.
The problem is that I am creating so many lists and dictionaries along the way that I end up running out of memory, even though I am using 64-bit Python 3 and have 64 GB of RAM.
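For scale, here is a quick back-of-envelope on just the dictionary containers (sys.getsizeof measures only the dict shell, not its keys or values, and the exact number varies by CPython version):

```python
import sys

row = {"sensor_id": 1, "timestamp": 1484512800000, "data": "..."}
shell = sys.getsizeof(row)            # container only, excludes keys/values
total_gb = shell * 157_000_000 / 1e9  # 157 million rows
print(shell, "bytes per dict ->", round(total_gb, 1), "GB for the shells alone")
```

So the dict overhead alone is already tens of GB, before counting the string values or psycopg2's own result buffering.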
Some code:
import psycopg2
from datetime import datetime

CONNECTION = psycopg2.connect('<psycopg2 formatted string>')
CURSOR = CONNECTION.cursor()

DID_LIST = ["357139052424715",
            "353224061929963",
            "356002064810514",
            "356002064810183",
            "358188051768472",
            "358188050598029",
            "356002061925067",
            "358188056470108",
            "356002062376054",
            "357460064130045"]

SENSOR_LIST = [1, 2, 3, 4, 5, 6, 7, 8, 9,
               10, 11, 12, 13, 801, 900, 901,
               902, 903, 904, 905, 906, 907,
               908, 909, 910, 911]
for did in DID_LIST:
    table_name = did
    for sensor_id in SENSOR_LIST:
        rows = get_data(did, sensor_id)
        list_object = create_standard_list(sensor_id, rows)    # happens here
        formatted_list = format_table_dictionary(list_object)  # or here
        pushed_rows = write_to_table(table_name, formatted_list)  # write_to_table omitted as that is not my problem
def get_data(did, table_id):
    """Get data from PostgreSQL; returns the cursor for iteration."""
    table_name = "table_{0}".format(table_id)
    # the table name cannot be bound as a parameter, but 'did' can
    query = """SELECT * FROM {0} WHERE did = %s
               ORDER BY timestamp""".format(table_name)
    CURSOR.execute(query, (did,))
    return CURSOR
def create_standard_list(sensor_id, data):
    """Formats DB data to a list of dictionaries."""
    list_object = []
    print("Create standard list")
    for row in data:  # data is the psycopg2 CURSOR
        row_timestamp = row[2]
        row_data = row[3]
        temp_object = {"sensor_id": sensor_id, "timestamp": row_timestamp,
                       "data": row_data}
        list_object.append(temp_object)
    return list_object
def format_table_dictionary(list_dict):
    """Formats the list of dictionaries to simple per-day data:
    table_name = (dates, data_count, first row)"""
    print("Formatting dict to DB")
    temp_today = 0
    dict_list = []
    first_row = {}
    count = 1
    for elem in list_dict:
        # convert ms to seconds
        date = datetime.fromtimestamp(elem['timestamp'] / 1000)
        today = int(date.strftime('%d'))
        if temp_today != today:  # 'is not' compares identity, not value
            if not first_row:
                first_row = elem['data']
                first_row_str = str(first_row)
            dict_object = {"sensor_id": elem['sensor_id'],
                           "date": date.strftime('%d/%m-%Y'),
                           "reading_count": count,
                           # approximate size of the data in kB
                           "approx_data_size": (count * len(first_row_str) / 1000),
                           "time": date.strftime('%H:%M:%S'),
                           "first_row": first_row}
            dict_list.append(dict_object)
            first_row = {}
            temp_today = today
            count = 0
        else:
            count += 1
    return dict_list
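To make the day-change logic above runnable without a database, here is a stripped-down toy version on synthetic timestamps (group_by_day is just a name I made up for this sketch):

```python
from datetime import datetime

def group_by_day(rows):
    """Same day-change detection as format_table_dictionary, minus the DB."""
    out, prev_day, count = [], 0, 1
    for row in rows:
        date = datetime.fromtimestamp(row["timestamp"] / 1000)  # ms -> s
        day = int(date.strftime("%d"))
        if prev_day != day:
            out.append({"date": date.strftime("%d/%m-%Y"),
                        "reading_count": count})
            prev_day = day
            count = 0
        else:
            count += 1
    return out

# two readings three days apart -> two day-groups
rows = [{"timestamp": 0}, {"timestamp": 3 * 86_400 * 1000}]
print(group_by_day(rows))
```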
My error happens somewhere around the creation of either of the two lists, as marked with comments in my code. It manifests as my computer stopping responding and eventually logging me out. I am running Windows 10, if that is of any importance.
I know the first list, created by the "create_standard_list" method, could be eliminated and that code run inside "format_table_dictionary" instead, thereby avoiding a 157-million-element list in memory. But I think some of the other tables I will run into will have similar problems and might be even larger, so I would like to optimize all of this right now; I am just unsure of what I could do.
I guess writing to a file wouldn't really help a whole lot, as I would have to read that file back and thereby put it all into memory again?
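Instead of a file, what I have been considering is consuming the rows in fixed-size chunks so the full 157-million-element list never exists at once; a generic sketch (in_chunks is my own helper name, and since psycopg2 cursors are iterable one should plug in directly):

```python
def in_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size rows from any iterable."""
    chunk = []
    for row in rows:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:  # final partial chunk
        yield chunk

# e.g. in_chunks(CURSOR, 10_000) instead of building one giant list
print(list(in_chunks(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Though from the psycopg2 docs, a plain client-side cursor still buffers the whole result set after execute(), so I suspect this would need a named (server-side) cursor to actually cap memory use.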
Minimalist example
I have a table:

-----------------------------------------------------------------
| Row 1 | did | timestamp | data | unused value | unused value |
| Row 2 | did | timestamp | data | unused value | unused value |
| ...                                                          |
-----------------------------------------------------------------

which I want to turn into:

table = [{ values from row 1 }, { values from row 2 }, ...]
connection = psycopg2.connect(<connection string>)
cursor = connection.cursor()
cursor.execute("""SELECT * FROM table_3 WHERE did='356002062376054'
                  ORDER BY timestamp""")
table = cursor  # execute() returns None; iterate the cursor itself

extracted_list = extract(table)
calculated_list = calculate(extracted_list)
# ... write to db ...
def extract(table):
    """Extract all but the unused values."""
    new_list = []
    for row in table:
        did = row[0]
        timestamp = row[1]
        data = row[2]
        a_dict = {'did': did, 'timestamp': timestamp, 'data': data}
        new_list.append(a_dict)
    return new_list
def calculate(a_list):
    """Perform calculations on the values."""
    dict_list = []
    temp_today = 0
    count = 0
    for row in a_list:
        date = datetime.fromtimestamp(row['timestamp'] / 1000)  # from ms to sec
        today = int(date.strftime('%d'))
        if temp_today != today:
            new_dict = {'date': date.strftime('%d/%m-%Y'),
                        'reading_count': count,
                        'time': date.strftime('%H:%M:%S')}
            dict_list.append(new_dict)
            temp_today = today  # remember the day, as in the full version
    return dict_list