|
#!/usr/bin/env python3 |
|
|
|
# A simple dog picture API suitable for mobile development practice. |
|
|
|
# Copyright 2025 Jason Pepas |
|
# MIT Licensed, see https://opensource.org/license/mit |
|
|
|
import os |
|
import sys |
|
import json |
|
from gzip import GzipFile |
|
from io import BytesIO |
|
import mimetypes |
|
import datetime |
|
import socket |
|
import http.server |
|
import random |
|
import urllib |
|
import urllib.request |
|
import re |
|
|
|
|
|
# Download the dog names into ./names.txt (one name per line).
# Does nothing when names.txt already exists.
# Raises whatever urllib.request.urlopen raises on a network/HTTP failure,
# and Exception when the page yields no names at all.
def fetch_names():
    if os.path.exists("names.txt"):
        return
    sys.stderr.write("Downloading dog names.\n")
    # Upstream page: https://www.walkervillevet.com.au/pet-care-advice/dogs/puppy-names/
    # An archive.org snapshot of it is what actually gets fetched.
    url="https://web.archive.org/web/20241024133422/https://www.walkervillevet.com.au/pet-care-advice/dogs/puppy-names/"
    # A timeout keeps a stalled connection from hanging startup forever.
    with urllib.request.urlopen(url, timeout=60) as response:
        html = response.read().decode()
    # The pattern admits whitespace (newlines included), so a cell can match
    # as blank or span lines.  Collapse the whitespace and skip empty cells
    # so names.txt really holds one usable name per line.
    names = []
    for raw in re.findall(r"<td>([a-zA-Z-\s]+?)</td>", html):
        name = " ".join(raw.split())
        if name:
            names.append(name)
    # Fail before writing: an empty names.txt would never be re-downloaded.
    if not names:
        raise Exception("Failed to fetch names.")
    with open("names.txt", "w") as fd:
        fd.writelines(name + "\n" for name in names)
|
|
|
|
|
# Load the list of dog names from ./names.txt.
# Returns the names sorted, with surrounding whitespace removed and blank
# lines skipped.  Raises OSError (e.g. FileNotFoundError) if the file
# cannot be read.
def load_names():
    sys.stderr.write("Loading names.txt.\n")
    with open("names.txt", "r") as fd:
        stripped = (line.strip() for line in fd)
        # Blank lines would otherwise become empty-string "names".
        return sorted(name for name in stripped if name)
|
|
|
|
|
# Download the dog image tarball and unpack it into the current directory.
# Shells out to bash/curl/tar; raises Exception when the pipeline fails.
def fetch_images():
    sys.stderr.write("Downloading dog images.\n")
    # Upstream location: http://vision.stanford.edu/aditya86/ImageNetDogs/images.tar
    tarball_url = "https://web.archive.org/web/20240121212154/http://vision.stanford.edu/aditya86/ImageNetDogs/images.tar"
    # pipefail makes a curl failure fail the whole pipeline, not just tar.
    shell_cmd = '/bin/bash -e -o pipefail -c "curl -fLsS {} | tar xv"'.format(tarball_url)
    if os.system(shell_cmd) != 0:
        raise Exception("Failed to fetch images.")
|
|
|
|
|
# Scan ./Images for dog pictures and build the in-memory dog list.
# This only reads the disk; it does not download anything.
#
#   names:    list of dog names; each dog gets one picked at random
#             (random.choice raises IndexError if the list is empty).
#   ip, port: host and port used to build each dog's absolute "imageUrl".
#
# Returns (image_paths, dogs_by_id):
#   image_paths: "<breed dir>/<file>.jpg" paths relative to ./Images.
#   dogs_by_id:  shuffled list of dicts with "breed", "imageUrl", "name" and
#                "id" keys, where "id" equals the dict's index in the list.
def load_images(names, ip, port):
    # Parse the breed from the dirname.
    # e.g. 'n02098105-soft-coated_wheaten_terrier' -> "Soft-coated Wheaten Terrier"
    def parse_breed_name(dirname):
        # Drop the leading id ("n02098105-").  A directory without a '-' is
        # used whole instead of raising IndexError.
        _, sep, slug = dirname.partition('-')
        if not sep:
            slug = dirname
        # ["Soft-coated", "Wheaten", "Terrier"] -> "Soft-coated Wheaten Terrier"
        return " ".join(word.capitalize() for word in slug.split('_'))

    # Find all of the images.
    sys.stderr.write("Scanning ./Images.\n")
    image_paths = []
    dogs = []
    for dirname in os.listdir("Images"):
        dirpath = "Images/%s" % dirname
        if not os.path.isdir(dirpath):
            continue
        breed = parse_breed_name(dirname)
        for image in os.listdir(dirpath):
            if not image.endswith(".jpg"):
                continue
            # Always '/'-separated: this path is embedded in a URL.
            relpath = "%s/%s" % (dirname, image)
            image_paths.append(relpath)
            dogs.append({
                "breed": breed,
                "imageUrl": "http://%s:%s/images/%s" % (ip, port, relpath),
                "name": random.choice(names),
            })
    # Shuffle first, then number, so ids follow the shuffled order.
    random.shuffle(dogs)
    for dog_id, dog in enumerate(dogs):
        dog["id"] = dog_id
    return (image_paths, dogs)
|
|
|
|
|
# Split a request target into its path and its query parameters.
# Given '/foo?bar=42', return ['/foo', {'bar': '42'}] (values stay strings).
# The path_query param is typically handler.path.
def parse_GET_path(path_query):
    # Split on the FIRST '?' only: a second '?' belongs to the query string
    # and must not break the unpacking.
    path_part, _, query_part = path_query.partition('?')
    # Thanks to https://wsgi.tutorial.codepoint.net/parsing-the-request-get
    # Note: parse_qs will return an array for each item, because the user might
    # have set a value more than once in the query string.  We'll go with the
    # last value of each array.
    query_dict = {k: v[-1] for k, v in urllib.parse.parse_qs(query_part).items()}
    # Drop any superfluous trailing slashes, but keep a lone '/'.  The length
    # is checked first so an empty path cannot raise IndexError.
    while len(path_part) > 1 and path_part.endswith('/'):
        path_part = path_part[:-1]
    return [path_part, query_dict]
|
|
|
|
|
# Gzip-compress data (str is encoded to bytes first) and return the
# compressed bytes.
def gzip(data):
    payload = data.encode() if isinstance(data, str) else data
    sink = BytesIO()
    compressor = GzipFile(fileobj=sink, mode="wb")
    try:
        compressor.write(payload)
    finally:
        # Closing writes the gzip trailer; the sink is only complete after it.
        compressor.close()
    return sink.getvalue()
|
|
|
|
|
# Will the client accept gzip encoding?
# Reads the request's Accept-Encoding header and returns True when it lists
# "gzip" (case-insensitively, with or without parameters) and does not
# refuse it with q=0.  Returns False when the header is missing or empty.
def accepts_gzip(handler):
    header = handler.headers.get('Accept-Encoding')
    if not header:
        return False
    for item in header.split(','):
        # Each item looks like "gzip" or " gzip ; q=0.8".
        coding, _, params = item.partition(';')
        if coding.strip().lower() != 'gzip':
            continue
        for param in params.split(';'):
            key, _, value = param.partition('=')
            if key.strip().lower() != 'q':
                continue
            try:
                # A quality of 0 means "not acceptable".
                if float(value) <= 0:
                    return False
            except ValueError:
                pass  # malformed q-value: treat gzip as plainly listed
        return True
    return False
|
|
|
|
|
# Emit a public Cache-Control header; max_age is truncated to whole seconds.
def send_cache_control(handler, max_age):
    seconds = int(max_age)
    directive = 'public, max-age={}'.format(seconds)
    handler.send_header('Cache-Control', directive)
