Created
March 16, 2025 11:12
-
-
Save sarjsheff/4e963d152f5c951cb635b07903af5828 to your computer and use it in GitHub Desktop.
Docker Registry server on python3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, request, Response, abort, jsonify, send_file, make_response | |
import os | |
import hashlib | |
import uuid | |
import logging, base64 | |
import argparse | |
app = Flask(__name__) | |
logging.basicConfig(level=logging.INFO) | |
# Папка для хранения Docker-образов | |
STORAGE_DIR = "storage" | |
if not os.path.exists(STORAGE_DIR): | |
os.makedirs(STORAGE_DIR) | |
realm = "Registry Realm" | |
wwwauth = f'Basic realm="{realm}"' | |
auth_type = "basic" | |
readonly = False | |
users = {"admin": "password"} | |
def calculate_digest(data): | |
""" | |
Вычисляет SHA-256 хеш для данных. | |
""" | |
sha256 = hashlib.sha256() | |
sha256.update(data) | |
return f"sha256:{sha256.hexdigest()}" | |
def auth_wrapper(fn): | |
""" | |
checks user authorization | |
""" | |
def protected_view(*a, **ka): | |
if auth_type == "basic": | |
if "Authorization" in request.headers: | |
user, password = base64.b64decode(request.headers["Authorization"].split(" ")[1]).decode("ascii").split(":") | |
if user in users: | |
if users[user] == password: | |
return fn(*a, **ka) | |
err_response = make_response("Docker Registry API v2", 401) | |
err_response.headers["WWW-Authenticate"] = wwwauth | |
return err_response | |
else: | |
err_response = make_response("Docker Registry API v2", 401) | |
err_response.headers["WWW-Authenticate"] = wwwauth | |
return err_response | |
else: | |
return fn(*a, **ka) | |
return protected_view | |
@app.route("/v2/", methods=["GET"], endpoint="v2_endpoint") | |
@auth_wrapper | |
def v2_endpoint(): | |
""" | |
Эндпоинт для проверки доступности Registry. | |
""" | |
return Response(response="{}", status=200) | |
@app.route("/v2/<repository>/manifests/<tag>", methods=["PUT"], endpoint="put_manifest") | |
@auth_wrapper | |
def put_manifest(repository, tag): | |
if readonly: | |
return Response(status=401, response="Readonly") | |
manifest_path = os.path.join(STORAGE_DIR, repository, "manifests") | |
os.makedirs(manifest_path, exist_ok=True) | |
manifest_file = os.path.join(manifest_path, f"{tag}.json") | |
manifest_data = request.data | |
with open(manifest_file, "wb") as f: | |
f.write(manifest_data) | |
manifest_digest = calculate_digest(manifest_data) | |
digest_file = os.path.join(manifest_path, f"{manifest_digest}.json") | |
with open(digest_file, "wb") as f: | |
f.write(manifest_data) | |
logging.info(f"Stored manifest with digest: {manifest_digest}") | |
return Response(status=201, headers={"Docker-Content-Digest": manifest_digest}) | |
@app.route("/v2/<repository>/manifests/<identifier>", methods=["GET"]) | |
def get_manifest(repository, identifier): | |
manifest_path = os.path.join(STORAGE_DIR, repository, "manifests") | |
# Проверяем, запрашивается ли манифест по `tag` | |
manifest_file = os.path.join(manifest_path, f"{identifier}.json") | |
if not os.path.exists(manifest_file): | |
# Если файла нет, возможно, запрашивается по `digest` | |
manifest_file = os.path.join(manifest_path, f"sha256:{identifier}.json") | |
if not os.path.exists(manifest_file): | |
abort(404, description="Manifest not found") | |
with open(manifest_file, "rb") as f: | |
manifest_data = f.read() | |
manifest_digest = calculate_digest(manifest_data) | |
return Response(manifest_data, mimetype="application/vnd.docker.distribution.manifest.v2+json", headers={"Docker-Content-Digest": manifest_digest}) | |
@app.route("/v2/<repository>/blobs/uploads/", methods=["POST"], endpoint="start_upload") | |
@auth_wrapper | |
def start_upload(repository): | |
""" | |
Начало загрузки слоя Docker-образа. | |
""" | |
if readonly: | |
return Response(status=401, response="Readonly") | |
upload_id = str(uuid.uuid4()) | |
upload_path = os.path.join(STORAGE_DIR, repository, "uploads", upload_id) | |
os.makedirs(upload_path, exist_ok=True) | |
return Response( | |
headers={ | |
"Location": f"/v2/{repository}/blobs/uploads/{upload_id}", | |
"Range": "0-0", | |
"Docker-Upload-UUID": upload_id, | |
}, | |
status=202, | |
) | |
@app.route("/v2/<repository>/blobs/uploads/<upload_id>", methods=["PATCH"], endpoint="upload_blob") | |
@auth_wrapper | |
def upload_blob(repository, upload_id): | |
""" | |
Загрузка данных слоя Docker-образа. | |
""" | |
if readonly: | |
return Response(status=401, response="Readonly") | |
upload_path = os.path.join(STORAGE_DIR, repository, "uploads", upload_id, "data") | |
os.makedirs(os.path.dirname(upload_path), exist_ok=True) | |
# Записываем данные в файл | |
with open(upload_path, "ab") as f: | |
f.write(request.data) | |
# Возвращаем текущий прогресс загрузки | |
file_size = os.path.getsize(upload_path) | |
return Response( | |
headers={ | |
"Location": f"/v2/{repository}/blobs/uploads/{upload_id}", | |
"Range": f"0-{file_size - 1}", | |
}, | |
status=202, | |
) | |
@app.route("/v2/<repository>/blobs/uploads/<upload_id>", methods=["PUT"], endpoint="complete_upload") | |
@auth_wrapper | |
def complete_upload(repository, upload_id): | |
if readonly: | |
return Response(status=401, response="Readonly") | |
digest = request.args.get("digest") | |
if not digest or not digest.startswith("sha256:"): | |
abort(400, description="Valid digest is required (sha256:<hash>)") | |
upload_path = os.path.join(STORAGE_DIR, repository, "uploads", upload_id, "data") | |
if not os.path.exists(upload_path): | |
abort(404, description="Upload not found") | |
with open(upload_path, "rb") as f: | |
blob_data = f.read() | |
calculated_digest = calculate_digest(blob_data) | |
logging.info(f"Expected digest: {digest}, Calculated digest: {calculated_digest}") | |
if calculated_digest != digest: | |
abort(400, description=f"Digest mismatch: expected {calculated_digest}, got {digest}") | |
blob_path = os.path.join(STORAGE_DIR, repository, "blobs", digest) | |
os.makedirs(os.path.dirname(blob_path), exist_ok=True) | |
with open(blob_path, "wb") as f: | |
f.write(blob_data) | |
os.remove(upload_path) | |
os.rmdir(os.path.dirname(upload_path)) | |
return Response( | |
headers={"Location": f"/v2/{repository}/blobs/{digest}", "Docker-Content-Digest": digest}, | |
status=201, | |
) | |
@app.route("/v2/<repository>/blobs/<digest>", methods=["HEAD"]) | |
def check_blob(repository, digest): | |
""" | |
Проверка наличия слоя (blob) в Registry. | |
""" | |
blob_path = os.path.join(STORAGE_DIR, repository, "blobs", digest) | |
if not os.path.exists(blob_path): | |
abort(404, description="Blob not found") | |
# Возвращаем заголовки с размером слоя | |
blob_size = os.path.getsize(blob_path) | |
return Response( | |
headers={ | |
"Content-Length": str(blob_size), | |
"Docker-Content-Digest": digest, | |
}, | |
status=200, | |
) | |
if __name__ == "__main__": | |
# Настройка аргументов командной строки | |
parser = argparse.ArgumentParser(description="Docker registry server.") | |
parser.add_argument("--host", type=str, default="0.0.0.0", help="The host to bind to.") | |
parser.add_argument("--port", type=int, default=5001, help="The port to bind to.") | |
parser.add_argument("--no-ssl", action="store_true", help="Without ssl.") | |
parser.add_argument("--ssl-cert", type=str, default="certs/cert.pem", help="Path to SSL certificate.") | |
parser.add_argument("--ssl-key", type=str, default="certs/key.pem", help="Path to SSL key.") | |
parser.add_argument("--readonly", action="store_true", help="Run the registry in read-only mode.") | |
parser.add_argument("--auth-type", type=str, default="basic", choices=["basic", "none"], help="Authentication type.") | |
parser.add_argument("--storage-dir", type=str, default="storage", help="Directory to store Docker images.") | |
parser.add_argument("--adminpwd", type=str, default="password", help="Admin password") | |
args = parser.parse_args() | |
# Применяем настройки из командной строки | |
readonly = args.readonly | |
auth_type = args.auth_type | |
STORAGE_DIR = args.storage_dir | |
users = {"admin": args.adminpwd} | |
if not os.path.exists(STORAGE_DIR): | |
os.makedirs(STORAGE_DIR) | |
if args.no_ssl: | |
app.run(host=args.host, port=args.port) | |
else: | |
app.run(host=args.host, port=args.port, ssl_context=(args.ssl_cert, args.ssl_key)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
mkdir certs | |
openssl req -x509 -newkey rsa:4096 -nodes -out certs/cert.pem -keyout certs/key.pem -days 365 -subj "/CN=localhost" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment