1

I have a Spark job which process the data pretty fast, but when it tries to write the result into the postgresql database, it is quite slow. Here is most of the relevant code:

import psycopg2

def save_df_to_db(records):
    # each item in record is a dictionary with 'url', 'tag', 'value' as keys
    db_conn = psycopg2.connect(connect_string)
    db_conn.autocommit = True
    cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    upsert_query = """INSERT INTO mytable (url, tag, value)
                      VALUES (%(url)s, %(tag)s, %(value)s) ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s"""

    try:
        cursor.executemany(upsert_query, records)
    except Exception as e:
        print "Error in executing save_df_to_db: ", e.message

data = [...] # initial data
rdd = sc.parallelize(data)
rdd = ... # Some simple RDD transforms...
rdd.foreachPartition(save_df_to_db)

The table also has a constraint about url+tag being unique. I am looking for solutions to improve the speed of this code. Any suggestion or recommendation is welcome.

happyhuman
  • 1,541
  • 1
  • 16
  • 30
  • As of this time Psycopg 2.7, which provides `execute_values`, is still in beta. For now use the adequate solution for 2.6: http://stackoverflow.com/a/30985541/131874 – Clodoaldo Neto Feb 15 '17 at 11:45

2 Answers2

2

Thanks for the responses. Since the version of psycopg2 I am using is not supporting the batch execution, I had to rely on a slightly different approach using the copy command. I wrote down a little function which helped reducing the save time from 20 minutes to about 30 seconds. Here is the function. It takes a pandas dataframe as input and write it to a table (curso):

import StringIO
import pandas as pd

def write_dataframe_to_table(cursor, table, dataframe, batch_size=100, null='None'):
    """
    Write a pandas dataframe into a postgres table.
    It only works if the table columns have the same name as the dataframe columns.
    :param cursor: the psycopg2 cursor object
    :param table: the table name
    :param dataframe: the dataframe
    :param batch_size: batch size
    :param null: textual representation of NULL in the file. The default is the string None.
    """
    for i in range(0, len(dataframe), batch_size):
        chunk_df = dataframe[i: batch_size + i]
        content = "\n".join(chunk_df.apply(lambda x: "\t".join(map(str, x)), axis=1))
        cursor.copy_from(StringIO.StringIO(content), table, columns=list(chunk_df.columns), null=null)
happyhuman
  • 1,541
  • 1
  • 16
  • 30
1

I believe the main bottleneck is a combination of cursor.executemany and connection.autocommit. As it is explained in the official documentation of executemany

In its current implementation this method is not faster than executing han executing execute() in a loop.

Since you combine it with connection.autocommit you effectively commit after each insert.

Psycopg provides fast execution helpers:

which can be used to perform batched operations. It would also make more sense to handle commits manually.

It is also possible that you additionally throttle the database server with larger number of concurrent writes and index updates. Normally I would recommend writing to disk and performing batch import with COPY but it is not guaranteed to help here.

Since you use mutable records without timestamps, you cannot just drop the index and recreate it after the import as another way to boost performance.

zero323
  • 322,348
  • 103
  • 959
  • 935