|
|
|
|
|
# Send a complete response: status line, headers and body.
#   handler:      the request handler to reply through.
#   code:         HTTP status code.
#   body:         str (encoded to bytes) or bytes.
#   content_type: value for the Content-Type header.
#   max_age:      when not None, also send a Cache-Control header.
# The body is gzip-compressed when the client's Accept-Encoding allows it.
def send_content(handler, code, body, content_type, max_age=None):
    # Prepare the payload before anything is written, so a failure here
    # cannot leave a half-sent response behind.
    data = body.encode() if isinstance(body, str) else body
    use_gzip = accepts_gzip(handler)
    if use_gzip:
        data = gzip(data)
    handler.send_response(code)
    if use_gzip:
        handler.send_header('Content-Encoding', 'gzip')
    # The representation depends on Accept-Encoding either way, so BOTH
    # variants carry Vary; otherwise a cache could hand the identity variant
    # to a gzip client or vice versa.
    handler.send_header('Vary', 'Accept-Encoding')
    handler.send_header('Content-Type', content_type)
    handler.send_header('Content-Length', str(len(data)))
    if max_age is not None:
        send_cache_control(handler, max_age)
    handler.end_headers()
    handler.wfile.write(data)
|
|
|
|
|
# Reply with a text/plain (UTF-8) body.
def send_text(handler, code, body, max_age=None):
    text_type = 'text/plain; charset=UTF-8'
    send_content(handler, code, body, text_type, max_age)
|
|
|
|
|
# Reply with an application/json body (already serialized by the caller).
def send_json(handler, code, body, max_age=None):
    json_type = 'application/json'
    send_content(handler, code, body, json_type, max_age)
|
|
|
|
|
# Serialize obj as pretty-printed JSON (sorted keys, 2-space indent) and
# send it.
def send_obj_as_json(handler, code, obj, max_age=None):
    send_json(handler, code, json.dumps(obj, indent=2, sort_keys=True), max_age)
|
|
|
|
|
# Guess the Content-Type for a file path from its extension.
# Falls back to 'application/octet-stream' when nothing matches.
def get_content_type(fpath):
    suffix = os.path.splitext(fpath)[1].lower()
    # hard-coded shortcuts for a few common filetypes:
    shortcuts = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.mp4': 'video/mp4',
    }
    if suffix in shortcuts:
        return shortcuts[suffix]
    guessed, _encoding = mimetypes.guess_type(fpath)
    return guessed or 'application/octet-stream'
|
|
|
|
|
# Send the contents of a file from disk.
# Note: 'Range' header not supported.
#   handler:      the request handler to reply through.
#   fpath:        path of the file to send.
#   is_HEAD:      when True, send the status line and headers but no body.
#   content_type: Content-Type to send; guessed from fpath when None.
#   max_age:      when not None, also send a Cache-Control header.
# Replies 404 when fpath is not an existing regular file.
def send_file(handler, fpath, is_HEAD=False, content_type=None, max_age=None):
    # isfile rather than exists: a directory exists but cannot be opened for
    # reading, and would otherwise surface as a 500.
    if not os.path.isfile(fpath):
        send_text(handler, 404, "Not found")
        return
    if content_type is None:
        content_type = get_content_type(fpath)
    file_size = os.path.getsize(fpath)
    # 'with' closes the file even when the client disconnects mid-transfer
    # and the write raises.
    with open(fpath, 'rb') as fd:
        handler.send_response(200) # 'OK'
        handler.send_header('Content-Length', "%s" % file_size)
        handler.send_header('Content-Type', content_type)
        if max_age is not None:
            send_cache_control(handler, max_age)
        handler.end_headers()
        if is_HEAD:
            return
        # Stream in chunks so large files are never held in memory whole.
        chunk_size = 256 * 1024
        for chunk in iter(lambda: fd.read(chunk_size), b''):
            handler.wfile.write(chunk)
|
|
|
|
|
# Routing tables.
# g_static_routes maps url_path -> {http_method: fn}.
# g_regex_routes is an ordered list of [http_method, label, regex, fn].
g_static_routes, g_regex_routes = {}, []
|
|
|
|
|
# Register fn as the handler for http_method on the exact path url_path.
# Registering the same method and path again replaces the earlier handler.
def add_static_route(http_method, url_path, fn):
    g_static_routes.setdefault(url_path, {})[http_method] = fn
|
|
|
|
|
# Register fn as the handler for http_method on paths matching regex.
# label is a human-readable name that is only used in log output.
def add_regex_route(http_method, label, regex, fn):
    entry = [http_method, label, regex, fn]
    g_regex_routes.append(entry)
