
How can I delete all items from DynamoDB using python (boto3)?

I'm trying to do this:

scan = table.scan()
with table.batch_writer() as batch:
  for each in scan['Items']:
    batch.delete_item(Key=each)

But it gives me this error:

botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the BatchWriteItem operation: The provided key element does not match the schema
Dansp
    Possible duplicate of [What is the recommended way to delete a large number of items from DynamoDB?](https://stackoverflow.com/questions/9154264/what-is-the-recommended-way-to-delete-a-large-number-of-items-from-dynamodb) – dmulter Mar 14 '19 at 18:49
  • Have a look at this: https://gist.github.com/Swalloow/9966d576a9aafff482eef6b59c222baa – Chuck LaPress Mar 14 '19 at 18:53
    I already checked these links and it did not help :( – Dansp Mar 14 '19 at 19:02
    How about calling delete_table and create_table? – balderman Mar 14 '19 at 19:06
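balderman's delete_table/create_table suggestion is usually the fastest way to do a full wipe. A rough, untested sketch of it (the helper names are mine, and it only carries over the key schema and throughput settings, not GSIs, streams, or TTL, which is exactly the caveat raised in the accepted answer):

```python
def build_recreate_args(description):
    """Pure helper: derive create_table kwargs from a describe_table result."""
    table = description["Table"]
    args = {
        "TableName": table["TableName"],
        "KeySchema": table["KeySchema"],
        "AttributeDefinitions": table["AttributeDefinitions"],
    }
    # On-demand tables report PAY_PER_REQUEST; provisioned ones carry throughput.
    billing = table.get("BillingModeSummary", {}).get("BillingMode", "PROVISIONED")
    if billing == "PAY_PER_REQUEST":
        args["BillingMode"] = "PAY_PER_REQUEST"
    else:
        pt = table["ProvisionedThroughput"]
        args["ProvisionedThroughput"] = {
            "ReadCapacityUnits": pt["ReadCapacityUnits"],
            "WriteCapacityUnits": pt["WriteCapacityUnits"],
        }
    return args

def recreate_table(table_name):
    # Imported here so the pure helper above stays usable without boto3.
    import boto3
    client = boto3.client("dynamodb")
    args = build_recreate_args(client.describe_table(TableName=table_name))
    client.delete_table(TableName=table_name)
    client.get_waiter("table_not_exists").wait(TableName=table_name)
    client.create_table(**args)
    client.get_waiter("table_exists").wait(TableName=table_name)
```

Note that the table is unavailable while it is being recreated, so this only fits workloads that can tolerate a short outage.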

6 Answers


While I agree that dropping the table and recreating it is much more efficient, there may be cases where that's undesirable, such as when many GSIs or trigger events are associated with a table and you don't want to have to re-associate them. The script below iterates over the scan to handle large tables (each scan call returns up to 1 MB of keys) and uses the batch writer to delete all items in the table.

import boto3
dynamo = boto3.resource('dynamodb')

def truncateTable(tableName):
    table = dynamo.Table(tableName)
    
    # Get the table's key attribute names
    tableKeyNames = [key.get("AttributeName") for key in table.key_schema]

    # Only retrieve the keys for each item (minimizes data transfer)
    projectionExpression = ", ".join('#' + key for key in tableKeyNames)
    expressionAttrNames = {'#' + key: key for key in tableKeyNames}

    counter = 0
    page = table.scan(ProjectionExpression=projectionExpression,
                      ExpressionAttributeNames=expressionAttrNames)
    with table.batch_writer() as batch:
        while page["Count"] > 0:
            counter += page["Count"]
            # Delete items in batches
            for itemKeys in page["Items"]:
                batch.delete_item(Key=itemKeys)
            # Fetch the next page
            if 'LastEvaluatedKey' in page:
                page = table.scan(
                    ProjectionExpression=projectionExpression, ExpressionAttributeNames=expressionAttrNames,
                    ExclusiveStartKey=page['LastEvaluatedKey'])
            else:
                break
    print(f"Deleted {counter}")
            
truncateTable("YOUR_TABLE_NAME")
Daniel777
Ethan Harris
    This code is the best among current answers. 1) run faster by using batch operations, 2) properly handles keys with invalid chars by using ExpressionAttributeNames, 3) auto-discover the key name, nice. – Alex R Sep 28 '20 at 16:33
    this code works here, but keep in mind, if you change the code by adding a filter to the scan, it may stop working, because in that case when page["Count"] == 0 there could be still next page. – cn123h May 03 '21 at 17:17
    Throwing the following error: botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the Scan operation: ExpressionAttributeNames contains invalid key: Syntax error; key: "#url_id-process-index" – Constantin Jul 15 '21 at 17:04

I found a solution! I just build the key from my table's partition key (uId) and sort key (compId), and it worked :)

scan = table.scan()
with table.batch_writer() as batch:
    for each in scan['Items']:
        batch.delete_item(
            Key={
                'uId': each['uId'],
                'compId': each['compId']
            }
        )
Dansp
    This will delete entries; however, beware that because of the pagination of items, you may not delete everything in a longer table. – Matt Mar 10 '20 at 07:43
  • @Matt this table is wiped daily, so there's no need to paginate the deletion. – Dansp Nov 24 '20 at 17:05

Here's an answer that accounts for the fact that you might not get all records back in the first call if you're trying to truncate a big table (or a smaller table with big items). It presumes you're only using a HashKey (called id), so you'd have to extend the ProjectionExpression and the delete_item call if your table also has a SortKey.

There's some extra code you could trim out that just prints a counter to stdout to keep us humans happy.

import boto3

TABLE = ...
ID    = ...

table = boto3.resource('dynamodb').Table(TABLE)
scan = None

with table.batch_writer() as batch:
    count = 0
    while scan is None or 'LastEvaluatedKey' in scan:
        if scan is not None and 'LastEvaluatedKey' in scan:
            scan = table.scan(
                ProjectionExpression=ID,
                ExclusiveStartKey=scan['LastEvaluatedKey'],
            )
        else:
            scan = table.scan(ProjectionExpression=ID)

        for item in scan['Items']:
            if count % 5000 == 0:
                print(count)
            batch.delete_item(Key={ID: item[ID]})
            count = count + 1
Alex R
Matt
  • Hey, thanks for posting! I started off with this, but kept getting an error where it couldn't find 'LastEvaluatedKey.' I ended up following this to get a while loop that captured all of the responses: https://www.beabetterdev.com/2021/10/20/dynamodb-scan-query-not-returning-data/ – AwfulPersimmon May 11 '23 at 15:20

Use BatchWriteItem. The documentation states

The BatchWriteItem operation puts or deletes multiple items in one or more tables. A single call to BatchWriteItem can write up to 16 MB of data, which can comprise as many as 25 put or delete requests. Individual items to be written can be as large as 400 KB.

I'm assuming the Boto3 API has this as well, though possibly under a different name.

kaskelotti
    Here it is in the boto3 docs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.batch_write_item – Evan Apr 12 '20 at 17:19

The same approach using batch_writer(), but multithreaded:

import boto3
import threading
import time
from queue import LifoQueue, Empty


class DDBTableCleaner(object):

    def __init__(self, table_name, threads_limit=32):
        self._queue = LifoQueue()
        self._threads = dict()
        self._cnt = 0
        self._done = False
        self._threads_limit = threads_limit
        dynamodb_client = boto3.resource('dynamodb')
        self.table = dynamodb_client.Table(table_name)

    def run(self):
        for i in range(self._threads_limit):
            thread_name = f'worker_thread_{i}'
            self._threads[thread_name] = threading.Thread(
                target=self.worker_thread,
                name=thread_name,
            )
            self._threads[thread_name].start()
        self.queue_replenish()
        while self._queue.qsize() > 0:
            print(f'items processed: ({self._cnt})')
            time.sleep(1)
        self._done = True
        for thread in self._threads.values():
            if thread.is_alive():
                thread.join()
        print(f'items processed: ({self._cnt})')

    def queue_replenish(self):
        table_key_names = [key.get('AttributeName') for key in self.table.key_schema]
        projection_expression = ', '.join('#' + key for key in table_key_names)
        expression_attr_names = {'#' + key: key for key in table_key_names}
        page = self.table.scan(
            ProjectionExpression=projection_expression,
            ExpressionAttributeNames=expression_attr_names
        )
        while page['Count'] > 0:
            for item in page['Items']:
                self._queue.put(item)
            if 'LastEvaluatedKey' in page:
                page = self.table.scan(
                    ProjectionExpression=projection_expression,
                    ExpressionAttributeNames=expression_attr_names,
                    ExclusiveStartKey=page['LastEvaluatedKey']
                )
            else:
                break

    def worker_thread(self):
        thr_name = threading.current_thread().name
        print(f'[{thr_name}] thread started')
        with self.table.batch_writer() as batch:
            while not self._done:
                try:
                    item = self._queue.get_nowait()
                except Empty:
                    time.sleep(1)
                else:
                    try:
                        batch.delete_item(Key=item)
                        self._cnt += 1  # not synchronized across threads, so the count is approximate
                    except Exception as e:
                        print(e)
        print(f'[{thr_name}] thread completed')


if __name__ == '__main__':

    table = '...'
    cleaner = DDBTableCleaner(table, threads_limit=10)
    cleaner.run()


egor

Modified @egor's great answer to show more robust progress with tqdm:

import boto3
import threading
import time
from queue import LifoQueue, Empty
from tqdm.notebook import tqdm

class DDBTableCleaner(object):
    def __init__(self, table_name, profile_name, threads_limit=32):
        self._pbar = None
        self._queue = LifoQueue()
        self._threads = dict()
        self._cnt = 0
        self._done = False
        self._threads_limit = threads_limit
        self._table_name = table_name
        self.session = boto3.Session(profile_name=profile_name)
        dynamodb_client = self.session.resource('dynamodb')
        self.table = dynamodb_client.Table(table_name)

    def run(self):
        if self._pbar is not None:
            self._pbar.close()

        self._pbar = tqdm(desc=self._table_name)
        for i in range(self._threads_limit):
            thread_name = f'worker_thread_{i}'
            self._threads[thread_name] = threading.Thread(
                target=self.worker_thread,
                name=thread_name,
            )
            self._threads[thread_name].start()
        self.queue_replenish()
        while self._queue.qsize() > 0:
            # print(f'items processed: ({self._cnt})')
            time.sleep(1)
        self._done = True
        for thread in self._threads.values():
            if thread.is_alive():
                thread.join()
        self._pbar.close()
        print(f'items processed: ({self._cnt})')

    def queue_replenish(self):
        table_key_names = [key.get('AttributeName') for key in self.table.key_schema]
        projection_expression = ', '.join('#' + key for key in table_key_names)
        expression_attr_names = {'#' + key: key for key in table_key_names}
        total_read = 0
        page = self.table.scan(
            ProjectionExpression=projection_expression,
            ExpressionAttributeNames=expression_attr_names
        )
        while page['Count'] > 0:
            total_read += page['Count']
            for item in page['Items']:
                self._queue.put(item)
            # print("Still reading... Total read: %d" % total_read)
            self._pbar.total = total_read
            if 'LastEvaluatedKey' in page:
                page = self.table.scan(
                    ProjectionExpression=projection_expression,
                    ExpressionAttributeNames=expression_attr_names,
                    ExclusiveStartKey=page['LastEvaluatedKey']
                )
            else:
                break

    def worker_thread(self):
        thr_name = threading.current_thread().name
        # print(f'[{thr_name}] thread started')
        with self.table.batch_writer() as batch:
            while not self._done:
                try:
                    item = self._queue.get_nowait()
                except Empty:
                    # print("Empty queue")
                    time.sleep(1)
                else:
                    try:
                        batch.delete_item(Key=item)
                        self._pbar.update(1)
                        self._cnt += 1
                    except Exception as e:
                        print(e)
        # print(f'[{thr_name}] thread completed')
JSBach