69

I am using bulk_create to loads thousands or rows into a postgresql DB. Unfortunately some of the rows are causing IntegrityError and stoping the bulk_create process. I was wondering if there was a way to tell django to ignore such rows and save as much of the batch as possible?

Meitham
  • 9,178
  • 5
  • 34
  • 45
  • That may not be possible because PostgreSQL aborts the transaction on the first error. Django would need to either (a) create a SAVEPOINT before each insert, which slows things down and costs resources; or (b) Use a procedure or query to insert only if the row doesn't already exist. Personally, I'd bulk-insert into a new separate table, probably `UNLOGGED` or `TEMPORARY`, then `INSERT INTO realtable SELECT * FROM temptable WHERE NOT EXISTS (SELECT 1 FROM realtable WHERE temptable.id = realtable.id)` or similar. – Craig Ringer Sep 16 '12 at 23:06
  • @CraigRinger Nice idea, but bit of a work if you have larger models – gies0r Aug 23 '18 at 20:13

7 Answers7

107

This is now possible on Django 2.2

Django 2.2 adds a new ignore_conflicts option to the bulk_create method, from the documentation:

On databases that support it (all except PostgreSQL < 9.5 and Oracle), setting the ignore_conflicts parameter to True tells the database to ignore failure to insert any rows that fail constraints such as duplicate unique values. Enabling this parameter disables setting the primary key on each model instance (if the database normally supports it).

Example:

Entry.objects.bulk_create([
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
], ignore_conflicts=True)
Community
  • 1
  • 1
Cesar Canassa
  • 18,659
  • 11
  • 66
  • 69
  • 4
    It seem like, there was almost no performance impact if that option is turned on. Nice Work Django! And thank you for this answer... – Sauer May 24 '19 at 10:32
  • 1
    The sad thing is that there will be no PK on created models :( – artem Oct 14 '19 at 20:57
  • 1
    @int_32 are you using a special PK field? At least with the default, the PK gets created fine, except the returned objects still have `None` as their PK, but in the db it's fine. – Henry Woody Nov 07 '19 at 21:27
  • 1
    @HenryWoody correct, I mean that returned objects do not have PK's, of course it's ok in DB. – artem Nov 08 '19 at 15:43
  • 26
    When using this technique, do you get any return info on which rows couldn't be inserted? – shacker May 28 '20 at 21:31
  • 1
    It's been alluded to in the comments but it bears repeating because the docs phrasing is kind of confusing... it DOES set a primary key in the row on the DB, you just don't get it back on the version of the object that it returns... – M. Ryan Aug 24 '22 at 15:32
14

One quick-and-dirty workaround for this that doesn't involve manual SQL and temporary tables is to just attempt to bulk insert the data. If it fails, revert to serial insertion.

objs = [(Event), (Event), (Event)...]

try:
    Event.objects.bulk_create(objs)

except IntegrityError:
    for obj in objs:
        try:
            obj.save()
        except IntegrityError:
            continue

If you have lots and lots of errors this may not be so efficient (you'll spend more time serially inserting than doing so in bulk), but I'm working through a high-cardinality dataset with few duplicates so this solves most of my problems.

Ivan
  • 1,427
  • 1
  • 16
  • 26
  • in my personal view this is indeed the best, as it allows you to capture the error which eventually gets missed as part of "ignore conflicts". – Aman Madan Nov 01 '20 at 21:26
9

(Note: I don't use Django, so there may be more suitable framework-specific answers)

It is not possible for Django to do this by simply ignoring INSERT failures because PostgreSQL aborts the whole transaction on the first error.

Django would need one of these approaches:

  1. INSERT each row in a separate transaction and ignore errors (very slow);
  2. Create a SAVEPOINT before each insert (can have scaling problems);
  3. Use a procedure or query to insert only if the row doesn't already exist (complicated and slow); or
  4. Bulk-insert or (better) COPY the data into a TEMPORARY table, then merge that into the main table server-side.

The upsert-like approach (3) seems like a good idea, but upsert and insert-if-not-exists are surprisingly complicated.

Personally, I'd take (4): I'd bulk-insert into a new separate table, probably UNLOGGED or TEMPORARY, then I'd run some manual SQL to:

LOCK TABLE realtable IN EXCLUSIVE MODE;

INSERT INTO realtable 
SELECT * FROM temptable WHERE NOT EXISTS (
    SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);

The LOCK TABLE ... IN EXCLUSIVE MODE prevents a concurrent insert that creates a row from causing a conflict with an insert done by the above statement and failing. It does not prevent concurrent SELECTs, only SELECT ... FOR UPDATE, INSERT,UPDATE and DELETE, so reads from the table carry on as normal.

If you can't afford to block concurrent writes for too long you could instead use a writable CTE to copy ranges of rows from temptable into realtable, retrying each block if it failed.

Craig Ringer
  • 307,061
  • 76
  • 688
  • 778
  • Thanks @craig-ringer I ended up clearing my python list of objects before inserting them in the DB, something similar to approach #3 of yours, but in pure python. – Meitham Sep 21 '12 at 12:53
  • There's a detailed example of (4) at https://rodmtech.net/docs/django/django-bulk_create-without-integrityerror-rollback/ – eugene Mar 03 '15 at 14:20
3

Or 5. Divide and conquer

I didn't test or benchmark this thoroughly, but it performs pretty well for me. YMMV, depending in particular on how many errors you expect to get in a bulk operation.

def psql_copy(records):
    count = len(records)
    if count < 1:
        return True
    try:
        pg.copy_bin_values(records)
        return True
    except IntegrityError:
        if count == 1:
            # found culprit!
            msg = "Integrity error copying record:\n%r"
            logger.error(msg % records[0], exc_info=True)
            return False
    finally:
        connection.commit()

    # There was an integrity error but we had more than one record.
    # Divide and conquer.
    mid = count / 2
    return psql_copy(records[:mid]) and psql_copy(records[mid:])
    # or just return False
Aryeh Leib Taurog
  • 5,370
  • 1
  • 42
  • 49
0

Even in Django 1.11 there is no way to do this. I found a better option than using Raw SQL. It using djnago-query-builder. It has an upsert method

from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)] 
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])

Note: The library only support postgreSQL

Here is a gist that I use for bulk insert that supports ignoring IntegrityErrors and returns the records inserted.

Noortheen Raja
  • 791
  • 11
  • 15
0

Late answer for pre Django 2.2 projects :

I ran into this situation recently and I found my way out with a seconder list array for check the uniqueness.

In my case, the model has that unique together check, and bulk create is throwing Integrity Error exception because of the array of bulk create has duplicate data in it.

So I decided to create checklist besides bulk create objects list. Here is the sample code; The unique keys are owner and brand, and in this example owner is an user object instance and brand is a string instance:

create_list = []
create_list_check = []
for brand in brands:
    if (owner.id, brand) not in create_list_check:
        create_list_check.append((owner.id, brand))
        create_list.append(ProductBrand(owner=owner, name=brand))

if create_list:
    ProductBrand.objects.bulk_create(create_list)
Sencer H.
  • 1,201
  • 1
  • 13
  • 35
0

it's work for me
i am use this this funtion in thread.
my csv file contains 120907 no of rows.

def products_create():
    full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
    filename = os.listdir(full_path)[0]
    logger.debug(filename)
    logger.debug(len(Product.objects.all()))
    if len(Product.objects.all()) > 0:
        logger.debug("Products Data Erasing")
        Product.objects.all().delete()
        logger.debug("Products Erasing Done")

    csvfile = os.path.join(full_path,filename)
    csv_df = pd.read_csv(csvfile,sep=',')
    csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
    row_iter = csv_df.iterrows()
    logger.debug(row_iter)

    logger.debug("New Products Creating")

    for index, row in row_iter:
        Product.objects.create(part_number = row[0],
                               part_description = row[1],
                               mrp = row[2],
                               hsn_code = row[3],
                               gst = row[4],
                               )

    # products_list = [
    #           Product(
    #               part_number = row[0] ,
    #               part_description = row[1],
    #               mrp = row[2],
    #               hsn_code = row[3],
    #               gst = row[4],
    #           )
    #           for index, row in row_iter

    #       ]
    # logger.debug(products_list)

    # Product.objects.bulk_create(products_list)
    logger.debug("Products uploading done")```

Gaurav Nagar
  • 191
  • 2
  • 4