|
|
|
|
|
# Look up the handler function for a request, or None when nothing matches.
# Static routes (exact path) win over regex routes; regex routes are tried
# in registration order and matched from the start of the path.
def route(handler):
    target = handler.path.split('?')[0]
    verb = handler.command
    by_method = g_static_routes.get(target, None)
    static_fn = by_method.get(verb, None) if by_method else None
    if static_fn:
        sys.stderr.write("Using static route %s\n" % target)
        return static_fn
    for entry in g_regex_routes:
        entry_verb, label, regex, regex_fn = entry
        if entry_verb == verb and regex.match(target):
            sys.stderr.write("Using regex route %s for %s\n" % (label, target))
            return regex_fn
    return None
|
|
|
|
|
# The core of the webapp: route the request, run its handler, log timing.
# Replies 404 when no route matches.  When the handler raises, a 500 is
# attempted and the exception is re-raised for the server to report.
# A client that disconnects mid-response is ignored.
def handle_request(handler):
    then = datetime.datetime.now()
    try:
        fn = route(handler)
        if fn is None:
            send_text(handler, 404, "Not found")
        else:
            fn(handler)
    except (BrokenPipeError, ConnectionResetError):
        pass # don't care: the client went away
    except Exception as e:
        try:
            send_text(handler, 500, "Internal server error: %s" % e)
        except (BrokenPipeError, ConnectionResetError):
            pass # the connection is gone too; still re-raise the real error
        raise
    finally:
        # In 'finally' so the timing line is logged on failures as well.
        elapsed = datetime.datetime.now() - then
        sys.stderr.write("  Elapsed: %0.3fms\n" % (elapsed.total_seconds() * 1000))
|
|
|
|
|
# OOP glue. |
|
# Note: BaseHTTPRequestHandler has many properties available, e.g.: |
|
# - client_address |
|
# - requestline, request_version, command (e.g. 'GET'), path |
|
# - headers (see https://docs.python.org/3/library/email.compat32-message.html#email.message.Message) |
|
# - headers["Foo"] (default None), headers.get('Foo', "default"), headers.items() |
|
# - header field name matching is case-insensitive |
|
# - rfile (read the body from this fd) |
|
# - wfile (write the response to this fd) |
|
# - server (the ThreadingHTTPServer instance, which has server_name server_port) |
|
# - protocol_version (defaults to HTTP/1.0) |
|
# see the full docs at https://docs.python.org/3/library/http.server.html |
|
class Handler(http.server.BaseHTTPRequestHandler):
    # Every response must carry a Content-Length under HTTP/1.1.
    protocol_version = "HTTP/1.1"

    # Every supported verb is funnelled into handle_request().
    def _dispatch(self):
        handle_request(self)

    do_HEAD = _dispatch
    do_GET = _dispatch
    do_POST = _dispatch
    do_PUT = _dispatch
    do_PATCH = _dispatch
    do_DELETE = _dispatch

    # Value reported in the Server response header.
    def version_string(self):
        return "hmm"
|
|
|
|
|
# GET /
# The main landing page: a plain-text summary of every endpoint.
def GET_root(handler):
    help_text = """The Dog API.

GET /
    This endpoint.

GET /200
    A heartbeat endpoint.

GET /dogs
GET /dogs?start=:start&limit=:limit
    A paginated list of dogs.

GET /dogs/all
    The complete list dogs.

GET /dogs/random
    A random dog.

GET /images/:breed/:jpg
    A dog image.

GET /images/random
    A random dog image.

GET /names
    All dog names.

GET /names/random
    A random dog name.

GET /feed
GET /feed?start=:start&limit=:limit
GET /feed?start=:start&limit=:limit&period=:seconds
    A simulated news feed with a new post once per :period.
    Default :period is 1 second.
"""
    send_text(handler, 200, help_text)
|
|
|
|
|
# GET /200
# Heartbeat: always answers 200 with the text "OK".
def GET_200(handler):
    status, reply = 200, "OK"
    send_text(handler, status, reply)
