
I want to write messages from a websocket to a Postgres database running on a Raspberry Pi.

The average message rate from the websocket is about 30 messages/second, but during peaks it reaches up to 250 messages/second.

I implemented a Python program to receive the messages and write them to the database with the SQLAlchemy ORM. For each message I first check whether the same primary key already exists and then do an update or an insert; afterwards I always do a commit, which makes it very slow. I can write at most 30 messages/second to the database, so the peak times are a problem. I therefore tested several approaches to speed things up.
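For reference, the per-message pattern I started with looks roughly like this (a minimal sketch; the Transaction model is trimmed to two columns for illustration, and session.get needs SQLAlchemy 1.4+):

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Transaction(Base):
    # trimmed to two columns for the sketch
    __tablename__ = 'transactions'
    id = Column(Integer, primary_key=True)
    name = Column(String(255))

engine = create_engine('postgresql+psycopg2://blabla:blabla@192.168.176.101/test')

def handle_message(session, msg):
    row = session.get(Transaction, msg['id'])   # one SELECT per message
    if row is None:
        session.add(Transaction(**msg))         # insert
    else:
        for key, value in msg.items():          # update
            setattr(row, key, value)
    session.commit()                            # one COMMIT per message: slow

with Session(engine) as session:
    handle_message(session, {'id': 1, 'name': 'test'})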

This is my best approach:

I first build all the single queries (with psycopg2), then join them together and send the complete query string to the database to execute at once. This speeds things up to about 580 messages/second.

Create the table for the test data:

CREATE TABLE transactions (
    id int NOT NULL PRIMARY KEY,
    name varchar(255),
    description varchar(255),
    country_name varchar(255),
    city_name varchar(255),
    cost varchar(255),
    currency varchar(255),
    created_at DATE,
    billing_type varchar(255),
    language varchar(255),
    operating_system varchar(255)
);

Example copied from https://medium.com/technology-nineleaps/mysql-sqlalchemy-performance-b123584eb833

Python test script:

import random
import time
from faker import Faker
import psycopg2
from psycopg2.extensions import AsIs

"""psycopg2"""
psycopg2_conn = {'host':'192.168.176.101',
              'dbname':'test',
                    'user':'blabla',
                    'password':'blabla'}
connection_psycopg2 = psycopg2.connect(**psycopg2_conn)

myFactory = Faker() 

def random_data():
    """Generate 300 fake rows, keyed by id."""
    billing_type_list = ['cheque', 'cash', 'credit', 'debit', 'e-wallet']
    language_list = ['English', 'Bengali', 'Kannada']
    operating_system = 'linux'
    random_dic = {}
    for i in range(0, 300):
        id = int(i)
        name = myFactory.name()
        description = myFactory.text()
        country_name = myFactory.country()
        city_name = myFactory.city()
        cost = str(myFactory.random_digit_not_null())
        currency = myFactory.currency_code()
        created_at = myFactory.date_time_between(start_date="-30y", end_date="now", tzinfo=None)
        billing_type = random.choice(billing_type_list)
        language = random.choice(language_list)
        random_dic[id] = {}
        # collect the generated locals into the row dict
        for xname in ['id', 'name', 'description', 'country_name', 'city_name',
                      'cost', 'currency', 'created_at', 'billing_type',
                      'language', 'operating_system']:
            random_dic[id][xname] = locals()[xname]

    return random_dic




def single_insert_on_conflict_psycopg2(idic, icur):
    """Build one mogrified upsert statement for a single row."""
    cur = icur
    columns = idic.keys()
    columns_with_excludephrase = ['EXCLUDED.{}'.format(column) for column in columns]
    values = [idic[column] for column in columns]
    insert_statement = """
    INSERT INTO transactions (%s) VALUES %s
    ON CONFLICT ON CONSTRAINT transactions_pkey
    DO UPDATE SET (%s) = (%s)
    """
    # mogrify renders the statement client-side; AsIs stops psycopg2 from
    # quoting the column lists as string literals
    xquery = cur.mogrify(insert_statement, (
        AsIs(','.join(columns)),
        tuple(values),
        AsIs(','.join(columns)),
        AsIs(','.join(columns_with_excludephrase))
    ))
    return xquery

def complete_run_psycopg2(random_dic):
    """Join all single upserts into one string and execute it in a single round trip."""
    querylist = []
    starttime = time.time()
    cur = connection_psycopg2.cursor()
    for key in random_dic:
        query = single_insert_on_conflict_psycopg2(idic=random_dic[key],
                                                   icur=cur)
        querylist.append(query.decode("utf-8"))

    complete_query = ';'.join(querylist)
    cur.execute(complete_query)
    connection_psycopg2.commit()
    cur.close()
    endtime = time.time()
    xduration = endtime - starttime
    write_sec = len(random_dic) / xduration
    print('complete duration: {}'.format(xduration))
    print('writes per second: {}'.format(write_sec))
    return write_sec

def main():

    random_dic = random_data()
    complete_run_psycopg2(random_dic)
    return

if __name__ == '__main__':
    main()

Now my question: is this a proper approach? Are there any hints I didn’t consider?

Egirus Ornila

1 Answer


First, you cannot insert column names like that. I would use .format to inject the column names, and then use %s for the values.

SQL = 'INSERT INTO transactions ({}) VALUES (%s,%s,%s,%s,%s,%s)'.format(','.join(columns))
db.Pcursor().execute(SQL, (value1, value2, value3, value4, value5, value6))

Second, you will get better speed if you use async processes.

Fortunately for you, I wrote a gevent async library for psycopg2 that you can use. It makes the process far easier; it is async, threaded, and pooled.

Python Postgres psycopg2 ThreadedConnectionPool exhausted
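As a rough illustration of the pooled, threaded idea (a sketch using psycopg2's built-in ThreadedConnectionPool rather than the library linked above; the write_batch helper, the two-column upsert, the batch split, and the pool/worker sizes are all made up for illustration):

from concurrent.futures import ThreadPoolExecutor
from psycopg2.pool import ThreadedConnectionPool

# Sketch only: pool sizes, worker count, and the two-column upsert are
# illustrative, not the linked library's API.
pool = ThreadedConnectionPool(minconn=1, maxconn=4,
                              host='192.168.176.101', dbname='test',
                              user='blabla', password='blabla')

def write_batch(rows):
    # Each worker borrows a pooled connection, writes its batch in one
    # transaction, and returns the connection to the pool.
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            for row in rows:
                cur.execute(
                    "INSERT INTO transactions (id, name) VALUES (%s, %s) "
                    "ON CONFLICT ON CONSTRAINT transactions_pkey "
                    "DO UPDATE SET name = EXCLUDED.name", row)
        conn.commit()
    finally:
        pool.putconn(conn)

batches = [[(i, 'name-{}'.format(i)) for i in range(j, j + 100)]
           for j in range(0, 300, 100)]
with ThreadPoolExecutor(max_workers=4) as ex:
    list(ex.map(write_batch, batches))  # list() surfaces worker exceptions
pool.closeall()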

eatmeimadanish
  • What do you mean by "You can not insert column names like that"? I found many examples like that. Are there security issues? Or what problems will I get if I do it like that? – Egirus Ornila Aug 01 '19 at 17:48
  • psycopg2 will tokenize the strings, which will not work as column names. This is only a concern for user-input values. – eatmeimadanish Aug 01 '19 at 22:02