fetch: improve the code based on my blog post review
- ID
416106e- date
2026-04-23 07:17:06+00:00- author
Alex Chan <alex@alexwlchan.net>- parent
ca6964e- message
fetch: improve the code based on my blog post review- changed files
1 file, 16 additions, 32 deletions
Changed files
src/chives/fetch.py (3218) → src/chives/fetch.py (2920)
diff --git a/src/chives/fetch.py b/src/chives/fetch.py
index 21ba817..92f0f3c 100644
--- a/src/chives/fetch.py
+++ b/src/chives/fetch.py
@@ -4,7 +4,6 @@ Make HTTP requests using the standard library.
from pathlib import Path
import ssl
-from typing import Literal
import urllib.parse
import urllib.request
@@ -24,23 +23,19 @@ def build_request(
url: str, *, params: QueryParams | None = None, headers: Headers | None = None
) -> urllib.request.Request:
"""
- Build a request based on the given inputs.
+ Build a urllib Request, appending query parameters and attaching headers.
"""
- if isinstance(params, dict):
- params = [(k, v) for k, v in params.items()]
if params is not None:
+ params_list = list(params.items()) if isinstance(params, dict) else params
+
u = urllib.parse.urlsplit(url)
- query = urllib.parse.parse_qsl(u.query) + params
+ query = urllib.parse.parse_qsl(u.query) + params_list
new_query = urllib.parse.urlencode(query)
url = urllib.parse.urlunsplit(
(u.scheme, u.netloc, u.path, new_query, u.fragment)
)
- req = urllib.request.Request(url)
-
- if headers:
- for name, value in headers.items():
- req.add_header(name, value)
+ req = urllib.request.Request(url, headers=headers or {})
return req
@@ -55,26 +50,20 @@ def fetch_url(
req = build_request(url, params=params, headers=headers)
with urllib.request.urlopen(req, context=ssl_context) as resp:
- data = resp.read()
-
- assert isinstance(data, bytes), type(data)
+ data: bytes = resp.read()
return data
-ImageFormat = Literal["jpg", "png", "gif", "webp"]
-
-
-def _guess_image_format(content_type: str | None) -> ImageFormat:
+def choose_filename_extension(content_type: str | None) -> str:
"""
- Given the Content-Type response header, guess the image format.
+ Choose a filename extension for an image downloaded with the given
+ Content-Type header.
"""
if content_type is None:
- raise RuntimeError(
- "no Content-Type header in response, cannot guess image format"
- )
+ raise ValueError("no Content-Type header, cannot determine image format")
- content_type_mapping: dict[str, ImageFormat] = {
+ content_type_mapping = {
"image/jpeg": "jpg",
"image/png": "png",
"image/gif": "gif",
@@ -84,7 +73,7 @@ def _guess_image_format(content_type: str | None) -> ImageFormat:
try:
return content_type_mapping[content_type]
except KeyError:
- raise ValueError(f"unrecognised image format: {content_type}")
+ raise ValueError(f"unrecognised Content-Type header: {content_type}")
def download_image(
@@ -102,21 +91,16 @@ def download_image(
Throws a FileExistsError if you try to overwrite an existing file.
"""
- ssl_context = ssl.create_default_context(cafile=certifi.where())
-
req = build_request(url, params=params, headers=headers)
with urllib.request.urlopen(req, context=ssl_context) as resp:
- img_data = resp.read()
- assert isinstance(img_data, bytes), type(img_data)
+ image_data: bytes = resp.read()
- img_format = _guess_image_format(content_type=resp.headers["content-type"])
-
- out_path = out_prefix.with_suffix("." + img_format)
+ image_format = choose_filename_extension(content_type=resp.headers["content-type"])
+ out_path = out_prefix.with_suffix("." + image_format)
out_path.parent.mkdir(exist_ok=True, parents=True)
-
with open(out_path, "xb") as out_file:
- out_file.write(img_data)
+ out_file.write(image_data)
return out_path