Skip to main content

Add a script for bulk publishing to SNS

ID
c33a490
date
2022-05-14 08:54:20+00:00
author
Alex Chan <alex@alexwlchan.net>
parent
57fa7c1
message
Add a script for bulk publishing to SNS
changed files
1 file, 74 additions

Changed files

bulk_sns_publish (0) → bulk_sns_publish (2062)

diff --git a/bulk_sns_publish b/bulk_sns_publish
new file mode 100755
index 0000000..08b5577
--- /dev/null
+++ b/bulk_sns_publish
@@ -0,0 +1,74 @@
+#!/usr/bin/env python3
+"""
+This is a script for bulk publishing messages to SNS.
+
+Suppose I have a large collection of messages I want to send to SNS,
+stored as lines in a text file, e.g. some Wellcome catalogue IDs [1].
+
+    xfcrpna3
+    qf8sxvxm
+    ed3w4fv9
+    d4aahw7u
+    hwfrryuz
+
+I could loop through the file line-by-line and send them to SNS one-by-one,
+but that's slow and inefficient.  It would be more efficient to use the
+SNS PublishBatch API to send them ten at a time.
+
+This script provides a convenient wrapper for doing so.
+
+[1]: https://github.com/wellcomecollection/catalogue-pipeline/tree/main/pipeline/id_minter
+
+"""
+
+import os
+import secrets
+import sys
+
+import boto3
+import click
+import more_itertools
+import tqdm
+
+# https://github.com/alexwlchan/concurrently/blob/main/concurrently.py
+sys.path.append(os.path.join(os.environ["HOME"], "repos", "concurrently"))
+
+from concurrently import concurrently
+
+
+@click.command()
+@click.argument("INPUT_FILE", required=True)
+@click.option("--topic-arn", required=True)
+@click.option("--parallelism", required=True, type=int)
+def main(input_file, topic_arn, parallelism):
+    def inputs():
+        with open(input_file) as messages:
+            for i, batch in enumerate(more_itertools.chunked(messages, n=10)):
+                batch = [line.rstrip() for line in batch]
+
+                batch_request_entries = [
+                    {"Id": secrets.token_hex(), "Message": message} for message in batch
+                ]
+
+                yield batch_request_entries
+
+    sess = boto3.Session()
+    sns_client = sess.client("sns")
+
+    def publish(batch_request_entries):
+        sns_client.publish_batch(
+            TopicArn=topic_arn, PublishBatchRequestEntries=batch_request_entries
+        )
+
+    total_entries = sum(len(sns_in) for sns_in in inputs())
+
+    with tqdm.tqdm(total=total_entries) as pbar:
+        for (sns_in, sns_out) in concurrently(
+            publish, inputs(), max_concurrency=parallelism
+        ):
+            pbar.update(len(sns_in))
+
+
+if __name__ == "__main__":
+
+    main()