Skip to main content

aws/sqs_stats.py

1#!/usr/bin/env python3
2"""
3Print a summary of messages visible on our SQS queues (and dead-letter queues).
5 $ sqs_stats
6 1 - calm-windows
7 - 3 catalogue-2023-03-29_image_inferrer_input
8 - 1,246 catalogue-2023-03-29_ingestor_works_input
9 1 - sierra-adapter-20200604-sierra_bibs_windows
11Note: this relies on a Wellcome-specific convention that the dead-letter queue
12associated with a queue has the same name but with `_dlq` on the end,
13e.g. `calm-windows` and `calm-windows_dlq`.
15"""
17import collections
18import os
19import sys
21import boto3
22import humanize
23import termcolor
25from _common import create_link_text
27# https://github.com/alexwlchan/concurrently
28sys.path.append(os.path.join(os.environ["HOME"], "repos", "concurrently"))
30from concurrently import concurrently # noqa: E402
def list_queue_urls_in_account(sess, *, prefixes):
    """
    Generates the URLs of the SQS queues in an account whose names start
    with one of the given prefixes.

    :param sess: session object used to create the SQS client
        (the caller in this file passes a ``boto3.Session``).
    :param prefixes: iterable of queue name prefixes; each one is passed
        to the ``list_queues`` paginator as ``QueueNamePrefix``.

    Results are not de-duplicated: a queue that matches more than one
    prefix is yielded once per matching prefix.
    """
    paginator = sess.client("sqs").get_paginator("list_queues")

    for prefix in prefixes:
        for page in paginator.paginate(QueueNamePrefix=prefix):
            # SQS leaves the "QueueUrls" key out of the response entirely
            # when no queue matches, so a plain page["QueueUrls"] would
            # raise KeyError for a prefix with no queues.
            yield from page.get("QueueUrls", [])
def get_queue_stats(sess, *, queue_urls):
    """
    Work out how many messages are on each of the given queues.

    Returns a dict mapping each queue URL to the sum of its
    ApproximateNumberOfMessages, ApproximateNumberOfMessagesNotVisible
    and ApproximateNumberOfMessagesDelayed attributes.
    """
    sqs_client = sess.client("sqs")

    counted_attributes = [
        "ApproximateNumberOfMessages",
        "ApproximateNumberOfMessagesNotVisible",
        "ApproximateNumberOfMessagesDelayed",
    ]

    def fetch_attributes(url):
        return sqs_client.get_queue_attributes(
            QueueUrl=url, AttributeNames=counted_attributes
        )

    # `concurrently` hands back (input, result) pairs; gather every
    # response before doing any arithmetic on them.
    responses_by_url = dict(
        concurrently(handler=fetch_attributes, inputs=queue_urls)
    )

    totals = {}
    for url, response in responses_by_url.items():
        # The attribute values are converted with int() before summing.
        totals[url] = sum(
            int(response["Attributes"][name]) for name in counted_attributes
        )
    return totals
def print_number(account_id, region_name, queue_name, *, value, color):
    """
    Print a queue size right-aligned in a nine-character column, without
    a trailing newline.

    If ``value`` is None, a plain "-" is printed.  Otherwise the
    comma-separated number is passed through ``create_link_text`` together
    with the AWS console URL for the queue, and printed in ``color``.
    """
    column_width = 9

    if value is None:
        print("-".rjust(column_width, " "), end="")
        return

    formatted = humanize.intcomma(value)

    # Padding is based on the visible digits only, and is added outside
    # the link text.
    padding = " " * (column_width - len(formatted))

    console_url = f"https://{region_name}.console.aws.amazon.com/sqs/v2/home?region={region_name}#/queues/https%3A%2F%2Fsqs.{region_name}.amazonaws.com%2F{account_id}%2F{queue_name}"
    link = create_link_text(url=console_url, label=formatted)

    print(termcolor.colored(padding + link, color), end="")
def pprint_queue_stats(account_id, region_name, queue_stats):
    """
    Print one line per non-empty queue: the queue's size, its dead-letter
    queue's size, then a tab and the queue name.

    :param account_id: AWS account ID, passed through to ``print_number``.
    :param region_name: AWS region name, passed through to ``print_number``.
    :param queue_stats: dict mapping queue URL to message count.

    Queues with a count of zero are skipped; if nothing is left, a single
    "All queues are empty" line is printed instead.  A queue whose name
    ends in ``_dlq`` is shown on the same row as the queue with that name
    minus the suffix.  Rows are sorted by queue name.
    """
    dlq_suffix = "_dlq"

    interesting_queues = {
        q_url: q_size for q_url, q_size in queue_stats.items() if q_size > 0
    }

    if not interesting_queues:
        print("All queues are empty")
        return

    paired_queues = collections.defaultdict(lambda: {"q": None, "dlq": None})

    for q_url, q_size in interesting_queues.items():
        # The queue name is the last path component of the queue URL.
        q_name = q_url.split("/")[-1]

        if q_name.endswith(dlq_suffix):
            # Strip only the trailing suffix.  str.replace() would also
            # remove any "_dlq" appearing earlier in the name and pair
            # this DLQ with the wrong queue.
            paired_queues[q_name[: -len(dlq_suffix)]]["dlq"] = q_size
        else:
            paired_queues[q_name]["q"] = q_size

    for q_name, q_stats in sorted(paired_queues.items()):
        print_number(account_id, region_name, q_name, value=q_stats["q"], color="green")
        print_number(
            account_id,
            region_name,
            q_name + dlq_suffix,
            value=q_stats["dlq"],
            color="red",
        )
        print("\t", end="")
        print(q_name)
if __name__ == "__main__":
    # Any command-line arguments are queue name prefixes; with none given,
    # the single empty prefix is used.
    session = boto3.Session()
    name_prefixes = sys.argv[1:] or ("",)

    stats = get_queue_stats(
        session,
        queue_urls=list_queue_urls_in_account(session, prefixes=name_prefixes),
    )

    # The account ID and region are only needed to build console links.
    caller_identity = session.client("sts").get_caller_identity()

    pprint_queue_stats(caller_identity["Account"], session.region_name, stats)