I want to write messages from a websocket to a postgres-DB running on a Raspberry Pi.
The average message rate from the websocket is about 30 messages/second, but during peaks it reaches up to 250 messages/second.
I implemented a Python program to receive the messages and write them to the database with the SQLAlchemy ORM. After each message I first check whether the same primary key already exists and then do an update or an insert, and afterwards I always commit — so it gets very slow. I can write at most 30 messages/second to the database, which is a problem at peak times. So I tested several approaches to speed things up.
This is my best approach:
I first build all the single queries (with psycopg2), then join them together and send the complete query string to the database to execute at once --> this speeds it up to 580 messages/second.
Create the table for Testdata:
-- Test table for the upsert benchmark; one row per transaction message.
CREATE TABLE transactions (
id int NOT NULL PRIMARY KEY,
name varchar(255),
description varchar(255),
country_name varchar(255),
city_name varchar(255),
-- NOTE(review): cost is stored as text, not numeric — presumably copied
-- from the referenced example; use numeric if arithmetic is ever needed.
cost varchar(255),
currency varchar(255),
created_at DATE,
billing_type varchar(255),
language varchar(255),
operating_system varchar(255)
);
example copied from https://medium.com/technology-nineleaps/mysql-sqlalchemy-performance-b123584eb833
Python-Test-Skript:
import random
import time
from faker import Faker
import psycopg2
from psycopg2.extensions import AsIs
"""psycopg2"""
psycopg2_conn = {'host':'192.168.176.101',
'dbname':'test',
'user':'blabla',
'password':'blabla'}
connection_psycopg2 = psycopg2.connect(**psycopg2_conn)
myFactory = Faker()
def random_data():
    """Generate 300 fake transaction rows keyed by integer id.

    Returns:
        dict: maps ``id`` (int) -> dict of column-name/value pairs matching
        the ``transactions`` table.
    """
    billing_type_list = ['cheque', 'cash', 'credit', 'debit', 'e-wallet']
    language_list = ['English', 'Bengali', 'Kannada']
    operating_system = 'linux'
    random_dic = {}
    for i in range(300):
        # Build each row explicitly instead of scraping locals() by name.
        # The original name-list approach silently dropped the generated
        # 'name' value because 'name' was missing from the key list.
        random_dic[i] = {
            'id': i,
            'name': myFactory.name(),
            'description': myFactory.text(),
            'country_name': myFactory.country(),
            'city_name': myFactory.city(),
            'cost': str(myFactory.random_digit_not_null()),
            'currency': myFactory.currency_code(),
            'created_at': myFactory.date_time_between(
                start_date="-30y", end_date="now", tzinfo=None),
            'billing_type': random.choice(billing_type_list),
            'language': random.choice(language_list),
            'operating_system': operating_system,
        }
    return random_dic
def single_insert_on_conflict_psycopg2(idic, icur):
    """Build one escaped upsert statement for the ``transactions`` table.

    Args:
        idic: mapping of column name -> value for a single row.
        icur: open psycopg2 cursor, used only for ``mogrify`` escaping.

    Returns:
        bytes: a complete ``INSERT ... ON CONFLICT ... DO UPDATE`` statement
        with all values safely escaped by the server adapter.
    """
    columns = list(idic.keys())
    # EXCLUDED.<col> refers to the row that was proposed for insertion,
    # letting the UPDATE branch reuse the incoming values.
    columns_with_excludephrase = ['EXCLUDED.{}'.format(column) for column in columns]
    values = [idic[column] for column in columns]
    insert_statement = """
    insert into transactions (%s) values %s
    ON CONFLICT ON CONSTRAINT transactions_pkey
    DO UPDATE SET (%s) = (%s)
    """
    # mogrify escapes the VALUES tuple; AsIs splices the identifier lists in
    # verbatim. NOTE(review): AsIs performs no escaping — the column names
    # come from our own dict keys here, but never feed it external input.
    # (Per-row debug prints removed: at 250 msg/s they dominate the hot path.)
    return icur.mogrify(insert_statement, (
        AsIs(','.join(columns)),
        tuple(values),
        AsIs(','.join(columns)),
        AsIs(','.join(columns_with_excludephrase)),
    ))
def complete_run_psycopg2(random_dic):
    """Upsert all rows in a single round trip and report throughput.

    Args:
        random_dic: id -> row-dict mapping as produced by ``random_data()``.

    Returns:
        float: rows written per second (0.0 when ``random_dic`` is empty —
        the original would have executed an empty statement and divided
        by a near-zero duration).
    """
    if not random_dic:
        return 0.0
    starttime = time.time()
    # psycopg2 cursors are context managers: the cursor is closed even if
    # execute() raises (the original leaked it on error).
    with connection_psycopg2.cursor() as cur:
        querylist = [
            single_insert_on_conflict_psycopg2(idic=row, icur=cur).decode("utf-8")
            for row in random_dic.values()
        ]
        # Joining with ';' sends the whole batch in ONE network round trip,
        # which is where the speedup over per-row commits comes from.
        cur.execute(';'.join(querylist))
    connection_psycopg2.commit()
    xduration = time.time() - starttime
    write_sec = len(random_dic) / xduration
    print('complete Duration:{}'.format(xduration))
    print('writes per second:{}'.format(write_sec))
    return write_sec
def main():
    """Generate fake transaction rows and push them to Postgres in one batch."""
    test_rows = random_data()
    complete_run_psycopg2(test_rows)


if __name__ == '__main__':
    main()
Now my question: is this a proper approach? Are there any hints I didn’t consider?