|
|
|
|
|
# GET /dogs
# GET /dogs?start=:start&limit=:limit
# Return a paginated list of dogs, e.g.:
# {
#   "dogs": [
#     {
#       "id": 27,
#       "breed": "Irish Setter",
#       "imageUrl": "http://192.168.1.2/images/n02100877-Irish_setter/n02100877_5686.jpg",
#       "name": "Zoey"
#     }
#   ],
#   "pagination": {
#     "next": "/dogs?start=100&limit=100"
#   }
# }
# start defaults to 0 and limit to 100.  Replies 400 when either is not an
# integer or is negative.
def GET_dogs_paginated(handler):
    (url_path, query_dict) = parse_GET_path(handler.path)
    try:
        start = int(query_dict.get('start', '0'))
        limit = int(query_dict.get('limit', '100'))
    except ValueError:
        # A malformed number is the client's mistake, not a server error.
        send_text(handler, 400, "Bad Request")
        return
    # Negative values would make the slice below wrap around from the end.
    if start < 0 or limit < 0:
        send_text(handler, 400, "Bad Request")
        return
    total = len(g_dogs_by_id)
    page = g_dogs_by_id[start:start+limit]
    # Clamp so the "next" link never points past the end of the list.
    next_start = min(start+limit, total)
    next_limit = min(limit, total-next_start)
    next_url = "/dogs?start=%s&limit=%s" % (next_start, next_limit)
    d = {
        "dogs": page,
        "pagination": { "next": next_url }
    }
    send_obj_as_json(handler, 200, d)
|
|
|
|
|
# GET /dogs/all
# Reply with every dog in one JSON document: {"dogs": [...]}.
def GET_all_dogs(handler):
    send_obj_as_json(handler, 200, {"dogs": g_dogs_by_id})
|
|
|
|
|
# GET /dogs/random
# Reply with one randomly chosen dog: {"dog": {...}}.
def GET_random_dog(handler):
    payload = {"dog": random.choice(g_dogs_by_id)}
    send_obj_as_json(handler, 200, payload)
|
|
|
|
|
# GET /images/:breed/:jpg
# Return a dog image from ./Images/:breed/:jpg.
# Replies 400 when the path does not have exactly those two segments after
# /images, or when either segment contains "..".
def GET_image(handler):
    (url_path, query_dict) = parse_GET_path(handler.path)
    parts = url_path.split('/')
    # Expect ['', 'images', breed, jpg].  Guard the unpacking: a path with
    # extra segments would otherwise raise ValueError and surface as a 500.
    if len(parts) != 4:
        send_text(handler, 400, "Bad Request")
        return
    breed, jpg = parts[2], parts[3]
    # Refuse anything that could climb out of ./Images.
    if ".." in breed or ".." in jpg:
        send_text(handler, 400, "Bad Request")
        return
    filepath = "./Images/%s/%s" % (breed, jpg)
    send_file(handler, filepath)
|
|
|
|
|
# GET /images/random
# Reply with the bytes of one randomly chosen dog image.
def GET_random_image(handler):
    chosen = random.choice(g_image_paths)
    send_file(handler, "./Images/%s" % chosen)
