clean up some old AWS scripts
- ID: 61394ba
- date: 2026-02-08 17:21:39+00:00
- author: Alex Chan <alex@alexwlchan.net>
- parent: 53becaa
- message: clean up some old AWS scripts
- changed files: 3 files, 1 addition, 271 deletions
Changed files
aws/README.md (7648) → aws/README.md (6275)
diff --git a/aws/README.md b/aws/README.md
index 4cc0632..92a6bdd 100644
--- a/aws/README.md
+++ b/aws/README.md
@@ -18,20 +18,6 @@ folder_name = "aws"
scripts = [
{
- "name": "bulk_sns_publish.py",
- "description": """
- a tool for publishing lots of messages to SNS, using the <code>PublishBatch</code> API.
- See <a href="https://alexwlchan.net/2023/my-sns-firehose/">Publishing lots and lots of messages to SNS</a>.
- """
- },
- {
- "name": "download_sqs_messages.py",
- "description": """
- a tool for downloading lots of messages from SQS, using the <code>ReceiveMessage</code> API.
- See <a href="https://alexwlchan.net/2018/downloading-sqs-queues/">Getting every message in an SQS queue</a>.
- """
- },
- {
"name": "dynamols.py",
"description": """
print the items in a DynamoDB table, one item per line
@@ -91,26 +77,6 @@ cog_helpers.create_description_table(folder_name=folder_name, scripts=scripts)
]]]-->
<dl>
<dt>
- <a href="https://github.com/alexwlchan/scripts/blob/main/aws/bulk_sns_publish.py">
- <code>bulk_sns_publish.py</code>
- </a>
- </dt>
- <dd>
- a tool for publishing lots of messages to SNS, using the <code>PublishBatch</code> API.
- See <a href="https://alexwlchan.net/2023/my-sns-firehose/">Publishing lots and lots of messages to SNS</a>.
- </dd>
-
- <dt>
- <a href="https://github.com/alexwlchan/scripts/blob/main/aws/download_sqs_messages.py">
- <code>download_sqs_messages.py</code>
- </a>
- </dt>
- <dd>
- a tool for downloading lots of messages from SQS, using the <code>ReceiveMessage</code> API.
- See <a href="https://alexwlchan.net/2018/downloading-sqs-queues/">Getting every message in an SQS queue</a>.
- </dd>
-
- <dt>
<a href="https://github.com/alexwlchan/scripts/blob/main/aws/dynamols.py">
<code>dynamols.py</code>
</a>
@@ -184,7 +150,7 @@ cog_helpers.create_description_table(folder_name=folder_name, scripts=scripts)
<img src="screenshots/sqs_stats.png">
</dd>
</dl>
-<!-- [[[end]]] (sum: 1fPAA932ZF) -->
+<!-- [[[end]]] (sum: MTjBzgY4Ng) -->
## Guessing the right account
aws/bulk_sns_publish.py (4539) → aws/bulk_sns_publish.py (0)
diff --git a/aws/bulk_sns_publish.py b/aws/bulk_sns_publish.py
deleted file mode 100755
index d9ef4ba..0000000
--- a/aws/bulk_sns_publish.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#!/usr/bin/env python3
-"""
-This is a script for bulk publishing messages to SNS.
-
-Suppose I have a large collection of messages I want to send to SNS,
-stored as lines in a text file, e.g. some Wellcome catalogue IDs [1].
-
- xfcrpna3
- qf8sxvxm
- ed3w4fv9
- d4aahw7u
- hwfrryuz
-
-I could loop through the file line-by-line and send them to SNS one-by-one,
-but that's slow and inefficient. It would be more efficient to use the
-SNS PublishBatch API to send them ten at a time.
-
-This script provides a convenient wrapper for doing so.
-
-[1]: https://github.com/wellcomecollection/catalogue-pipeline/tree/main/pipeline/id_minter
-
-"""
-
-import argparse
-import functools
-import itertools
-import os
-import sys
-import uuid
-
-import boto3
-import tqdm
-
-from _common import ACCOUNT_NAMES
-
-# https://github.com/alexwlchan/concurrently
-sys.path.append(os.path.join(os.environ["HOME"], "repos", "concurrently"))
-
-from concurrently import concurrently # noqa: E402
-
-
-def get_aws_session(*, role_arn):
- sts_client = boto3.client("sts")
- assumed_role_object = sts_client.assume_role(
- RoleArn=role_arn, RoleSessionName="AssumeRoleSession1"
- )
- credentials = assumed_role_object["Credentials"]
-
- return boto3.Session(
- aws_access_key_id=credentials["AccessKeyId"],
- aws_secret_access_key=credentials["SecretAccessKey"],
- aws_session_token=credentials["SessionToken"],
- )
-
-
-def get_session(*, topic_arn):
- """
- Return a boto3 Session for publishing to SNS.
-
- If it recognises the account which contains the topic, it will pick
- the appropriate IAM role, otherwise it use the default boto3 Session.
- """
- # The arn format of an SNS topic is:
- #
- # arn:aws:sns:{region}:{account_id}:{topic_name}
- #
- # Extract the account ID.
- account_id = topic_arn.split(":")[4]
-
- try:
- role_arn = (
- f"arn:aws:iam::{account_id}:role/{ACCOUNT_NAMES[account_id]}-developer"
- )
- return get_aws_session(role_arn=role_arn)
- except KeyError:
- return boto3.Session()
-
-
-def chunked_iterable(iterable, size):
- """
- Break an iterable into pieces of the given size.
-
- See https://alexwlchan.net/2018/iterating-in-fixed-size-chunks/
- """
- it = iter(iterable)
- while True:
- chunk = tuple(itertools.islice(it, size))
- if not chunk:
- break
- yield chunk
-
-
-def get_batch_entries(path):
- """
- Given a file which contains one notification per line, generate a series
- of values that can be passed as the `PublishBatchRequestEntries` argument
- to the `Sns.publish_batch` method.
- """
- for batch in chunked_iterable(open(path), size=10):
- yield [{"Id": str(uuid.uuid4()), "Message": line.strip()} for line in batch]
-
-
-def parse_args():
- parser = argparse.ArgumentParser(
- prog=os.path.basename(__file__),
- description="Publish lots of notifications to Amazon SNS.",
- )
-
- parser.add_argument(
- "INPUT_FILE", help="A path containing notifications to send, one per line"
- )
- parser.add_argument(
- "--topic-arn", help="The ARN of the SNS topic to publish to", required=True
- )
-
- return parser.parse_args()
-
-
-def publish_batch(sns_client, topic_arn, batch_entries):
- resp = sns_client.publish_batch(
- TopicArn=topic_arn, PublishBatchRequestEntries=batch_entries
- )
-
- # This is to account for any failures in sending messages to SNS.
- # I've never actually had this happen in practice so I've not written
- # any code to handle it (I'd probably just retry the whole script)
- # but I include it just in case.
- assert len(resp["Failed"]) == 0, resp
-
-
-def publish_messages(*, input_file, topic_arn):
- sess = get_session(topic_arn=topic_arn)
-
- # Note: creating boto3 clients isn't thread-safe, so it's important
- # to create it once rather than creating it multiple times in the
- # concurrently() handler.
- #
- # See https://github.com/boto/boto3/issues/801
- sns_client = sess.client("sns")
-
- total_entries = sum(len(entries) for entries in get_batch_entries(input_file))
-
- with tqdm.tqdm(total=total_entries) as pbar:
- for batch, _ in concurrently(
- handler=functools.partial(publish_batch, sns_client, topic_arn),
- inputs=get_batch_entries(input_file),
- max_concurrency=8,
- ):
- pbar.update(len(batch))
-
-
-if __name__ == "__main__":
- args = parse_args()
- publish_messages(input_file=args.INPUT_FILE, topic_arn=args.topic_arn)
aws/download_sqs_messages.py (2111) → aws/download_sqs_messages.py (0)
diff --git a/aws/download_sqs_messages.py b/aws/download_sqs_messages.py
deleted file mode 100755
index 3582189..0000000
--- a/aws/download_sqs_messages.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/usr/bin/env python3
-
-import json
-import sys
-
-import boto3
-import tqdm
-
-from _common import ACCOUNT_NAMES, get_aws_session
-
-
-def list_queue_urls():
- sess = boto3.Session()
- paginator = sess.client("sqs").get_paginator("list_queues")
-
- for page in paginator.paginate():
- yield from page["QueueUrls"]
-
-
-def get_session(*, queue_url):
- """
- Return a boto3 Session for publishing to SNS.
-
- If it recognises the account which contains the queue, it will pick
- the appropriate IAM role, otherwise it use the default boto3 Session.
- """
- # The arn format of an SQS queue URL is:
- #
- # https://sqs.eu-west-1.amazonaws.com/1234567890/queue-name
- #
- # Extract the account ID.
- account_id = queue_url.split("/")[3]
-
- try:
- role_arn = (
- f"arn:aws:iam::{account_id}:role/{ACCOUNT_NAMES[account_id]}-developer"
- )
- return get_aws_session(role_arn=role_arn)
- except KeyError:
- return boto3.Session()
-
-
-def download_messages(*, queue_url):
- sess = get_session(queue_url=queue_url)
-
- sqs_client = sess.client("sqs")
-
- while True:
- resp = sqs_client.receive_message(
- QueueUrl=queue_url, AttributeNames=["All"], MaxNumberOfMessages=10
- )
-
- try:
- yield from resp["Messages"]
- except KeyError:
- return
-
- entries = [
- {"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
- for msg in resp["Messages"]
- ]
-
- resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
-
- if len(resp["Successful"]) != len(entries):
- raise RuntimeError(
- f"Failed to delete messages: entries={entries!r} resp={resp!r}"
- )
-
-
-if __name__ == "__main__":
- try:
- queue_url = sys.argv[1]
- except IndexError:
- queue_url = None
-
- if queue_url is not None:
- for message in tqdm.tqdm(download_messages(queue_url=queue_url)):
- print(json.dumps(message))
- else:
- for queue_url in list_queue_urls():
- print(queue_url)