Skip to content

Instantly share code, notes, and snippets.

@cellularmitosis
Last active February 28, 2025 03:31
Show Gist options
  • Save cellularmitosis/bf44aac9314fdf3d9bf2cd5aa0b07ded to your computer and use it in GitHub Desktop.
Save cellularmitosis/bf44aac9314fdf3d9bf2cd5aa0b07ded to your computer and use it in GitHub Desktop.
dog-api.py

Dog API

This is a single-file, zero-dependency Python script which implements a paginated list/detail REST API suitable for practicing mobile development.

TL;DR: it serves pictures of doggos.

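On first run, the script will:

  • Download a list of dog names and save it as names.txt.
  • Download and unpack the dog image archive into ./Images.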

The script then randomly assigns a name to each dog photo and serves paginated JSON.

Installation

No installation, no dependencies, no virtual environments. Just download the script and run it.

$ chmod +x dog-api.py
$ ./dog-api.py

Endpoints

/dogs is static while /feed simulates new posts being periodically added.

$ curl localhost:8080
The Dog API.

GET /
  This endpoint.

GET /200
  A heartbeat endpoint.

GET /dogs
GET /dogs?start=:start&limit=:limit
  A paginated list of dogs.

GET /dogs/all
  The complete list dogs.

GET /dogs/random
  A random dog.

GET /images/:breed/:jpg
  A dog image.

GET /images/random
  A random dog image.

GET /names
  All dog names.

GET /names/random
  A random dog name.

GET /feed
GET /feed?start=:start&limit=:limit
GET /feed?start=:start&limit=:limit&period=:seconds
  A simulated news feed with a new post once per :period.
  Default :period is 1 second.

Example response:

$ curl localhost:8080/dogs?limit=1
{
  "dogs": [
    {
      "breed": "Bedlington Terrier",
      "id": 0,
      "imageUrl": "http://192.168.1.2:8080/images/n02093647-Bedlington_terrier/n02093647_1156.jpg",
      "name": "Chunk"
    }
  ],
  "pagination": {
    "next": "/dogs?start=1&limit=1"
  }
}

License

MIT

#!/usr/bin/env python3
# A simple dog picture API suitable for mobile development practice.
# Copyright 2025 Jason Pepas
# MIT Licensed, see https://opensource.org/license/mit
import os
import sys
import json
from gzip import GzipFile
from io import BytesIO
import mimetypes
import datetime
import socket
import http.server
import random
import urllib
import urllib.request
import re
# Download the dog names.
# Scrapes the <td> cells of an archived puppy-names web page and writes the
# names to ./names.txt, one per line.
# Does nothing if names.txt already exists.
# Raises Exception if no names could be parsed from the page.
def fetch_names():
    if os.path.exists("names.txt"):
        return
    sys.stderr.write("Downloading dog names.\n")
    # url="https://www.walkervillevet.com.au/pet-care-advice/dogs/puppy-names/"
    url="https://web.archive.org/web/20241024133422/https://www.walkervillevet.com.au/pet-care-advice/dogs/puppy-names/"
    with urllib.request.urlopen(url) as response:
        html = response.read().decode()
    cells = re.findall(r"<td>([a-zA-Z-\s]+?)</td>", html)
    # The pattern allows whitespace, so a cell may be blank or span several
    # lines.  Collapse internal whitespace and drop empty cells so that every
    # line written to names.txt is exactly one usable name.
    names = [" ".join(cell.split()) for cell in cells]
    names = [name for name in names if name]
    if not names:
        # Don't leave an empty names.txt behind: it would suppress the next
        # download attempt and break random name selection later.
        raise Exception("Failed to parse any dog names.")
    with open("names.txt", "w") as fd:
        fd.writelines(name + "\n" for name in names)
# Load the list of dog names.
# Reads ./names.txt (one name per line) and returns the names as an
# alphabetically sorted list.  Surrounding whitespace is stripped and blank
# lines are skipped, so the result never contains an empty name.
# Raises OSError (e.g. FileNotFoundError) if names.txt cannot be opened.
def load_names():
    sys.stderr.write("Loading names.txt.\n")
    with open("names.txt", "r") as fd:
        stripped = (line.strip() for line in fd)
        return sorted(name for name in stripped if name)
# Download the dog images.
# Pipes the archived ImageNetDogs tarball from curl straight into tar, which
# unpacks it into the current working directory.
# Raises Exception if the shell pipeline exits with a non-zero status.
def fetch_images():
    sys.stderr.write("Downloading dog images.\n")
    # url = "http://vision.stanford.edu/aditya86/ImageNetDogs/images.tar"
    archive_url = "https://web.archive.org/web/20240121212154/http://vision.stanford.edu/aditya86/ImageNetDogs/images.tar"
    # 'pipefail' makes a curl failure fail the whole pipeline, not just tar.
    pipeline = '/bin/bash -e -o pipefail -c "curl -fLsS %s | tar xv"' % archive_url
    if os.system(pipeline) != 0:
        raise Exception("Failed to fetch images.")
# Scan ./Images for dog photos and build the dog records.
# (This function does not download anything; ./Images must already exist.)
#
# Params:
#   names: list of dog names; one is picked at random for every photo.
#   ip, port: used to build the absolute imageUrl of each dog.
# Returns a tuple (image_paths, dogs_by_id):
#   image_paths: relative paths such as 'n02100877-Irish_setter/n02100877_5686.jpg'
#   dogs_by_id: shuffled list of dicts with keys "breed", "imageUrl", "name"
#               and "id", where "id" equals the dog's index in the list.
def load_images(names, ip, port):
    # Parse the breed from the dirname.
    # e.g. 'n02098105-soft-coated_wheaten_terrier' -> "Soft-coated Wheaten Terrier"
    def parse_breed_name(dirname):
        # Everything after the first '-' is the breed slug.  A directory with
        # no '-' at all is used whole instead of raising IndexError.
        _, dash, slug = dirname.partition('-')  # "soft-coated_wheaten_terrier"
        if not dash:
            slug = dirname
        words = [word.capitalize() for word in slug.split('_')]  # ["Soft-coated", "Wheaten", "Terrier"]
        return " ".join(words)  # "Soft-coated Wheaten Terrier"

    # Find all of the images.
    sys.stderr.write("Scanning ./Images.\n")
    image_paths = []
    dogs = []
    for dirname in os.listdir("Images"):
        dirpath = "Images/%s" % dirname
        if not os.path.isdir(dirpath):
            continue
        breed = parse_breed_name(dirname)
        for image in os.listdir(dirpath):
            if not image.endswith(".jpg"):
                continue
            relpath = "%s/%s" % (dirname, image)
            image_paths.append(relpath)
            dogs.append({
                "breed": breed,
                "imageUrl": "http://%s:%s/images/%s" % (ip, port, relpath),
                "name": random.choice(names),
            })
    # Shuffle so that neighbouring ids are not all the same breed, then number
    # the dogs by their final position.
    random.shuffle(dogs)
    for dog_id, dog in enumerate(dogs):
        dog["id"] = dog_id
    return (image_paths, dogs)
# Split a request target into its path and its query parameters.
# Given '/foo?bar=42', return ['/foo', {'bar': '42'}] (values stay strings).
# The path_query param is typically handler.path.
#
# - Only the first '?' separates path from query; later ones belong to the
#   query string.
# - If a parameter appears more than once, the last value wins.
# - Superfluous trailing slashes are dropped from the path ('/foo//' -> '/foo'),
#   but a lone '/' is kept.
def parse_GET_path(path_query):
    # Imported here so the function does not rely on urllib.request having
    # loaded urllib.parse as a side effect.
    from urllib.parse import parse_qs

    path_part, _, query_part = path_query.partition('?')
    # Thanks to https://wsgi.tutorial.codepoint.net/parsing-the-request-get
    # Note: parse_qs returns a list for each item, because the user might
    # have set a value more than once in the query string.  We go with the
    # last value of each list.
    query_dict = {k: v[-1] for k, v in parse_qs(query_part).items()}
    # Drop any superfluous trailing slashes.  The length guard keeps '/'
    # intact and makes an empty path safe.
    if len(path_part) > 1:
        path_part = path_part.rstrip('/') or '/'
    return [path_part, query_dict]
# Compress data using gzip.
# Accepts either bytes or str (a str is encoded first) and returns the
# compressed bytes.
def gzip(data):
    payload = data.encode() if isinstance(data, str) else data
    sink = BytesIO()
    compressor = GzipFile(fileobj=sink, mode="wb")
    try:
        compressor.write(payload)
    finally:
        # Closing flushes the gzip trailer into the buffer.
        compressor.close()
    return sink.getvalue()
# Will the client accept gzip encoding?
# Inspects the request's Accept-Encoding header and returns True only if it
# lists the 'gzip' coding with a non-zero quality value.
#
# - Whitespace around list items is ignored ("deflate, gzip").
# - Parameters are understood ("gzip;q=0.8"); "gzip;q=0" means the client
#   explicitly refuses gzip.
# - The coding name is matched case-insensitively.
# - The '*' wildcard is not treated as accepting gzip.
def accepts_gzip(handler):
    header = handler.headers.get('Accept-Encoding')
    if not header:
        return False
    for item in header.split(','):
        coding, _, params = item.partition(';')
        if coding.strip().lower() != 'gzip':
            continue
        for param in params.split(';'):
            key, _, value = param.partition('=')
            if key.strip().lower() == 'q':
                try:
                    return float(value.strip()) > 0
                except ValueError:
                    # Malformed q-value: fall back to the uncompressed body.
                    return False
        return True
    return False
# Send a Cache-Control header.
# max_age is the cache lifetime in seconds; it is truncated to an int.
def send_cache_control(handler, max_age):
    seconds = int(max_age)
    handler.send_header('Cache-Control', 'public, max-age=%s' % seconds)
# Send a response as a Content-Type.
#
# Params:
#   handler: the request handler to respond through.
#   code: HTTP status code.
#   body: str (encoded before sending) or bytes.
#   content_type: value of the Content-Type header.
#   max_age: if not None, a Cache-Control header with this max-age is sent.
#
# The body is gzip-compressed when the client's Accept-Encoding allows it.
# For HEAD requests the headers are sent but the body is withheld.
def send_content(handler, code, body, content_type, max_age=None):
    data = body.encode() if isinstance(body, str) else body
    handler.send_response(code)
    # The representation depends on Accept-Encoding whether or not this
    # particular response is compressed, so caches are always told.
    handler.send_header('Vary', 'Accept-Encoding')
    if accepts_gzip(handler):
        data = gzip(data)
        handler.send_header('Content-Encoding', 'gzip')
    handler.send_header('Content-Type', content_type)
    handler.send_header('Content-Length', "%s" % len(data))
    if max_age is not None:
        send_cache_control(handler, max_age)
    handler.end_headers()
    # A HEAD response must not carry a body; on a keep-alive connection the
    # stray bytes would be read as the start of the next response.
    if getattr(handler, 'command', None) != 'HEAD':
        handler.wfile.write(data)
# Send text/plain.
# Thin wrapper around send_content() which fixes the content type to UTF-8
# plain text.
def send_text(handler, code, body, max_age=None):
    content_type = 'text/plain; charset=UTF-8'
    send_content(handler, code, body, content_type, max_age)
# Send application/json.
# The body must already be serialized JSON; see send_obj_as_json() for
# sending Python objects.
def send_json(handler, code, body, max_age=None):
    content_type = 'application/json'
    send_content(handler, code, body, content_type, max_age)
# Encode a Python object as JSON and send it.
# The output is pretty-printed (2-space indent) with keys in sorted order.
def send_obj_as_json(handler, code, obj, max_age=None):
    send_json(handler, code, json.dumps(obj, indent=2, sort_keys=True), max_age)
# Guess the content type.
# Looks at the (case-insensitive) file extension of fpath.  A few common
# types are answered from a hard-coded table; everything else is delegated to
# the mimetypes module, falling back to 'application/octet-stream'.
def get_content_type(fpath):
    # hard-coded shortcuts for a few common filetypes:
    shortcuts = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.mp4': 'video/mp4',
    }
    _, suffix = os.path.splitext(fpath)
    known = shortcuts.get(suffix.lower())
    if known is not None:
        return known
    guessed, _encoding = mimetypes.guess_type(fpath)
    return guessed or 'application/octet-stream'
# Send the contents of a file from disk.
# Note: 'Range' header not supported.
#
# Params:
#   handler: the request handler to respond through.
#   fpath: path of the file to send.
#   is_HEAD: if True, send only the headers.
#   content_type: Content-Type to send; guessed from fpath when None.
#   max_age: if not None, a Cache-Control header with this max-age is sent.
#
# Responds 404 if fpath is not a regular file or cannot be opened.
def send_file(handler, fpath, is_HEAD=False, content_type=None, max_age=None):
    # isfile() rather than exists(): a directory must be a 404, not an
    # exception from open().
    if not os.path.isfile(fpath):
        send_text(handler, 404, "Not found")
        return
    try:
        fd = open(fpath, 'rb')
    except OSError:
        # The file vanished or is unreadable.
        send_text(handler, 404, "Not found")
        return
    # 'with' guarantees the file is closed even if the client disconnects
    # in the middle of the transfer.
    with fd:
        if content_type is None:
            content_type = get_content_type(fpath)
        # Take the size from the open descriptor so Content-Length describes
        # the file actually being streamed.
        file_size = os.fstat(fd.fileno()).st_size
        handler.send_response(200)  # 'OK'
        handler.send_header('Content-Length', "%s" % file_size)
        handler.send_header('Content-Type', content_type)
        if max_age is not None:
            send_cache_control(handler, max_age)
        handler.end_headers()
        if is_HEAD:
            return
        # Stream in chunks so large files are never held in memory whole.
        chunk_size = 256 * 1024
        while True:
            chunk = fd.read(chunk_size)
            if not chunk:
                break
            handler.wfile.write(chunk)
# Routing tables, filled in by add_static_route() and add_regex_route().
# g_static_routes maps a fixed URL path to a dict of {http_method: fn}.
g_static_routes = {}
# g_regex_routes is a list of [http_method, label, regex, fn] entries, which
# route() tries in the order they were added.
g_regex_routes = []
# Add a route using a fixed URL path.
# Registers fn as the handler for http_method requests to exactly url_path,
# replacing any handler previously registered for that pair.
def add_static_route(http_method, url_path, fn):
    handlers_by_method = g_static_routes.setdefault(url_path, {})
    handlers_by_method[http_method] = fn
# Add a route using a regex-based URL path.
# label is a human-readable name for the route, used when logging; regex is
# matched against the URL path of incoming http_method requests.
def add_regex_route(http_method, label, regex, fn):
    entry = [http_method, label, regex, fn]
    g_regex_routes.append(entry)
# Find the function for a route.
# Static routes are consulted first, then the regex routes in registration
# order.  Returns the handler function, or None when nothing matches the
# request's method and path.
def route(handler):
    url_path, _, _ = handler.path.partition('?')
    method = handler.command
    static_fn = (g_static_routes.get(url_path) or {}).get(method)
    if static_fn:
        sys.stderr.write("Using static route %s\n" % url_path)
        return static_fn
    for route_method, label, pattern, regex_fn in g_regex_routes:
        if route_method == method and pattern.match(url_path):
            sys.stderr.write("Using regex route %s for %s\n" % (label, url_path))
            return regex_fn
    return None
# The core of the webapp.
# Looks up the route for the request, runs its handler, and logs how long the
# request took.
#
# - Unrouted requests get a 404.
# - A client that disconnects mid-response (BrokenPipeError,
#   ConnectionResetError) is ignored.
# - Any other exception is reported to the client as a 500 and then re-raised.
def handle_request(handler):
    then = datetime.datetime.now()
    try:
        fn = route(handler)
        if fn is None:
            send_text(handler, 404, "Not found")
            return
        try:
            fn(handler)
        except (BrokenPipeError, ConnectionResetError):
            pass  # don't care
        except Exception as e:
            try:
                send_text(handler, 500, "Internal server error: %s" % e)
            except OSError:
                # The connection is unusable; don't let that failure replace
                # the original error.
                pass
            raise
    finally:
        # Runs on every exit path, so failed requests are timed too.
        elapsed = datetime.datetime.now() - then
        sys.stderr.write(" Elapsed: %0.3fms\n" % (elapsed.total_seconds() * 1000))
# OOP glue.
# Every supported HTTP verb is forwarded to handle_request(), which does the
# actual routing.
# Note: BaseHTTPRequestHandler has many properties available, e.g.:
# - client_address
# - requestline, request_version, command (e.g. 'GET'), path
# - headers (see https://docs.python.org/3/library/email.compat32-message.html#email.message.Message)
# - headers["Foo"] (default None), headers.get('Foo', "default"), headers.items()
# - header field name matching is case-insensitive
# - rfile (read the body from this fd)
# - wfile (write the response to this fd)
# - server (the ThreadingHTTPServer instance, which has server_name server_port)
# - protocol_version (defaults to HTTP/1.0)
# see the full docs at https://docs.python.org/3/library/http.server.html
class Handler(http.server.BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"

    # Shared entry point for every verb.
    def _dispatch(self):
        handle_request(self)

    do_HEAD = _dispatch
    do_GET = _dispatch
    do_POST = _dispatch
    do_PUT = _dispatch
    do_PATCH = _dispatch
    do_DELETE = _dispatch

    # Overrides the base class's version_string().
    def version_string(self):
        return "hmm"
# GET /
# The main landing page: a plain-text summary of every endpoint.
def GET_root(handler):
    usage = """The Dog API.
GET /
This endpoint.
GET /200
A heartbeat endpoint.
GET /dogs
GET /dogs?start=:start&limit=:limit
A paginated list of dogs.
GET /dogs/all
The complete list dogs.
GET /dogs/random
A random dog.
GET /images/:breed/:jpg
A dog image.
GET /images/random
A random dog image.
GET /names
All dog names.
GET /names/random
A random dog name.
GET /feed
GET /feed?start=:start&limit=:limit
GET /feed?start=:start&limit=:limit&period=:seconds
A simulated news feed with a new post once per :period.
Default :period is 1 second.
"""
    send_text(handler, 200, usage)
# GET /200
# A heartbeat endpoint: always answers 200 with the text "OK".
def GET_200(handler):
    send_text(handler, code=200, body="OK")
# GET /dogs
# GET /dogs?start=:start&limit=:limit
# Return a paginated list of dogs, e.g.:
# {
#   "dogs": [
#     {
#       "id": 27,
#       "breed": "Irish Setter",
#       "imageUrl": "http://192.168.1.2/images/n02100877-Irish_setter/n02100877_5686.jpg",
#       "name": "Zoey"
#     }
#   ],
#   "pagination": {
#     "next": "/dogs?start=100&limit=100"
#   }
# }
# :start defaults to 0 and :limit to 100.  Both must be non-negative
# integers; anything else is answered with 400 Bad Request.
# Once the end of the list is reached, "next" carries limit=0.
def GET_dogs_paginated(handler):
    (url_path, query_dict) = parse_GET_path(handler.path)
    try:
        start = int(query_dict.get('start', '0'))
        limit = int(query_dict.get('limit', '100'))
    except ValueError:
        send_text(handler, 400, "Bad Request: start and limit must be integers")
        return
    # Negative values would be read as from-the-end slice indexes and return
    # surprising pages, so they are rejected.
    if start < 0 or limit < 0:
        send_text(handler, 400, "Bad Request: start and limit must not be negative")
        return
    total = len(g_dogs_by_id)
    page = g_dogs_by_id[start:start+limit]
    next_start = min(start+limit, total)
    next_limit = min(limit, total-next_start)
    next_url = "/dogs?start=%s&limit=%s" % (next_start, next_limit)
    d = {
        "dogs": page,
        "pagination": { "next": next_url }
    }
    send_obj_as_json(handler, 200, d)
# GET /dogs/all
# Return every dog in one unpaginated JSON response: {"dogs": [...]}.
def GET_all_dogs(handler):
    send_obj_as_json(handler, 200, {"dogs": g_dogs_by_id})
# GET /dogs/random
# Return one randomly chosen dog in a JSON response: {"dog": {...}}.
def GET_random_dog(handler):
    chosen = random.choice(g_dogs_by_id)
    send_obj_as_json(handler, 200, {"dog": chosen})
# GET /images/:breed/:jpg
# Return a dog image from ./Images/:breed/:jpg.
# Responds 404 if the path does not have exactly the /images/:breed/:jpg
# shape (or the file is missing), and 400 if a segment contains "..".
def GET_image(handler):
    (url_path, query_dict) = parse_GET_path(handler.path)
    parts = url_path.split('/')  # expected: ['', 'images', breed, jpg]
    # Answer unexpected shapes with a 404 instead of letting tuple unpacking
    # raise ValueError (which would surface as a 500).
    if len(parts) != 4 or not parts[2] or not parts[3]:
        send_text(handler, 404, "Not found")
        return
    breed, jpg = parts[2], parts[3]
    # Never allow a request to climb out of ./Images.
    if ".." in breed or ".." in jpg:
        send_text(handler, 400, "Bad Request")
        return
    filepath = "./Images/%s/%s" % (breed, jpg)
    send_file(handler, filepath)
# GET /images/random
# Return a randomly chosen dog image (the image bytes, not JSON).
def GET_random_image(handler):
    chosen = random.choice(g_image_paths)
    send_file(handler, "./Images/%s" % chosen)
# GET /feed
# GET /feed?start=:start&limit=:limit
# GET /feed?start=:start&limit=:limit&period=:seconds
# A simulated feed which updates periodically.
# Once per period, a new post is added to the head of the feed.
#
# The feed is a window onto the dog list: its head starts at the middle of
# the list and moves one position towards index 0 per elapsed period, until
# the whole list is visible.  :start is relative to the current head.
# :start defaults to 0, :limit to 100 and :period to 1.  :start and :limit
# must be non-negative integers and :period a positive integer; anything else
# is answered with 400 Bad Request.
def GET_feed_paginated(handler):
    (url_path, query_dict) = parse_GET_path(handler.path)
    try:
        period = int(query_dict.get('period', '1'))
        start = int(query_dict.get('start', '0'))
        limit = int(query_dict.get('limit', '100'))
    except ValueError:
        send_text(handler, 400, "Bad Request: start, limit and period must be integers")
        return
    # period=0 would divide by zero below; negative start/limit would be read
    # as from-the-end slice indexes.
    if period < 1 or start < 0 or limit < 0:
        send_text(handler, 400, "Bad Request: start and limit must not be negative and period must be positive")
        return
    elapsed = datetime.datetime.now() - g_start_time
    number_of_updates = int(elapsed.total_seconds()) // period
    total = len(g_dogs_by_id)
    # Clamp at 0: once every dog has been "posted" the feed stops growing
    # instead of wrapping around through negative indexes.
    head = max((total // 2) - number_of_updates, 0)
    feed_length = total - head
    first = head + start
    page = g_dogs_by_id[first:first+limit]
    # Pagination is expressed relative to the head, like :start itself, and
    # points back at /feed.
    next_start = min(start+limit, feed_length)
    next_limit = min(limit, feed_length-next_start)
    next_url = "/feed?start=%s&limit=%s" % (next_start, next_limit)
    if 'period' in query_dict:
        # Carry a custom period over to the next page.
        next_url += "&period=%s" % period
    d = {
        "dogs": page,
        "pagination": { "next": next_url }
    }
    send_obj_as_json(handler, 200, d)
# GET /names
# Return all dog names in a JSON response: {"names": [...]}.
def GET_names(handler):
    send_obj_as_json(handler, 200, {"names": g_names})
# GET /names/random
# Return one randomly chosen dog name in a JSON response: {"name": "..."}.
def GET_random_name(handler):
    chosen = random.choice(g_names)
    send_obj_as_json(handler, 200, {"name": chosen})
# Get the IP address of the interface which is connected to the internet.
# If a non-empty address is passed in, it is returned unchanged.  Otherwise a
# UDP socket is connected towards 8.8.8.8 and the local address the OS chose
# for it is returned; if that fails, "127.0.0.1" is returned.
# courtesy of chatgpt.
def routable_ip(address):
    if address is None or len(address) == 0:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            try:
                probe.connect(("8.8.8.8", 80))
                local_ip = probe.getsockname()[0]
            except Exception:
                local_ip = "127.0.0.1"
        return local_ip
    return address
# Script entry point: fetch the data on first run, build the in-memory dog
# list, register the routes and serve forever on port 8080.
if __name__ == "__main__":
    # Reference point for the simulated /feed.
    g_start_time = datetime.datetime.now()

    # An empty address binds the server to all interfaces.
    address = ''
    # The IP embedded in the absolute image URLs handed to clients.
    ip = routable_ip(address)
    port = 8080

    if not os.path.exists("names.txt"):
        fetch_names()
    # g_names is a list of dog names.
    g_names = load_names()

    if not os.path.exists("Images"):
        fetch_images()
    # g_image_paths is a list of all relative image paths, e.g.:
    # ['n02100877-Irish_setter/n02100877_5686.jpg',
    #  'n02111129-Leonberg/n02111129_1447.jpg']
    # g_dogs_by_id is a list of all named dogs.
    # [
    #   {
    #     "id": 27,
    #     "breed": "Irish Setter",
    #     "imageUrl": "http://192.168.1.2:8080/images/n02100877-Irish_setter/n02100877_5686.jpg",
    #     "name": "Zoey"
    #   }
    # ]
    (g_image_paths, g_dogs_by_id) = load_images(g_names, ip, port)

    add_static_route('GET', '/', GET_root)
    add_static_route('GET', '/200', GET_200)
    add_static_route('GET', '/dogs', GET_dogs_paginated)
    add_static_route('GET', '/dogs/all', GET_all_dogs)
    add_static_route('GET', '/dogs/random', GET_random_dog)
    add_regex_route(
        'GET',
        '/images/:breed/:jpg',
        # The dot is escaped and the pattern is anchored at the end, so only
        # genuine /images/<breed>/<file>.jpg paths reach GET_image.
        re.compile(r'/images/[a-zA-Z0-9_-]+/[a-zA-Z0-9_]+\.jpg\Z'),
        GET_image
    )
    add_static_route('GET', '/images/random', GET_random_image)
    add_static_route('GET', '/feed', GET_feed_paginated)
    add_static_route('GET', '/names', GET_names)
    add_static_route('GET', '/names/random', GET_random_name)

    server = http.server.ThreadingHTTPServer((address, port), Handler)
    # Log the routable IP rather than the empty bind address, so the line
    # shows the address used in the served image URLs.
    sys.stderr.write("Listening on %s:%s\n" % (ip, port))
    server.serve_forever()
@cellularmitosis
Copy link
Author

CHANGELOG:

  • 2025/2/27: detect routable IP and serve absolute URLs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment