Skip to content

Instantly share code, notes, and snippets.

@letenkov
Created February 20, 2025 06:21
Show Gist options
  • Save letenkov/13a1c723a3ddcbe6effbab5c3ce217b8 to your computer and use it in GitHub Desktop.
Save letenkov/13a1c723a3ddcbe6effbab5c3ce217b8 to your computer and use it in GitHub Desktop.

Schema Fetcher

This script fetches the latest schema version of every subject in a schema registry and saves each one as a JSON file.

Requirements

  • Python 3.6 or later
  • requests library

Installation

Ensure that Python 3.6 or later is installed, then install the required library:

pip install requests

Usage

The script accepts the following command-line arguments:

  1. schema_registry_url: The URL of the schema registry.
  2. username: Your username for authentication.
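  3. password: Your password for authentication (sent via HTTP Basic authentication). Note that command-line arguments may be visible to other users on the same machine, for example in the process list.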
  4. output_dir: Directory where the schemas will be saved.

Run the script by passing the required arguments:

python schema_fetcher.py <schema_registry_url> <username> <password> <output_dir>

Example

python schema_fetcher.py https://your-schema-registry-url.com your-username your-password ./schemas

This saves the fetched schemas into the schemas directory under the current working directory (creating it if it does not exist), one JSON file per subject, named after the subject.

License

This project is licensed under the Apache License 2.0. See the LICENSE file for more details.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote

import requests
from requests.auth import HTTPBasicAuth
# Configure root logging at import time: INFO and above, each record
# prefixed with its timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def fetch_and_save_schema(subject, schema_registry_url, auth, output_dir, timeout=30):
    """Fetch the latest schema version of *subject* and save it as JSON.

    The registry response for ``/subjects/<subject>/versions/latest`` is
    written unchanged (pretty-printed, 2-space indent) to
    ``<output_dir>/<subject>.json``. Path separators (``/`` and ``\\``) in the
    subject name are replaced with ``_`` so the name can never place the file
    outside *output_dir*.

    Args:
        subject: Subject name as listed by the registry's ``/subjects``
            endpoint.
        schema_registry_url: Base URL of the schema registry; a trailing
            ``/`` is tolerated.
        auth: ``requests`` auth object (e.g. ``HTTPBasicAuth``) sent with
            the request.
        output_dir: Existing directory to write the JSON file into.
        timeout: Seconds to wait for the registry before giving up
            (passed to ``requests.get``); ``None`` waits indefinitely.

    Returns:
        True if the schema was fetched and written, False if a failure was
        logged instead. Network, HTTP-status, JSON-decoding and file-write
        errors are logged, not raised.
    """
    logging.info(f"Processing subject: {subject}")
    name = str(subject)
    # Percent-encode the whole subject as one path segment: names may contain
    # characters ('/', '?', '#', ...) that would otherwise change the URL.
    url = (
        f"{schema_registry_url.rstrip('/')}"
        f"/subjects/{quote(name, safe='')}/versions/latest"
    )
    try:
        schema_response = requests.get(url, auth=auth, timeout=timeout)
        schema_response.raise_for_status()
        schema_data = schema_response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        logging.error(f"Failed to process subject {subject}: {e}")
        return False
    logging.info(f"Fetched schema for {subject}")

    # The subject comes from the server: never let it act as a path
    # (a '/' would create or escape into other directories).
    file_stem = name.replace('/', '_').replace('\\', '_')
    schema_file = os.path.join(output_dir, f"{file_stem}.json")
    try:
        with open(schema_file, 'w', encoding='utf-8') as f:
            json.dump(schema_data, f, indent=2)
    except OSError as e:
        logging.error(f"Failed to process subject {subject}: {e}")
        return False
    logging.info(f"Schema for {subject} saved in JSON format at {schema_file}")
    return True
def main():
    """Command-line entry point: save the latest schema of every subject.

    Parses ``schema_registry_url username password output_dir`` (plus the
    optional ``--workers``), creates *output_dir* if needed, lists the
    registry's subjects and downloads them in parallel via
    ``fetch_and_save_schema``.

    Returns:
        Exit status: 0 on success; 1 if the subject list could not be
        retrieved or any worker raised an exception that it did not handle
        itself.
    """
    parser = argparse.ArgumentParser(description="Fetch and save schemas from a schema registry.")
    parser.add_argument('schema_registry_url', type=str, help='The URL of the schema registry.')
    parser.add_argument('username', type=str, help='Username for authentication.')
    parser.add_argument('password', type=str, help='Password for authentication.')
    parser.add_argument('output_dir', type=str, help='Directory to save the schemas.')
    parser.add_argument('--workers', type=int, default=5,
                        help='Number of subjects fetched in parallel (default: 5).')
    args = parser.parse_args()
    if args.workers < 1:
        parser.error('--workers must be at least 1')

    # Avoid '//' in request paths when the URL is given with a trailing slash.
    base_url = args.schema_registry_url.rstrip('/')
    # Ensure the output directory exists
    os.makedirs(args.output_dir, exist_ok=True)
    auth = HTTPBasicAuth(args.username, args.password)

    try:
        # Fetch the list of subjects from the Schema Registry
        logging.info("Fetching subjects from Schema Registry")
        subjects_response = requests.get(f"{base_url}/subjects", auth=auth, timeout=30)
        subjects_response.raise_for_status()
        subjects = subjects_response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        logging.error(f"Failed to retrieve subjects: {e}")
        return 1
    if not isinstance(subjects, list):
        logging.error(
            f"Failed to retrieve subjects: expected a JSON list, got {type(subjects).__name__}"
        )
        return 1
    logging.info(f"Subjects retrieved: {subjects}")

    failures = 0
    # Use ThreadPoolExecutor for parallel processing
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [
            executor.submit(fetch_and_save_schema, subject, base_url, auth, args.output_dir)
            for subject in subjects
        ]
        # A worker's exception is only re-raised by future.result(); reading
        # every result ensures no failure is silently discarded.
        for subject, future in zip(subjects, futures):
            try:
                future.result()
            except Exception:
                failures += 1
                logging.exception(f"Failed to process subject {subject}")
    return 1 if failures else 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status so scripts and CI
    # can detect failures (a None return still exits with status 0).
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment