3Print a summary of messages visible on our SQS queues (and dead-letter queues).
7 - 3 catalogue-2023-03-29_image_inferrer_input
8 - 1,246 catalogue-2023-03-29_ingestor_works_input
9 1 - sierra-adapter-20200604-sierra_bibs_windows
11Note: this relies on a Wellcome-specific convention that the dead-letter queue
12associated with a queue has the same name but with `_dlq` on the end,
13e.g. `calm-windows` and `calm-windows_dlq`.
25from _common import create_link_text
27# https://github.com/alexwlchan/concurrently
28sys.path.append(os.path.join(os.environ["HOME"], "repos", "concurrently"))
30from concurrently import concurrently # noqa: E402
33def list_queue_urls_in_account(sess, *, prefixes):
35 Generates a list of all the queue URLs in an account.
37 sqs_client = sess.client("sqs")
39 for prefix in prefixes:
40 for page in sqs_client.get_paginator("list_queues").paginate(
41 QueueNamePrefix=prefix
43 yield from page["QueueUrls"]
46def get_queue_stats(sess, *, queue_urls):
48 Get the size of the queues associated with this pipeline.
50 sqs_client = sess.client("sqs")
53 "ApproximateNumberOfMessages",
54 "ApproximateNumberOfMessagesNotVisible",
55 "ApproximateNumberOfMessagesDelayed",
60 for q_url, q_resp in concurrently(
61 handler=lambda q_url: sqs_client.get_queue_attributes(
62 QueueUrl=q_url, AttributeNames=attribute_names
66 queue_responses[q_url] = q_resp
69 q_url: sum(int(resp["Attributes"][attr]) for attr in attribute_names)
70 for q_url, resp in queue_responses.items()
74def print_number(account_id, region_name, queue_name, *, value, color):
76 print("-".rjust(9, " "), end="")
78 spaces_required = 9 - len(humanize.intcomma(value))
84 url=f"https://{region_name}.console.aws.amazon.com/sqs/v2/home?region={region_name}#/queues/https%3A%2F%2Fsqs.{region_name}.amazonaws.com%2F{account_id}%2F{queue_name}",
85 label=humanize.intcomma(value),
93def pprint_queue_stats(account_id, region_name, queue_stats):
94 interesting_queues = {
95 q_url: q_size for q_url, q_size in queue_stats.items() if q_size > 0
98 if not interesting_queues:
99 print("All queues are empty")
102 paired_queues = collections.defaultdict(lambda: {"q": None, "dlq": None})
104 for q_url, q_size in interesting_queues.items():
105 q_name = q_url.split("/")[-1]
106 if q_name.endswith("_dlq"):
107 paired_queues[q_name.replace("_dlq", "")]["dlq"] = q_size
109 paired_queues[q_name]["q"] = q_size
111 for q_name, q_stats in sorted(paired_queues.items()):
112 print_number(account_id, region_name, q_name, value=q_stats["q"], color="green")
114 account_id, region_name, q_name + "_dlq", value=q_stats["dlq"], color="red"
120if __name__ == "__main__":
121 sess = boto3.Session()
123 prefixes = sys.argv[1:] or ("",)
125 queue_urls = list_queue_urls_in_account(sess, prefixes=prefixes)
127 queue_stats = get_queue_stats(sess, queue_urls=queue_urls)
129 sts = sess.client("sts")
130 account_id = sts.get_caller_identity()["Account"]
131 region_name = sess.region_name
133 pprint_queue_stats(account_id, region_name, queue_stats)