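This script scans a DynamoDB table and, for each record, either deletes it or adds a `ttl` attribute derived from the record's `delivery_stop_time`. Before using it, adapt the table name, key name and column names to your own schema, and remove the delete logic if you only need to add the TTL.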
Usage:
usage: add_ttl.py [-h] --profile PROFILE [-d] [--force_delete_all] [-v] [-q]
Procedurally modify DynamoDB
optional arguments:
-h, --help show this help message and exit
--profile PROFILE AWS profile name
-d, --dryrun Dry run, take no action
--force_delete_all Delete all records, including valid, unexpired
-v, --verbose set loglevel to DEBUG
-q, --quiet set loglevel to ERROR
Script:
#!/usr/bin/env python3
# pylint:disable=duplicate-code
import argparse
import logging
import sys
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from functools import cached_property
from typing import Dict, Optional
import boto3
from dateutil.parser import isoparse
from tqdm import tqdm
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Default timestamp layout and record layout passed to logging.basicConfig
# by setup_logging() when the caller does not supply its own.
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
LOG_FORMAT = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s"
def setup_logging(loglevel=None, date_format=None, log_format=None):
    """Configure basic logging to stdout.

    Every argument that is left falsy falls back to a default:
    ``logging.INFO`` for the level, ``LOG_FORMAT`` for the record layout
    and ``DATE_FORMAT`` for the timestamp layout.

    Args:
        loglevel (int): minimum loglevel for emitting messages
        date_format (str): strftime pattern used for ``%(asctime)s``
        log_format (str): logging format string applied to each record
    """
    effective_level = loglevel if loglevel else logging.INFO
    record_layout = log_format if log_format else LOG_FORMAT
    time_layout = date_format if date_format else DATE_FORMAT
    logging.basicConfig(
        stream=sys.stdout,
        level=effective_level,
        format=record_layout,
        datefmt=time_layout,
    )
def parse_args():
    """
    Build the command-line parser and parse the process arguments.

    Returns an argparse.Namespace with ``profile`` (str, required),
    ``dryrun`` (bool), ``force_delete_all`` (bool) and ``loglevel``
    (logging.DEBUG for -v, logging.ERROR for -q, None when neither is given).
    """
    cli = argparse.ArgumentParser(description="Procedurally modify DynamoDB")
    cli.add_argument("--profile", required=True, help="AWS profile name")

    # Plain on/off switches, registered in the order they appear in --help.
    switches = (
        (("-d", "--dryrun"), "Dry run, take no action"),
        (
            ("--force_delete_all",),
            "Delete all records, including valid, unexpired",
        ),
    )
    for flags, description in switches:
        cli.add_argument(
            *flags,
            action="store_true",
            default=False,
            help=description,
        )

    # -v and -q both store into ``loglevel``; the last one given wins.
    verbosity = (
        (("-v", "--verbose"), logging.DEBUG, "set loglevel to DEBUG"),
        (("-q", "--quiet"), logging.ERROR, "set loglevel to ERROR"),
    )
    for flags, level, description in verbosity:
        cli.add_argument(
            *flags,
            dest="loglevel",
            help=description,
            action="store_const",
            const=level,
        )

    return cli.parse_args()
def query_yes_no(question, default="yes"):
    """Print a yes/no question on stdout and read the reply via input().

    "question" is the text shown to the user.
    "default" is the answer assumed when the user just hits <Enter>.
        It must be "yes" (the default), "no" or None (meaning
        the user has to type an explicit answer).
    Returns True for a "yes" reply and False for a "no" reply.
    Raises ValueError when "default" is none of the allowed values.
    """
    # Guard first: anything other than None / "yes" / "no" is rejected.
    if not (default is None or default == "yes" or default == "no"):
        raise ValueError("invalid default answer: '%s'" % default)

    # The capital letter in the hint marks which answer <Enter> selects.
    hints = {None: "y/n", "yes": "Y/n", "no": "y/N"}
    prompt = f" [{hints[default]}] "

    answers = {word: True for word in ("yes", "y", "ye")}
    answers.update({word: False for word in ("no", "n")})

    # Keep asking until the reply is recognised.
    while True:
        sys.stdout.write(question + prompt)
        reply = input().lower()
        if reply == "" and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
@dataclass
class Table:
    """Class that wraps dynamodb and simplifies pagination as well as counting.

    The boto3 client and table resource are created lazily on first use and
    then cached on the instance. Statistics gathered along the way are kept
    in ``counter`` and can be rendered with ``str(table)`` or ``str_table()``.
    """

    # AWS region passed to boto3 when creating the client / resource.
    region_name: str
    # Name of the DynamoDB table to operate on.
    table_name: str
    # Optional Counter to accumulate statistics into; when left as None a
    # fresh Counter is created the first time ``counter`` is accessed.
    _counter: Optional[Counter] = None

    def __str__(self):
        """Return the counters, one "name value" line each, under an 80-char rule."""
        rule = "=" * 80
        rows = "".join(
            "{:<20} {:<2}\n".format(name, value)
            for name, value in self.counter.items()
        )
        return f"\n{rule}\n{rows}"

    def str_table(self):
        """Return the counters as two rows: names on top, values underneath."""
        names = list(self.counter.keys())
        # One left-aligned, 20-character-wide column per counter.
        columns = "{:<20} " * len(names)
        header = columns.format(*names)
        values = columns.format(*self.counter.values())
        return f"\n\n{header}\n{values}\n"

    @cached_property
    def counter(self):
        """Counter holding the statistics for this table (created on demand)."""
        # Test against None, not truthiness: an empty Counter supplied by
        # the caller is falsy but must still be the one that gets used.
        if self._counter is None:
            self._counter = Counter()
        return self._counter

    @cached_property
    def client(self):
        """Low-level boto3 DynamoDB client for ``region_name``."""
        return boto3.client("dynamodb", region_name=self.region_name)

    @cached_property
    def table(self):
        """boto3 DynamoDB Table resource for ``table_name``."""
        dynamodb = boto3.resource("dynamodb", region_name=self.region_name)
        return dynamodb.Table(self.table_name)

    @property
    def items(self):
        """Scan the whole table and return every item as one list.

        Follows ``LastEvaluatedKey`` until the scan is exhausted. Records the
        number of pages in ``counter["Fetched Pages"]`` and the number of
        items in ``counter["Fetched Items"]``. Every access performs a new
        full scan.
        """
        data = []
        scan_kwargs = {}
        with tqdm(desc="Fetching pages") as pbar:
            while True:
                response = self.table.scan(**scan_kwargs)
                self.counter["Fetched Pages"] += 1
                data.extend(response["Items"])
                # One tick per page (including the first) so the bar matches
                # its "pages" label instead of assuming 500 items per page.
                pbar.update(1)
                if "LastEvaluatedKey" not in response:
                    break
                scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        self.counter["Fetched Items"] = len(data)
        return data

    @cached_property
    def item_count(self):
        """Item count reported by ``describe_table``, as an int.

        Also stored in ``counter["Total Rows"]``. The value is fetched once
        and cached on the instance.
        """
        response = self.client.describe_table(TableName=self.table_name)
        count = int(response["Table"]["ItemCount"])
        self.counter["Total Rows"] = count
        return count
def delete_item(table, item):
    """Delete the record keyed by ``item["tim_id"]``.

    Args:
        table: wrapper whose ``table`` attribute exposes ``delete_item``.
        item: mapping that contains at least the ``tim_id`` key.

    Returns:
        Whatever the underlying ``delete_item`` call returns.
    """
    remove = table.table.delete_item
    primary_key = {"tim_id": item["tim_id"]}
    return remove(Key=primary_key)
def update_item(table: Table, item: Dict, ttl: int):
    """Set the ``ttl`` attribute on the record keyed by ``item["tim_id"]``.

    Args:
        table: wrapper whose ``table`` attribute exposes ``update_item``.
        item: mapping that contains at least the ``tim_id`` key.
        ttl: value to store in the ``ttl`` attribute.

    Returns:
        Whatever the underlying ``update_item`` call returns; the request
        asks for the updated attributes (``ReturnValues="UPDATED_NEW"``).
    """
    modify = table.table.update_item
    primary_key = {"tim_id": item["tim_id"]}
    # The attribute name is passed through the ``#t`` placeholder rather
    # than written directly into the expression.
    attribute_names = {"#t": "ttl"}
    attribute_values = {":t": ttl}
    return modify(
        Key=primary_key,
        UpdateExpression="set #t=:t",
        ExpressionAttributeNames=attribute_names,
        ExpressionAttributeValues=attribute_values,
        ReturnValues="UPDATED_NEW",
    )
def _request_succeeded(response):
    """Return True when a boto3 response reports HTTP status 200."""
    return response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200


def main():
    """Add a ``ttl`` to recent records and delete stale ones.

    Steps:
      1. Parse the CLI arguments and configure logging at the requested
         level (-v / -q).
      2. Ask the operator to confirm the AWS profile; exit with status 1
         when declined.
      3. Scan the "TimManager" table in us-west-2 and split the items: an
         item whose ``delivery_stop_time`` is later than (now - 7 days) is
         queued for update; everything else is queued for delete. With
         --force_delete_all every item is queued for delete.
      4. After one confirmation per queue, write ``ttl`` =
         ``delivery_stop_time`` + 7 days (as epoch seconds) to the update
         queue and delete the delete queue. With --dryrun no request is sent.
      5. Print the collected counters.
    """
    # Parse first so that -v / -q actually reach the logging setup.
    args = parse_args()
    setup_logging(args.loglevel)

    if not query_yes_no(
        f"Performing batch operations with {args.profile}; is this correct?"
    ):
        sys.exit(1)

    sys.stdout.write(f"Setting up connection with {args.profile}\n")
    boto3.setup_default_session(profile_name=args.profile)
    table = Table(region_name="us-west-2", table_name="TimManager")

    # Timezone-aware "now" in UTC. utcnow() returns a naive value, and
    # astimezone() on a naive value treats it as local time, which would
    # shift the result by the machine's UTC offset.
    now = datetime.now(timezone.utc).replace(microsecond=0)
    buffer = timedelta(days=7)
    cutoff = now - buffer

    # Keep the parsed stop time next to each item queued for update so it
    # does not have to be parsed a second time.
    to_update = []
    to_delete = []
    for item in tqdm(table.items, desc="Inspecting items"):
        # NOTE(review): comparing against the aware cutoff raises TypeError
        # if delivery_stop_time has no UTC offset -- confirm stored format.
        stop_time = isoparse(item["delivery_stop_time"])
        if stop_time > cutoff and not args.force_delete_all:
            to_update.append((item, stop_time))
        else:
            to_delete.append(item)

    table.counter["Identified for update"] = len(to_update)
    table.counter["Identified for delete"] = len(to_delete)
    table.counter["Performed Update"] = 0
    table.counter["Performed Delete"] = 0

    if to_update and query_yes_no(
        f"Located {len(to_update)} records to update with {args.profile}"
    ):
        for item, stop_time in tqdm(to_update, desc="Updating items"):
            if args.dryrun:
                continue
            ttl = int((stop_time + buffer).timestamp())
            if _request_succeeded(update_item(table, item, ttl)):
                table.counter["Performed Update"] += 1

    if to_delete and query_yes_no(
        f"Located {len(to_delete)} records to delete with {args.profile}"
    ):
        for item in tqdm(to_delete, desc="Deleting items"):
            if args.dryrun:
                continue
            # Count a delete once, and only after the request succeeded.
            if _request_succeeded(delete_item(table, item)):
                table.counter["Performed Delete"] += 1

    sys.stdout.write(str(table))
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()