Skip to main content

Continue simplifying the code

ID
b98c0e3
date
2024-02-11 00:15:31+00:00
author
Alex Chan <alex@alexwlchan.net>
parent
1c7ad78
message
Continue simplifying the code
changed files
1 file, 33 additions, 49 deletions

Changed files

web/save_tumblr_likes.py (12626) → web/save_tumblr_likes.py (12317)

diff --git a/web/save_tumblr_likes.py b/web/save_tumblr_likes.py
index 81d2792..fd5e1b8 100644
--- a/web/save_tumblr_likes.py
+++ b/web/save_tumblr_likes.py
@@ -1,4 +1,3 @@
-import contextlib
 import datetime
 import errno
 import functools
@@ -10,13 +9,13 @@ import subprocess
 import sys
 import textwrap
 from urllib.parse import parse_qs, urlparse
-from urllib.request import urlretrieve
 
 from bs4 import BeautifulSoup
 import httpx
 import hyperlink
 import keyring
-from sqlite_utils import Database, NotFoundError
+from sqlite_utils import Database
+from sqlite_utils.db import NotFoundError
 import termcolor
 
 
@@ -27,7 +26,7 @@ def get_liked_posts(blog_identifier: str, days: int):
     client = httpx.Client(
         base_url="https://api.tumblr.com/v2/blog",
         params={"api_key": keyring.get_password("tumblr", "api_key")},
-        headers={"User-Agent": "Alex Chan's personal scripts; alex@alexwlchan.net"}
+        headers={"User-Agent": "Alex Chan's personal scripts; alex@alexwlchan.net"},
     )
 
     params = {}
@@ -63,23 +62,6 @@ def get_liked_posts(blog_identifier: str, days: int):
         params.update(resp.json()["response"]["_links"]["next"]["query_params"])
 
 
-@contextlib.contextmanager
-def changedir(newdir):
-    """Changes the current working directory."""
-    # https://stackoverflow.com/a/24176022/1558022
-    prevdir = os.getcwd()
-    os.chdir(os.path.expanduser(newdir))
-    try:
-        yield
-    finally:
-        os.chdir(prevdir)
-
-
-def youtube_dl(*args):
-    return subprocess.check_output(["yt-dlp"] + list(args)).strip().decode("utf8")
-
-
-
 def log_result(format_template):
     def decorator(inner_fn):
         @functools.wraps(inner_fn)
@@ -92,6 +74,7 @@ def log_result(format_template):
                     textwrap.fill(str(exc), width=85), prefix=" " * 4
                 )
                 print(termcolor.colored(f"✘ {description}\n{wrapped_error}", "red"))
+                raise
             else:
                 print(termcolor.colored(f"✔ {description}", "green"))
                 return result
@@ -112,12 +95,9 @@ def get_saved_blog_name(*, post_id: str, blog_name: str, db_path: pathlib.Path) 
     db = Database(db_path)
 
     try:
-        return db['tumblr_posts'].get(post_id)['blog_name']
+        return db["tumblr_posts"].get(post_id)["blog_name"]
     except NotFoundError:
-        db['tumblr_posts'].insert({
-            'post_id': post_id,
-            'blog_name': blog_name
-        })
+        db["tumblr_posts"].insert({"post_id": post_id, "blog_name": blog_name})
 
         return blog_name
 
@@ -125,12 +105,14 @@ def get_saved_blog_name(*, post_id: str, blog_name: str, db_path: pathlib.Path) 
 @log_result("{post_url}")
 def download_tumblr_post(*, post_url, post_data, download_root):
     blog_name = get_saved_blog_name(
-        post_id=post_data['id'],
-        blog_name=post_data['blog_name'],
-        cache_path=download_root / "post_ids.db"
+        post_id=post_data["id"],
+        blog_name=post_data["blog_name"],
+        db_path=download_root / "post_ids.db",
     )
 
-    download_dir = download_root / blog_name[0].lower() / blog_name / str(post_data["id"])
+    download_dir = (
+        download_root / blog_name[0].lower() / blog_name / str(post_data["id"])
+    )
     download_dir.mkdir(exist_ok=True, parents=True)
 
     try:
@@ -156,9 +138,10 @@ def download_tumblr_post(*, post_url, post_data, download_root):
             continue
 
         try:
-            download_asset_url(post_data=post_data, url=asset_url, download_dir=download_dir)
+            download_asset_url(
+                post_data=post_data, url=asset_url, download_dir=download_dir
+            )
         except CannotDownloadAsset:
-
             has_missing_assets = True
             with open(missing_assets, "a") as out_file:
                 out_file.write(asset_url + "\n")
@@ -249,7 +232,6 @@ def get_asset_urls(post_data):
             )
 
     elif post_data["type"] == "audio":
-
         # Example contents of the "player" field:
         #
         #     <iframe
@@ -296,7 +278,6 @@ def get_asset_urls(post_data):
         raise ValueError(f"Unrecognised post type: {post_id!r} ({post_type})")
 
 
-
 def download_asset_url(url, *, post_data, download_dir):
     parsed_url = hyperlink.URL.from_text(url)
 
@@ -306,24 +287,28 @@ def download_asset_url(url, *, post_data, download_dir):
         if out_path.exists():
             return
 
-        local_filename, _ = urlretrieve(url)
+        tmp_path = str(out_path) + ".tmp"
+
+        r = httpx.get(url)
+
+        with open(tmp_path, "wb") as tmp_file:
+            tmp_file.write(r.content)
 
         try:
-            os.rename(local_filename, out_path)
+            os.rename(tmp_path, out_path)
         except OSError as err:
             if err.errno == errno.EXDEV:
-                shutil.move(local_filename, out_path)
+                shutil.move(tmp_path, out_path)
             else:
                 raise
 
         return out_path
 
     elif (
-        ("youtube.com" in parsed_url.host) or
-        ("vimeo.com" in parsed_url.host) or
-        ("instagram.com" in parsed_url.host)
+        ("youtube.com" in parsed_url.host)
+        or ("vimeo.com" in parsed_url.host)
+        or ("instagram.com" in parsed_url.host)
     ):
-
         # Check if the video (or a video with a similar-looking name) has already
         # been downloaded before trying to download it again.
         if "youtube.com" in parsed_url.host:
@@ -349,12 +334,11 @@ def download_asset_url(url, *, post_data, download_dir):
         ):
             return
 
-        with changedir(download_dir):
-            try:
-                youtube_dl(url)
-                return
-            except subprocess.CalledProcessError:
-                raise CannotDownloadAsset()
+        try:
+            subprocess.check_call(["yt-dlp", url], cwd=download_dir)
+            return
+        except subprocess.CalledProcessError:
+            raise CannotDownloadAsset()
 
     assert 0, url
 
@@ -363,8 +347,8 @@ class CannotDownloadAsset(Exception):
     pass
 
 
-if __name__ == '__main__':
-    for post_data in get_liked_posts(blog_identifier='alexwlchan.tumblr.com', days=120):
+if __name__ == "__main__":
+    for post_data in get_liked_posts(blog_identifier="alexwlchan.tumblr.com", days=120):
         download_tumblr_post(
             post_url=post_data["post_url"],
             post_data=post_data,