14

Following on from this question, when I try to create a postgresql table from a dask.dataframe with more than one partition I get the following error:

IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL:  Key (typname, typnamespace)=(test1, 2200) already exists.
 [SQL: '\nCREATE TABLE test1 (\n\t"A" BIGINT, \n\t"B" BIGINT, \n\t"C" BIGINT, \n\t"D" BIGINT, \n\t"E" BIGINT, \n\t"F" BIGINT, \n\t"G" BIGINT, \n\t"H" BIGINT, \n\t"I" BIGINT, \n\t"J" BIGINT, \n\tidx BIGINT\n)\n\n']

You can recreate the error with the following code:

import numpy as np
import dask.dataframe as dd
import dask
import pandas as pd
import sqlalchemy_utils as sqla_utils
import sqlalchemy as sqla
DATABASE_CONFIG = {
    'driver': '',
    'host': '',
    'user': '',
    'password': '',
    'port': 5432,
}
DBNAME = 'dask'
url = '{driver}://{user}:{password}@{host}:{port}/'.format(
        **DATABASE_CONFIG)
db_url = url.rstrip('/') + '/' + DBNAME
# create db if non-existent
if not sqla_utils.database_exists(db_url):
    print('Creating database \'{}\''.format(DBNAME))
    sqla_utils.create_database(db_url)
conn = sqla.create_engine(db_url)
# create pandas df with random numbers
df = pd.DataFrame(np.random.randint(0,40,size=(100, 10)), columns=list('ABCDEFGHIJ'))
# add index so that it can be used as primary key later on
df['idx'] = df.index
# create dask df
ddf = dd.from_pandas(df, npartitions=4)
# Write to psql
dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'test', db_url, if_exists='append', index=False, index_label='idx')
       for d in ddf.to_delayed()]
dask.compute(*out)

The code doesn't produce an error if npartitions is set to 1. So I'm guessing it has to do with postgres not being able to handle parallel requests to write to a same sql table...? How can I fix this?

Ludo
  • 2,307
  • 2
  • 27
  • 58
  • I had the same trouble with dataframe running process in parallel and I used lock as suggest by @genchev but for parallelisation you need to follow steps from this answer https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes – Hugo Roussaffa Nov 24 '21 at 00:54

3 Answers3

14

I was reading this. It seems this error rises when you are creating/updating the same table with parallel processing. I understand it depends because of this (as explained on the google group discussion).

So I think it depend from PostgreSQL itself and not from the connection driver or the module used for the multiprocessing.

Well, Actually, the only way I found to solve this is to create chunks big enough to have back a writing process slower than the calculation itself. With bigger chunks this error doesn't rise.

Bhargav Rao
  • 50,140
  • 28
  • 121
  • 140
Glori P.
  • 466
  • 1
  • 4
  • 14
  • 1
    Not the same exact conditions, but I got this error when I (mistakenly) had two connections to PSQL running an identical `CREATE TABLE` seed script in two terminals. Killing one or both terminals and only restarting one fixed it. – Preston Wallace May 17 '20 at 05:15
  • Had the same error when deploying to heroku. I waited for a few minutes and then opened the app. The error had disapeared. My guess: Tried to use the app when heroku was still creating tables. – Confusion Matrix Aug 25 '20 at 12:38
4

In PostgreSQL that helps me.

set enable_parallel_hash=off;

After u can turn it on

set enable_parallel_hash=on;
1

I had the same error with ponyORM on PostgreSQL in Heroku. I solved it by locking the thread until it executes the DB operation. In my case:

lock = threading.Lock()
with lock:
    PonyOrmEntity(name='my_name', description='description')
    PonyOrmEntity.get(lambda u: u.name == 'another_name')
genchev
  • 102
  • 1
  • 10