Skip to main content

add my download_sqs_message script

ID
2a8a09f
date
2023-04-29 08:39:19+00:00
author
Alex Chan <alex@alexwlchan.net>
parent
0503835
message
add my download_sqs_message script
changed files
1 file, 93 additions

Changed files

aws/download_sqs_messages (0) → aws/download_sqs_messages (2475)

diff --git a/aws/download_sqs_messages b/aws/download_sqs_messages
new file mode 100755
index 0000000..52e0c2a
--- /dev/null
+++ b/aws/download_sqs_messages
@@ -0,0 +1,93 @@
+#!/usr/bin/env python3
+
+import argparse
+import json
+import os
+
+import boto3
+
+from _common import ACCOUNT_NAMES
+
+
+def get_aws_session(*, role_arn):
+    sts_client = boto3.client("sts")
+    assumed_role_object = sts_client.assume_role(
+        RoleArn=role_arn, RoleSessionName="AssumeRoleSession1"
+    )
+    credentials = assumed_role_object["Credentials"]
+
+    return boto3.Session(
+        aws_access_key_id=credentials["AccessKeyId"],
+        aws_secret_access_key=credentials["SecretAccessKey"],
+        aws_session_token=credentials["SessionToken"],
+    )
+
+
+def get_session(*, queue_url):
+    """
+    Return a boto3 Session for publishing to SNS.
+
+    If it recognises the account which contains the queue, it will pick
+    the appropriate IAM role, otherwise it use the default boto3 Session.
+    """
+    # The arn format of an SQS queue URL is:
+    #
+    #       https://sqs.eu-west-1.amazonaws.com/1234567890/queue-name
+    #
+    # Extract the account ID.
+    account_id = queue_url.split("/")[3]
+
+    try:
+        role_arn = (
+            f"arn:aws:iam::{account_id}:role/{ACCOUNT_NAMES[account_id]}-developer"
+        )
+        return get_aws_session(role_arn=role_arn)
+    except KeyError:
+        return boto3.Session()
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        prog=os.path.basename(__file__),
+        description="Download all the messageds from an SQS queue.",
+    )
+
+    parser.add_argument(
+        "QUEUE_URL", help="The URL of the SQS queue to fetch messages from"
+    )
+
+    return parser.parse_args()
+
+
+def download_messages(*, queue_url):
+    sess = get_session(queue_url=queue_url)
+
+    sqs_client = sess.client("sqs")
+
+    while True:
+        resp = sqs_client.receive_message(
+            QueueUrl=queue_url, AttributeNames=["All"], MaxNumberOfMessages=10
+        )
+
+        try:
+            yield from resp["Messages"]
+        except KeyError:
+            return
+
+        entries = [
+            {"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
+            for msg in resp["Messages"]
+        ]
+
+        resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
+
+        if len(resp["Successful"]) != len(entries):
+            raise RuntimeError(
+                f"Failed to delete messages: entries={entries!r} resp={resp!r}"
+            )
+
+
+if __name__ == "__main__":
+    args = parse_args()
+    for message in download_messages(queue_url=args.QUEUE_URL):
+        print(json.dumps(message))