|
|
|
|
|
# GET /feed
# GET /feed?start=:start&limit=:limit
# GET /feed?start=:start&limit=:limit&period=:seconds
# A simulated feed which updates periodically.
# Once per period, a new post is added to the head of the feed.
# The head starts at the middle of the dog list and moves one position
# toward index 0 per elapsed period.
# period defaults to 1, start to 0 and limit to 100.  Replies 400 when any
# of them is not an integer, when period < 1, or when start/limit < 0.
def GET_feed_paginated(handler):
    (url_path, query_dict) = parse_GET_path(handler.path)
    try:
        period = int(query_dict.get('period', '1'))
        start = int(query_dict.get('start', '0'))
        limit = int(query_dict.get('limit', '100'))
    except ValueError:
        # A malformed number is the client's mistake, not a server error.
        send_text(handler, 400, "Bad Request")
        return
    # period=0 would divide by zero; negative start/limit would make the
    # slice below wrap around from the end of the list.
    if period < 1 or start < 0 or limit < 0:
        send_text(handler, 400, "Bad Request")
        return
    elapsed = datetime.datetime.now() - g_start_time
    number_of_updates = int(elapsed.total_seconds()) // period
    total = len(g_dogs_by_id)
    # Clamp at 0: once every post has been published the head stays at the
    # front instead of going negative (a negative index would slice from the
    # end of the list).
    head = max(0, (total // 2) - number_of_updates)
    first = head + start
    page = g_dogs_by_id[first:first+limit]
    next_start = min(first+limit, total)
    next_limit = min(limit, total-next_start)
    # NOTE(review): "next" links into /dogs with an absolute index rather
    # than back into /feed; kept as-is - confirm that this is intended.
    next_url = "/dogs?start=%s&limit=%s" % (next_start, next_limit)
    d = {
        "dogs": page,
        "pagination": { "next": next_url }
    }
    send_obj_as_json(handler, 200, d)
|
|
|
|
|
# GET /names
# Reply with every dog name: {"names": [...]}.
def GET_names(handler):
    send_obj_as_json(handler, 200, {"names": g_names})
|
|
|
|
|
# GET /names/random
# Reply with one randomly chosen dog name: {"name": "..."}.
def GET_random_name(handler):
    payload = {"name": random.choice(g_names)}
    send_obj_as_json(handler, 200, payload)
|
|
|
|
|
# Return address unchanged when one is given; otherwise find the local IP
# of the interface used to reach the internet, or "127.0.0.1" when that
# lookup fails.
# courtesy of chatgpt.
def routable_ip(address):
    explicit = address is not None and len(address) > 0
    if explicit:
        return address
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            # Connecting the UDP socket selects the outgoing interface;
            # getsockname() then reports that interface's address.
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"
|
|
|
|
|
if __name__ == "__main__":
    # Reference point for the simulated /feed timeline.
    g_start_time = datetime.datetime.now()

    # An empty bind address listens on every interface; ip is the address
    # that gets embedded in each dog's "imageUrl".
    address = ''
    ip = routable_ip(address)
    port = 8080

    if not os.path.exists("names.txt"):
        fetch_names()

    # g_names is a list of dog names.
    g_names = load_names()

    if not os.path.exists("Images"):
        fetch_images()

    # g_image_paths is a list of all relative image paths, e.g.:
    #   ['n02100877-Irish_setter/n02100877_5686.jpg',
    #    'n02111129-Leonberg/n02111129_1447.jpg']
    # g_dogs_by_id is a list of all named dogs.
    #   [
    #     {
    #       "id": 27,
    #       "breed": "Irish Setter",
    #       "imageUrl": "http://192.168.1.2:8080/images/n02100877-Irish_setter/n02100877_5686.jpg",
    #       "name": "Zoey"
    #     }
    #   ]
    (g_image_paths, g_dogs_by_id) = load_images(g_names, ip, port)

    add_static_route('GET', '/', GET_root)
    add_static_route('GET', '/200', GET_200)
    add_static_route('GET', '/dogs', GET_dogs_paginated)
    add_static_route('GET', '/dogs/all', GET_all_dogs)
    add_static_route('GET', '/dogs/random', GET_random_dog)
    add_regex_route(
        'GET',
        '/images/:breed/:jpg',
        # Anchored at the end (trailing slashes tolerated) and with the dot
        # escaped, so only real /images/<breed>/<file>.jpg paths reach
        # GET_image; anything longer is a 404 from the router.
        re.compile(r'/images/[a-zA-Z0-9_-]+/[a-zA-Z0-9_]+\.jpg/*$'),
        GET_image
    )
    add_static_route('GET', '/images/random', GET_random_image)
    add_static_route('GET', '/feed', GET_feed_paginated)
    add_static_route('GET', '/names', GET_names)
    add_static_route('GET', '/names/random', GET_random_name)

    server = http.server.ThreadingHTTPServer((address, port), Handler)
    # Log the advertised IP when binding to all interfaces, instead of
    # printing a blank host.
    sys.stderr.write("Listening on %s:%s\n" % (address or ip, port))
    server.serve_forever()
CHANGELOG: