
There is a list of data in a CSV file that I want to put into a DynamoDB table on AWS. See the sample below.

    Mary,F,7065
    Anna,F,2604
    Emma,F,2003
    Elizabeth,F,1939
    Minnie,F,1746
    Margaret,F,1578
    Ida,F,1472
    Alice,F,1414
    Bertha,F,1320
    Sarah,F,1288
    Annie,F,1258
    Clara,F,1226
    Ella,F,1156
    Florence,F,1063
    Cora,F,1045
    Martha,F,1040
    Laura,F,1012
    Nellie,F,995
    Grace,F,982
    Carrie,F,949
    Maude,F,858
    Mabel,F,808
    Bessie,F,796
    Jennie,F,793
    Gertrude,F,787
    Julia,F,783
    Hattie,F,769
    Edith,F,768
    Mattie,F,704
    Rose,F,700
    Catherine,F,688
    Lillian,F,672
    Ada,F,652
    Lillie,F,647
    Helen,F,636
    Jessie,F,635
    Louise,F,635
    Ethel,F,633
    Lula,F,621
    Myrtle,F,615
    Eva,F,614
    Frances,F,605
    Lena,F,603
    Lucy,F,590
    Edna,F,588
    Maggie,F,582
    Pearl,F,569
    Daisy,F,564
    Fannie,F,560
    Josephine,F,544

In order to write more than 25 items to a DynamoDB table, the documentation uses a batch_writer object.

    import boto3

    resource = boto3.resource('dynamodb')
    table = resource.Table('Names')
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)

Is there a way to return an HTTP response to indicate successful completion of the batch write? I know that it is asynchronous. Is there a wait or fetch or something to call?
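For context, here is a minimal sketch of the flow I'm describing. The parsing step is separable from the AWS call; the table name and the `name` partition key are just my setup, so adjust to your schema:

```python
import csv
import io

def rows_to_items(csv_text):
    """Parse name,sex,count CSV rows into DynamoDB-ready item dicts."""
    reader = csv.reader(io.StringIO(csv_text))
    return [
        {'name': name, 'sex': sex, 'count': int(count)}
        for name, sex, count in reader
    ]

items = rows_to_items("Mary,F,7065\nAnna,F,2604\n")

# Then the write itself, as in the snippet above:
# import boto3
# table = boto3.resource('dynamodb').Table('Names')
# with table.batch_writer() as batch:
#     for item in items:
#         batch.put_item(Item=item)
```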

polka

2 Answers


The docs for the BatchWriter object instantiated by batch_writer are located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method generates a response; it just doesn't store it anywhere.

class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table.  The class handles
            batch writes to a single table.
        :type client: ``botocore.client.Client``
        :param client: A botocore client.  Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol.  What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e
            ``boto3.resource('dynamodb').Table('foo').meta.client``.
        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.
        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']

        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()
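To see the buffering mechanics in isolation: `_flush` slices off up to `flush_amount` items, sends them, and re-queues whatever DynamoDB reports as unprocessed. A stripped-down mimic of that logic, with no AWS calls and unprocessed items simplified to a flat list rather than the real per-table dict:

```python
def flush(buffer, flush_amount, send):
    """Mimic BatchWriter._flush: send one slice, re-queue unprocessed items."""
    to_send = buffer[:flush_amount]
    remainder = buffer[flush_amount:]
    response = send(to_send)
    unprocessed = response.get('UnprocessedItems', [])
    # Unprocessed items go to the back of the buffer for the next flush
    return remainder + unprocessed, response

# A fake sender that "fails" to process the last item of each batch:
def fake_send(items):
    return {'UnprocessedItems': items[-1:]}

buffer = list(range(30))
buffer, response = flush(buffer, 25, fake_send)
# 5 leftover items plus the 1 unprocessed item now wait for the next flush
```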

How I solved it:

I built on the answers to this question about overriding class methods. They all work, but the best fit for my use case was to replace the instance's _flush method with the version below.

First I wrote a new version of _flush that stores the response on the instance.

import logging
import types

logger = logging.getLogger(__name__)

## New Flush

def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']

    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))


Then I overrode the instance method like this.

with table.batch_writer() as batch:
    batch._flush=types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)
print(batch._response)

This produces output like the following.

{'UnprocessedItems': {},
 'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '23',
   'connection': 'keep-alive',
   'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
   'x-amz-crc32': '4185382645'},
  'RetryAttempts': 0}}
polka
  • This is a really interesting method override - I didn't realize you could inject methods like that. Thanks for sharing! Any plans to open a PR to get this behavior merged into boto3 codebase to replace the _flush() method? Super nice to have a response. – Sam Jett Aug 27 '21 at 17:58
  • Look, if you have the time and want to crib my work and publish a PR to boto3, go for it. That would make this question, my most popular question/answer on SO obsolete... so you would probably owe me a mention. – polka Aug 28 '21 at 16:31

There doesn't appear to be any built-in way to do this. The _flush method on BatchWriter does log a debug message when it finishes a batch, though. If you just want to see what's happening, you could enable debug logging before your put_item loop:

import logging
logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)

If you want to take some action instead, you could create a custom logging.Handler, something like this:

import logging
import sys

class CatchBatchWrites(logging.Handler):
    def handle(self, record):
        if record.msg.startswith('Batch write sent'):
            processed, unprocessed = record.args
            # do something with these numbers


logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG) # still necessary
logger.addHandler(CatchBatchWrites())
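One concrete variant of that idea accumulates totals across batches. Guarding on `record.msg` means unrelated debug records on this logger are skipped instead of failing when `record.args` can't be unpacked; the class name is my own:

```python
import logging

class CountBatchWrites(logging.Handler):
    """Accumulate totals from BatchWriter's 'Batch write sent' debug records."""
    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.sent = 0
        self.unprocessed = 0

    def emit(self, record):
        # Only act on the specific message _flush logs; anything else
        # passes through untouched.
        if isinstance(record.msg, str) and record.msg.startswith('Batch write sent'):
            processed, unprocessed = record.args
            self.sent += processed
            self.unprocessed = unprocessed

logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)
counter = CountBatchWrites()
logger.addHandler(counter)
# ... run the batch_writer loop, then inspect counter.sent
```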
Nathan Vērzemnieks
  • Let me know if this has to be a separate question, but what does the batch._client.describe_endpoints function do? It has an http response, but I don't know what it is referring to or whether it is relevant to the above situation. – polka Mar 26 '19 at 18:24
  • I don't think it's relevant, but I'm not sure what it does! Yes - I would suggest making that a separate question if you do want to know more. – Nathan Vērzemnieks Mar 27 '19 at 04:42
  • Ok. Made a new question [here](https://stackoverflow.com/questions/55370149/what-does-the-batch-client-describe-endpoints-function-do), @nathan-vērzemnieks – polka Mar 27 '19 at 05:08
  • I am trying to implement your solution, and I am getting this valueerror. `ValueError: not enough values to unpack (expected 2, got 0)` – polka Mar 27 '19 at 05:26
  • Running the logging is supplying a record.msg of `Batch write sent %s, unprocessed: %s`. – polka Mar 27 '19 at 05:45