3Print the items in a DynamoDB table as JSON objects. If no table name is
4supplied, it prints a list of all table names in the account.
6You can do something similar with `aws dynamodb scan`, but this script
7has a couple of neat features:
9- It does a Parallel Scan instead of a vanilla Scan, so it's faster
10- It starts returning objects immediately, rather than waiting until
11 it scans the whole table
12- The output format is more convenient -- a single JSON object per line,
13 so it can be used with text utilities like `head` and `tail`, and the
  DynamoDB JSON representation (e.g. {"sides": {"N": "5"}}) is transformed
15 into a more useful form (e.g. {"sides": 5})
17See https://alexwlchan.net/2020/getting-every-item-from-a-dynamodb-table-with-python/
21import concurrent.futures
def is_integer(d: decimal.Decimal):
    """
    Return True if the Decimal ``d`` holds a whole number, False otherwise.

    Non-finite values (NaN, sNaN and +/-Infinity) are never integers.  They
    are rejected up front because ``Decimal.as_integer_ratio()`` raises
    ``ValueError`` for NaNs and ``OverflowError`` for infinities, and a
    yes/no predicate should answer rather than blow up.

    :param d: The Decimal value to test.
    """
    if not d.is_finite():
        return False

    # The ratio comes back in lowest terms with a positive denominator, so
    # the value is integral exactly when that denominator is 1.
    _, denominator = d.as_integer_ratio()
    return denominator == 1
36class DynamoEncoder(json.JSONEncoder):
37 def default(self, obj):
38 if isinstance(obj, decimal.Decimal) and is_integer(obj):
42def parallel_scan_table(sess, *, TableName, **kwargs):
44 Generates all the items in a DynamoDB table.
    :param sess: A boto3 Session, used to create the DynamoDB client.
47 :param TableName: The name of the table to scan.
49 Other keyword arguments will be passed directly to the Scan operation.
50 See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
52 This does a Parallel Scan operation over the table.
55 dynamo_client = sess.resource("dynamodb").meta.client
    # How many segments to divide the table into? As long as this is >= the
58 # number of threads used by the ThreadPoolExecutor, the exact number doesn't
62 # How many scans to run in parallel? If you set this really high you could
63 # overwhelm the table read capacity, but otherwise I don't change this much.
64 max_scans_in_parallel = 5
66 # Schedule an initial scan for each segment of the table. We read each
67 # segment in a separate thread, then look to see if there are more rows to
68 # read -- and if so, we schedule another scan.
72 "TableName": TableName,
74 "TotalSegments": total_segments,
76 for segment in range(total_segments)
79 # Make the list an iterator, so the same tasks don't get run repeatedly.
80 scans_to_run = iter(tasks_to_do)
82 with concurrent.futures.ThreadPoolExecutor() as executor:
        # Schedule the initial batch of futures.  Here we assume that
        # max_scans_in_parallel < total_segments, so there's no risk of
        # running out of pending scans while filling the first batch.
87 executor.submit(dynamo_client.scan, **scan_params): scan_params
88 for scan_params in itertools.islice(scans_to_run, max_scans_in_parallel)
92 # Wait for the first future to complete.
93 done, _ = concurrent.futures.wait(
94 futures, return_when=concurrent.futures.FIRST_COMPLETED
98 yield from fut.result()["Items"]
100 scan_params = futures.pop(fut)
102 # A Scan reads up to N items, and tells you where it got to in
103 # the LastEvaluatedKey. You pass this key to the next Scan operation,
104 # and it continues where it left off.
106 scan_params["ExclusiveStartKey"] = fut.result()["LastEvaluatedKey"]
109 tasks_to_do.append(scan_params)
111 # Schedule the next batch of futures. At some point we might run out
112 # of entries in the queue if we've finished scanning the table, so
113 # we need to spot that and not throw.
114 for scan_params in itertools.islice(scans_to_run, len(done)):
115 futures[executor.submit(dynamo_client.scan, **scan_params)] = (
def list_table_names(sess):
    """
    Generates the name of every DynamoDB table visible to ``sess``.

    :param sess: Object whose ``client("dynamodb")`` returns a DynamoDB
        client (the script passes a ``boto3.Session``).

    Names are yielded lazily, one page of ``list_tables`` results at a time.
    """
    dynamo_client = sess.client("dynamodb")
    pages = dynamo_client.get_paginator("list_tables").paginate()

    # Each response page carries its slice of names under "TableNames".
    for response in pages:
        yield from response["TableNames"]
127if __name__ == "__main__":
129 table_name = sys.argv[1]
133 sess = boto3.Session()
135 if table_name is not None:
136 for item in tqdm.tqdm(parallel_scan_table(sess, TableName=table_name)):
137 print(json.dumps(item, cls=DynamoEncoder))
139 for table_name in list_table_names(sess):