Start to tidy up some of the code
- ID
1c7ad78- date
2024-02-11 00:10:59+00:00- author
Alex Chan <alex@alexwlchan.net>- parent
4f50d19- message
Start to tidy up some of the code- changed files
1 file, 15 additions, 42 deletions
Changed files
web/save_tumblr_likes.py (13459) → web/save_tumblr_likes.py (12626)
diff --git a/web/save_tumblr_likes.py b/web/save_tumblr_likes.py
index 6024f2d..81d2792 100644
--- a/web/save_tumblr_likes.py
+++ b/web/save_tumblr_likes.py
@@ -6,7 +6,6 @@ import json
import os
import pathlib
import shutil
-import sqlite3
import subprocess
import sys
import textwrap
@@ -17,6 +16,7 @@ from bs4 import BeautifulSoup
import httpx
import hyperlink
import keyring
+from sqlite_utils import Database, NotFoundError
import termcolor
@@ -79,21 +79,6 @@ def youtube_dl(*args):
return subprocess.check_output(["yt-dlp"] + list(args)).strip().decode("utf8")
-@contextlib.contextmanager
-def sqlite3_cursor(path):
- """
- Get a cursor to a sqlite3 database that automatically commits and closes
- the result.
- """
- if not path.exists():
- path.parent.mkdir(exist_ok=True, parents=True)
- conn = sqlite3.connect(path)
- cursor = conn.cursor()
- yield cursor
- conn.commit()
- conn.close()
-
-
def log_result(format_template):
def decorator(inner_fn):
@@ -116,45 +101,33 @@ def log_result(format_template):
return decorator
-def get_blog_name(*, post_data, cache_path):
+def get_saved_blog_name(*, post_id: str, blog_name: str, db_path: pathlib.Path) -> str:
"""
Look up the blog name associated with this post ID.
It might be the blog name that's currently in use, or it might be that
- we've already saved this post under a different name -- if so, defer to
+ we've already saved this post under a different name -- if so, prefer
the already-saved name.
"""
- with sqlite3_cursor(cache_path) as cursor:
- try:
- cursor.execute(
- "SELECT blog_name FROM tumblr_posts WHERE post_id=?", (post_data["id"],)
- )
- except sqlite3.OperationalError as err:
- if str(err) == "no such table: tumblr_posts":
- cursor.execute(
- "CREATE TABLE tumblr_posts (post_id TEXT PRIMARY KEY, blog_name TEXT)"
- )
- else:
- raise
- else:
- try:
- (blog_name,) = cursor.fetchone()
- return blog_name
- except TypeError: # cannot unpack non-iterable NoneType object
- pass
+ db = Database(db_path)
- blog_name = post_data["blog_name"]
+ try:
+ return db['tumblr_posts'].get(post_id)['blog_name']
+ except NotFoundError:
+ db['tumblr_posts'].insert({
+ 'post_id': post_id,
+ 'blog_name': blog_name
+ })
- cursor.execute(
- "INSERT INTO tumblr_posts VALUES (?,?)", (post_data["id"], blog_name)
- )
return blog_name
@log_result("{post_url}")
def download_tumblr_post(*, post_url, post_data, download_root):
- blog_name = get_blog_name(
- post_data=post_data, cache_path=download_root / "post_ids.db"
+ blog_name = get_saved_blog_name(
+ post_id=post_data['id'],
+ blog_name=post_data['blog_name'],
+ cache_path=download_root / "post_ids.db"
)
download_dir = download_root / blog_name[0].lower() / blog_name / str(post_data["id"])