add my ‘s3ls’ script, tidy up ‘s3tree’
- ID
6f79a16- date
2023-04-28 22:42:19+00:00- author
Alex Chan <alex@alexwlchan.net>- parent
1ceb678- message
add my 's3ls' script, tidy up 's3tree'- changed files
5 files, 385 additions, 261 deletions
Changed files
aws/_utils.py (0) → aws/_utils.py (1619)
diff --git a/aws/_utils.py b/aws/_utils.py
new file mode 100755
index 0000000..3c3fd77
--- /dev/null
+++ b/aws/_utils.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+import boto3
+import hyperlink
+
+
+def get_aws_session(*, role_arn):
+ sts_client = boto3.client("sts")
+ assumed_role_object = sts_client.assume_role(
+ RoleArn=role_arn, RoleSessionName="AssumeRoleSession1"
+ )
+ credentials = assumed_role_object["Credentials"]
+
+ return boto3.Session(
+ aws_access_key_id=credentials["AccessKeyId"],
+ aws_secret_access_key=credentials["SecretAccessKey"],
+ aws_session_token=credentials["SessionToken"],
+ )
+
+
+def guess_account(s3_identifier):
+ """
+ Given the name of an S3 bucket, guess the account it belongs to.
+
+ You can pass the name of the bucket, or the S3 URI.
+
+ Examples:
+
+ > guess_account('s3://example-bucket/cat.jpg')
+ {'account_id': '1234567890', 'name': 'example'}
+
+ > guess_account('example-bucket')
+ {'account_id': '1234567890', 'name': 'example'}
+
+ """
+ if "wellcomedigitalworkflow" in s3_identifier:
+ return {
+ "account_id": "299497370133",
+ "name": "workflow",
+ "role_arn": "arn:aws:iam::299497370133:role/workflow-read_only",
+ }
+
+
+def create_s3_session(s3_identifier):
+ account = guess_account(s3_identifier)
+ if account:
+ return get_aws_session(role_arn=account["role_arn"])
+ else:
+ return boto3.Session()
+
+
+def parse_s3_uri(s3_uri):
+ uri = hyperlink.URL.from_text(s3_uri)
+
+ if uri.scheme != "s3":
+ raise ValueError(f"Unrecognised scheme in {s3_uri!r}, expected s3://")
+
+ bucket = uri.host
+ prefix = "/".join(uri.path)
+
+ return {"Bucket": bucket, "Prefix": prefix}
aws/s3ls (0) → aws/s3ls (2031)
diff --git a/aws/s3ls b/aws/s3ls
new file mode 100755
index 0000000..380d7e4
--- /dev/null
+++ b/aws/s3ls
@@ -0,0 +1,80 @@
+#!/usr/bin/env python3
+"""
+A script for listing all the objects in an S3 prefix.
+
+Objects are printed to stdout as JSON, one object per line.
+"""
+
+import argparse
+import datetime
+import json
+import sys
+
+from _utils import create_s3_session, parse_s3_uri
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(
+ prog="s3ls", description="List all the objects in an S3 prefix"
+ )
+
+ parser.add_argument("S3_URI")
+ parser.add_argument(
+ "--with-versions",
+ action="store_true",
+ help="List every version of the objects in S3, not just the latest version",
+ )
+ parser.add_argument(
+ "--start-after", help="Start listing objects at the given key", default=""
+ )
+
+ return parser.parse_args()
+
+
+class DatetimeEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, datetime.datetime):
+ return obj.isoformat()
+
+
+def get_objects(sess, **kwargs):
+ """
+ Generates every object in an S3 bucket.
+ """
+ paginator = sess.client("s3").get_paginator("list_objects_v2")
+
+ for page in paginator.paginate(**kwargs):
+ yield from page["Contents"]
+
+
+def get_object_versions(sess, **kwargs):
+ """
+ Generates every version of an object in an S3 bucket.
+ """
+ s3_client = sess.client("s3")
+ paginator = s3_client.get_paginator("list_object_versions")
+
+ for page in paginator.paginate(**kwargs):
+ for key in ("Versions", "DeleteMarkers"):
+ try:
+ yield from page[key]
+ except KeyError:
+ pass
+
+
+if __name__ == "__main__":
+ args = parse_args()
+
+ s3_list_args = parse_s3_uri(args.S3_URI)
+
+ sess = create_s3_session(args.S3_URI)
+
+ if "--with-versions" in sys.argv:
+ iterator = get_object_versions
+ s3_list_args["KeyMarker"] = args.start_after
+ else:
+ iterator = get_objects
+ s3_list_args["StartAfter"] = args.start_after
+
+ for s3_obj in iterator(sess, **s3_list_args):
+ print(json.dumps(s3_obj, cls=DatetimeEncoder))
aws/s3tree (243) → aws/s3tree (7061)
diff --git a/aws/s3tree b/aws/s3tree
index 7ac5705..0496d33 100755
--- a/aws/s3tree
+++ b/aws/s3tree
@@ -1,11 +1,245 @@
-#!/usr/bin/env bash
+#!/usr/bin/env python3
-set -o errexit
-set -o nounset
+import collections
+import datetime
+import os
+import sys
+from typing import List
-if [[ "$@" == "s3://wellcomedigitalworkflow"* ]]
-then
- AWS_PROFILE=workflow-read_only python3 ~/repos/pathscripts/aws/s3tree.py "$@"
-else
- python3 ~/repos/pathscripts/aws/s3tree.py "$@"
-fi
+import attr
+import boto3
+import humanize
+import termcolor
+
+from _utils import create_s3_session, parse_s3_uri
+
+
+def list_s3_objects(sess, **kwargs):
+ s3 = sess.client("s3")
+
+ for page in s3.get_paginator("list_objects_v2").paginate(**kwargs):
+ yield from page.get("Contents", [])
+
+
+def create_link_text(*, url, label):
+ # Based on https://stackoverflow.com/a/71309268/1558022
+
+ # OSC 8 ; params ; URI ST <name> OSC 8 ;; ST
+ return f"\033]8;;{url}\033\\{label}\033]8;;\033\\"
+
+
+def pprint_nested_tree(bucket, tree, folder_counts, parents=None):
+ lines = []
+ parents = parents or []
+
+ if not parents:
+ lines.append(".")
+
+ entries = sorted(tree.items())
+
+ for i, (key, nested_tree) in enumerate(entries, start=1):
+ if parents:
+ full_path = f'{"/".join(parents)}/{key}'
+ else:
+ full_path = key
+ if isinstance(key, str):
+ label = create_link_text(
+ url=f"https://eu-west-1.console.aws.amazon.com/s3/buckets/{bucket}?prefix={full_path}/&showversions=false",
+ label=f"{key}/",
+ )
+ else:
+ label = key
+
+ if full_path in folder_counts:
+ obj_count_line = termcolor.colored(
+ f"...plus {folder_counts[full_path]} object{'s' if folder_counts[full_path] > 1 else ''}",
+ "blue",
+ )
+
+ if i == len(entries) and nested_tree:
+ obj_count_line = f" ├── {obj_count_line}"
+ elif i == len(entries):
+ obj_count_line = f" └── {obj_count_line}"
+ elif nested_tree:
+ obj_count_line = f"│ ├── {obj_count_line}"
+ else:
+ obj_count_line = f"│ └── {obj_count_line}"
+ else:
+ obj_count_line = None
+
+ if i == len(entries):
+ lines.append("└── " + label)
+
+ if obj_count_line is not None:
+ lines.append(obj_count_line)
+
+ lines.extend(
+ [
+ " " + l
+ for l in pprint_nested_tree(
+ bucket,
+ nested_tree,
+ folder_counts=folder_counts,
+ parents=parents + [key],
+ )
+ ]
+ )
+ else:
+ lines.append("├── " + label)
+
+ if obj_count_line is not None:
+ lines.append(obj_count_line)
+
+ lines.extend(
+ [
+ "│ " + l
+ for l in pprint_nested_tree(
+ bucket,
+ nested_tree,
+ folder_counts=folder_counts,
+ parents=parents + [key],
+ )
+ ]
+ )
+
+ return lines
+
+
+@attr.s
+class S3Folder:
+ path: str = attr.ib()
+ objects: List[str] = attr.ib(factory=list)
+ folders = attr.ib(factory=dict) # Mapping[str, S3Folder]
+
+
+def build_s3_tree(keys, path=None):
+ path = path or []
+
+ tree = S3Folder(path="/".join(path))
+
+ per_folder_keys = collections.defaultdict(list)
+
+ for k in keys:
+ if "/" in k:
+ folder_name, entry_name = k.split("/", 1)
+ per_folder_keys[folder_name].append(entry_name)
+ else:
+ per_folder_keys["."].append(k)
+
+ assert sum(len(entries) for entries in per_folder_keys.values()) == len(keys)
+
+ tree.objects = sorted(per_folder_keys.pop(".", []))
+
+ for folder_name, folder_keys in per_folder_keys.items():
+ tree.folders[folder_name] = build_s3_tree(
+ folder_keys, path=path + [folder_name]
+ )
+
+ return tree
+
+
+def pprint_s3tree(*, bucket, tree):
+ lines = []
+
+ # If we're at the top of the tree, we want to print a '.'
+ if tree.path == "":
+ lines.append(".")
+
+ # Start by printing any objects that are in this folder. Print up to
+ # 4 objects, otherwise print 3 and then '...X other objects'
+ if len(tree.objects) == 4:
+ tree_object_count = 4
+ else:
+ tree_object_count = 3
+
+ for i, object_key in enumerate(sorted(tree.objects[:tree_object_count]), start=1):
+ if tree.folders or len(tree.objects) > i:
+ prefix_char = "├─"
+ else:
+ prefix_char = "└─"
+
+ lines.append(f"{prefix_char} {termcolor.colored(object_key, 'blue')}")
+
+ if len(tree.objects) > tree_object_count:
+ if tree.folders:
+ prefix_char = "├─"
+ else:
+ prefix_char = "└─"
+
+ extra_objects = f"...{len(tree.objects) - 3} other objects"
+ lines.append(f"{prefix_char} {termcolor.colored(extra_objects, 'blue')}")
+
+ for i, (folder_name, folder_tree) in enumerate(
+ sorted(tree.folders.items()), start=1
+ ):
+ if tree.path == "":
+ full_path = folder_name
+ else:
+ full_path = "/".join([tree.path, folder_name])
+
+ if len(tree.folders) > i:
+ folder_prefix_char = "├─"
+ sub_prefix_char = "│ "
+ else:
+ folder_prefix_char = "└─"
+ sub_prefix_char = " "
+
+ lines.append(
+ folder_prefix_char
+ + " "
+ + create_link_text(
+ url=f"https://eu-west-1.console.aws.amazon.com/s3/buckets/{bucket}?prefix={full_path}/&showversions=false",
+ label=f"{folder_name}/",
+ )
+ )
+ lines.extend(
+ [
+ f"{sub_prefix_char}{ln}"
+ for ln in pprint_s3tree(bucket=bucket, tree=folder_tree)
+ ]
+ )
+
+ return lines
+
+
+if __name__ == "__main__":
+ try:
+ s3_uri = sys.argv[1]
+ except IndexError:
+ sys.exit(f"Usage: {__file__} <S3_URI>")
+
+ s3_prefix = parse_s3_uri(s3_uri)
+
+ sess = create_s3_session(s3_uri)
+
+ s3_objects = list(list_s3_objects(sess, **s3_prefix))
+
+ if not s3_objects:
+ print("(no objects)")
+ sys.exit(1)
+
+ keys = [s3_obj["Key"] for s3_obj in s3_objects if s3_obj["Size"] > 0]
+
+ tree = build_s3_tree(keys)
+
+ print("\n".join(pprint_s3tree(bucket=s3_prefix["Bucket"], tree=tree)))
+
+ print("")
+ total_size = sum(s3_obj["Size"] for s3_obj in s3_objects)
+ last_modified = max(s3_obj["LastModified"] for s3_obj in s3_objects)
+
+ if last_modified.date() == datetime.date.today():
+ last_modified_message = "today"
+ elif last_modified.year != datetime.date.today().year:
+ last_modified_message = f"in {last_modified.strftime('%B %Y')}"
+ else:
+ last_modified_message = last_modified.strftime("%d %B")
+
+ print(
+ termcolor.colored(
+ f'{humanize.intcomma(len(s3_objects))} object{"s" if len(s3_objects) > 1 else ""}, '
+ f"totalling {humanize.naturalsize(total_size)}, "
+ f"last modified {last_modified_message}",
+ "green",
+ )
+ )
aws/s3tree.py (7223) → aws/s3tree.py (0)
diff --git a/aws/s3tree.py b/aws/s3tree.py
deleted file mode 100755
index a7c3b8f..0000000
--- a/aws/s3tree.py
+++ /dev/null
@@ -1,252 +0,0 @@
-#!/usr/bin/env python3
-
-import collections
-import datetime
-import os
-import sys
-from typing import List
-
-import attr
-import boto3
-import humanize
-import hyperlink
-import termcolor
-
-
-def parse_s3_uri(s3_uri):
- uri = hyperlink.URL.from_text(s3_uri)
-
- if uri.scheme != "s3":
- raise ValueError(f"Unrecognised scheme in {s3_uri!r}, expected s3://")
-
- bucket = uri.host
- prefix = "/".join(uri.path)
-
- return {"Bucket": bucket, "Prefix": prefix}
-
-
-def list_s3_objects(sess, **kwargs):
- s3 = sess.client("s3")
-
- for page in s3.get_paginator("list_objects_v2").paginate(**kwargs):
- yield from page.get("Contents", [])
-
-
-def create_link_text(*, url, label):
- # Based on https://stackoverflow.com/a/71309268/1558022
-
- # OSC 8 ; params ; URI ST <name> OSC 8 ;; ST
- return f"\033]8;;{url}\033\\{label}\033]8;;\033\\"
-
-
-def pprint_nested_tree(bucket, tree, folder_counts, parents=None):
- lines = []
- parents = parents or []
-
- if not parents:
- lines.append(".")
-
- entries = sorted(tree.items())
-
- for i, (key, nested_tree) in enumerate(entries, start=1):
- if parents:
- full_path = f'{"/".join(parents)}/{key}'
- else:
- full_path = key
- if isinstance(key, str):
- label = create_link_text(
- url=f"https://eu-west-1.console.aws.amazon.com/s3/buckets/{bucket}?prefix={full_path}/&showversions=false",
- label=f"{key}/",
- )
- else:
- label = key
-
- if full_path in folder_counts:
- obj_count_line = termcolor.colored(
- f"...plus {folder_counts[full_path]} object{'s' if folder_counts[full_path] > 1 else ''}",
- "blue",
- )
-
- if i == len(entries) and nested_tree:
- obj_count_line = f" ├── {obj_count_line}"
- elif i == len(entries):
- obj_count_line = f" └── {obj_count_line}"
- elif nested_tree:
- obj_count_line = f"│ ├── {obj_count_line}"
- else:
- obj_count_line = f"│ └── {obj_count_line}"
- else:
- obj_count_line = None
-
- if i == len(entries):
- lines.append("└── " + label)
-
- if obj_count_line is not None:
- lines.append(obj_count_line)
-
- lines.extend(
- [
- " " + l
- for l in pprint_nested_tree(
- bucket,
- nested_tree,
- folder_counts=folder_counts,
- parents=parents + [key],
- )
- ]
- )
- else:
- lines.append("├── " + label)
-
- if obj_count_line is not None:
- lines.append(obj_count_line)
-
- lines.extend(
- [
- "│ " + l
- for l in pprint_nested_tree(
- bucket,
- nested_tree,
- folder_counts=folder_counts,
- parents=parents + [key],
- )
- ]
- )
-
- return lines
-
-
-@attr.s
-class S3Folder:
- path: str = attr.ib()
- objects: List[str] = attr.ib(factory=list)
- folders = attr.ib(factory=dict) # Mapping[str, S3Folder]
-
-
-def build_s3_tree(keys, path=None):
- path = path or []
-
- tree = S3Folder(path="/".join(path))
-
- per_folder_keys = collections.defaultdict(list)
-
- for k in keys:
- if "/" in k:
- folder_name, entry_name = k.split("/", 1)
- per_folder_keys[folder_name].append(entry_name)
- else:
- per_folder_keys["."].append(k)
-
- assert sum(len(entries) for entries in per_folder_keys.values()) == len(keys)
-
- tree.objects = sorted(per_folder_keys.pop(".", []))
-
- for folder_name, folder_keys in per_folder_keys.items():
- tree.folders[folder_name] = build_s3_tree(
- folder_keys, path=path + [folder_name]
- )
-
- return tree
-
-
-def pprint_s3tree(*, bucket, tree):
- lines = []
-
- # If we're at the top of the tree, we want to print a '.'
- if tree.path == "":
- lines.append(".")
-
- # Start by printing any objects that are in this folder. Print up to
- # 4 objects, otherwise print 3 and then '...X other objects'
- if len(tree.objects) == 4:
- tree_object_count = 4
- else:
- tree_object_count = 3
-
- for i, object_key in enumerate(sorted(tree.objects[:tree_object_count]), start=1):
- if tree.folders or len(tree.objects) > i:
- prefix_char = "├─"
- else:
- prefix_char = "└─"
-
- lines.append(f"{prefix_char} {termcolor.colored(object_key, 'blue')}")
-
- if len(tree.objects) > tree_object_count:
- if tree.folders:
- prefix_char = "├─"
- else:
- prefix_char = "└─"
-
- extra_objects = f"...{len(tree.objects) - 3} other objects"
- lines.append(f"{prefix_char} {termcolor.colored(extra_objects, 'blue')}")
-
- for i, (folder_name, folder_tree) in enumerate(
- sorted(tree.folders.items()), start=1
- ):
- if tree.path == "":
- full_path = folder_name
- else:
- full_path = "/".join([tree.path, folder_name])
-
- if len(tree.folders) > i:
- folder_prefix_char = "├─"
- sub_prefix_char = "│ "
- else:
- folder_prefix_char = "└─"
- sub_prefix_char = " "
-
- lines.append(
- folder_prefix_char
- + " "
- + create_link_text(
- url=f"https://eu-west-1.console.aws.amazon.com/s3/buckets/{bucket}?prefix={full_path}/&showversions=false",
- label=f"{folder_name}/",
- )
- )
- lines.extend(
- [
- f"{sub_prefix_char}{ln}"
- for ln in pprint_s3tree(bucket=bucket, tree=folder_tree)
- ]
- )
-
- return lines
-
-
-if __name__ == "__main__":
- try:
- s3_uri = sys.argv[1]
- except IndexError:
- sys.exit(f"Usage: {__file__} <S3_URI>")
-
- s3_prefix = parse_s3_uri(s3_uri)
-
- sess = boto3.Session()
-
- s3_objects = list(list_s3_objects(sess, **s3_prefix))
-
- keys = [s3_obj["Key"] for s3_obj in s3_objects if s3_obj["Size"] > 0]
-
- tree = build_s3_tree(keys)
-
- print("\n".join(pprint_s3tree(bucket=s3_prefix["Bucket"], tree=tree)))
-
- print("")
- total_size = sum(s3_obj["Size"] for s3_obj in s3_objects)
- last_modified = max(s3_obj["LastModified"] for s3_obj in s3_objects)
-
- if last_modified.date() == datetime.date.today():
- last_modified_message = "today"
- elif last_modified.year != datetime.date.today().year:
- last_modified_message = f"in {last_modified.strftime('%B %Y')}"
- else:
- last_modified_message = last_modified.strftime('%d %B')
-
- print(
- termcolor.colored(
- f'{humanize.intcomma(len(s3_objects))} object{"s" if len(s3_objects) > 1 else ""}, '
- f"totalling {humanize.naturalsize(total_size)}, "
- f"last modified {last_modified_message}",
- "green",
- )
- )
aws/s3tree.py (0) → aws/s3tree.py (6)
diff --git a/aws/s3tree.py b/aws/s3tree.py
new file mode 120000
index 0000000..086f1fa
--- /dev/null
+++ b/aws/s3tree.py
@@ -0,0 +1 @@
+s3tree
\ No newline at end of file