Skip to content

Instantly share code, notes, and snippets.

@sarjsheff
Created March 16, 2025 11:12
Show Gist options
  • Save sarjsheff/4e963d152f5c951cb635b07903af5828 to your computer and use it in GitHub Desktop.
Save sarjsheff/4e963d152f5c951cb635b07903af5828 to your computer and use it in GitHub Desktop.
Docker Registry server on python3
from flask import Flask, request, Response, abort, jsonify, send_file, make_response
import os
import hashlib
import uuid
import logging, base64
import argparse
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
# Папка для хранения Docker-образов
STORAGE_DIR = "storage"
if not os.path.exists(STORAGE_DIR):
os.makedirs(STORAGE_DIR)
realm = "Registry Realm"
wwwauth = f'Basic realm="{realm}"'
auth_type = "basic"
readonly = False
users = {"admin": "password"}
def calculate_digest(data):
"""
Вычисляет SHA-256 хеш для данных.
"""
sha256 = hashlib.sha256()
sha256.update(data)
return f"sha256:{sha256.hexdigest()}"
def auth_wrapper(fn):
"""
checks user authorization
"""
def protected_view(*a, **ka):
if auth_type == "basic":
if "Authorization" in request.headers:
user, password = base64.b64decode(request.headers["Authorization"].split(" ")[1]).decode("ascii").split(":")
if user in users:
if users[user] == password:
return fn(*a, **ka)
err_response = make_response("Docker Registry API v2", 401)
err_response.headers["WWW-Authenticate"] = wwwauth
return err_response
else:
err_response = make_response("Docker Registry API v2", 401)
err_response.headers["WWW-Authenticate"] = wwwauth
return err_response
else:
return fn(*a, **ka)
return protected_view
@app.route("/v2/", methods=["GET"], endpoint="v2_endpoint")
@auth_wrapper
def v2_endpoint():
"""
Эндпоинт для проверки доступности Registry.
"""
return Response(response="{}", status=200)
@app.route("/v2/<repository>/manifests/<tag>", methods=["PUT"], endpoint="put_manifest")
@auth_wrapper
def put_manifest(repository, tag):
if readonly:
return Response(status=401, response="Readonly")
manifest_path = os.path.join(STORAGE_DIR, repository, "manifests")
os.makedirs(manifest_path, exist_ok=True)
manifest_file = os.path.join(manifest_path, f"{tag}.json")
manifest_data = request.data
with open(manifest_file, "wb") as f:
f.write(manifest_data)
manifest_digest = calculate_digest(manifest_data)
digest_file = os.path.join(manifest_path, f"{manifest_digest}.json")
with open(digest_file, "wb") as f:
f.write(manifest_data)
logging.info(f"Stored manifest with digest: {manifest_digest}")
return Response(status=201, headers={"Docker-Content-Digest": manifest_digest})
@app.route("/v2/<repository>/manifests/<identifier>", methods=["GET"])
def get_manifest(repository, identifier):
manifest_path = os.path.join(STORAGE_DIR, repository, "manifests")
# Проверяем, запрашивается ли манифест по `tag`
manifest_file = os.path.join(manifest_path, f"{identifier}.json")
if not os.path.exists(manifest_file):
# Если файла нет, возможно, запрашивается по `digest`
manifest_file = os.path.join(manifest_path, f"sha256:{identifier}.json")
if not os.path.exists(manifest_file):
abort(404, description="Manifest not found")
with open(manifest_file, "rb") as f:
manifest_data = f.read()
manifest_digest = calculate_digest(manifest_data)
return Response(manifest_data, mimetype="application/vnd.docker.distribution.manifest.v2+json", headers={"Docker-Content-Digest": manifest_digest})
@app.route("/v2/<repository>/blobs/uploads/", methods=["POST"], endpoint="start_upload")
@auth_wrapper
def start_upload(repository):
"""
Начало загрузки слоя Docker-образа.
"""
if readonly:
return Response(status=401, response="Readonly")
upload_id = str(uuid.uuid4())
upload_path = os.path.join(STORAGE_DIR, repository, "uploads", upload_id)
os.makedirs(upload_path, exist_ok=True)
return Response(
headers={
"Location": f"/v2/{repository}/blobs/uploads/{upload_id}",
"Range": "0-0",
"Docker-Upload-UUID": upload_id,
},
status=202,
)
@app.route("/v2/<repository>/blobs/uploads/<upload_id>", methods=["PATCH"], endpoint="upload_blob")
@auth_wrapper
def upload_blob(repository, upload_id):
"""
Загрузка данных слоя Docker-образа.
"""
if readonly:
return Response(status=401, response="Readonly")
upload_path = os.path.join(STORAGE_DIR, repository, "uploads", upload_id, "data")
os.makedirs(os.path.dirname(upload_path), exist_ok=True)
# Записываем данные в файл
with open(upload_path, "ab") as f:
f.write(request.data)
# Возвращаем текущий прогресс загрузки
file_size = os.path.getsize(upload_path)
return Response(
headers={
"Location": f"/v2/{repository}/blobs/uploads/{upload_id}",
"Range": f"0-{file_size - 1}",
},
status=202,
)
@app.route("/v2/<repository>/blobs/uploads/<upload_id>", methods=["PUT"], endpoint="complete_upload")
@auth_wrapper
def complete_upload(repository, upload_id):
if readonly:
return Response(status=401, response="Readonly")
digest = request.args.get("digest")
if not digest or not digest.startswith("sha256:"):
abort(400, description="Valid digest is required (sha256:<hash>)")
upload_path = os.path.join(STORAGE_DIR, repository, "uploads", upload_id, "data")
if not os.path.exists(upload_path):
abort(404, description="Upload not found")
with open(upload_path, "rb") as f:
blob_data = f.read()
calculated_digest = calculate_digest(blob_data)
logging.info(f"Expected digest: {digest}, Calculated digest: {calculated_digest}")
if calculated_digest != digest:
abort(400, description=f"Digest mismatch: expected {calculated_digest}, got {digest}")
blob_path = os.path.join(STORAGE_DIR, repository, "blobs", digest)
os.makedirs(os.path.dirname(blob_path), exist_ok=True)
with open(blob_path, "wb") as f:
f.write(blob_data)
os.remove(upload_path)
os.rmdir(os.path.dirname(upload_path))
return Response(
headers={"Location": f"/v2/{repository}/blobs/{digest}", "Docker-Content-Digest": digest},
status=201,
)
@app.route("/v2/<repository>/blobs/<digest>", methods=["HEAD"])
def check_blob(repository, digest):
"""
Проверка наличия слоя (blob) в Registry.
"""
blob_path = os.path.join(STORAGE_DIR, repository, "blobs", digest)
if not os.path.exists(blob_path):
abort(404, description="Blob not found")
# Возвращаем заголовки с размером слоя
blob_size = os.path.getsize(blob_path)
return Response(
headers={
"Content-Length": str(blob_size),
"Docker-Content-Digest": digest,
},
status=200,
)
if __name__ == "__main__":
# Настройка аргументов командной строки
parser = argparse.ArgumentParser(description="Docker registry server.")
parser.add_argument("--host", type=str, default="0.0.0.0", help="The host to bind to.")
parser.add_argument("--port", type=int, default=5001, help="The port to bind to.")
parser.add_argument("--no-ssl", action="store_true", help="Without ssl.")
parser.add_argument("--ssl-cert", type=str, default="certs/cert.pem", help="Path to SSL certificate.")
parser.add_argument("--ssl-key", type=str, default="certs/key.pem", help="Path to SSL key.")
parser.add_argument("--readonly", action="store_true", help="Run the registry in read-only mode.")
parser.add_argument("--auth-type", type=str, default="basic", choices=["basic", "none"], help="Authentication type.")
parser.add_argument("--storage-dir", type=str, default="storage", help="Directory to store Docker images.")
parser.add_argument("--adminpwd", type=str, default="password", help="Admin password")
args = parser.parse_args()
# Применяем настройки из командной строки
readonly = args.readonly
auth_type = args.auth_type
STORAGE_DIR = args.storage_dir
users = {"admin": args.adminpwd}
if not os.path.exists(STORAGE_DIR):
os.makedirs(STORAGE_DIR)
if args.no_ssl:
app.run(host=args.host, port=args.port)
else:
app.run(host=args.host, port=args.port, ssl_context=(args.ssl_cert, args.ssl_key))
#!/bin/sh
mkdir certs
openssl req -x509 -newkey rsa:4096 -nodes -out certs/cert.pem -keyout certs/key.pem -days 365 -subj "/CN=localhost"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment