|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
import argparse
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote

import requests
from requests.auth import HTTPBasicAuth
|
|
|
# Configure root logging when this module is loaded: emit INFO and above,
# prefixing every record with its timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
|
def fetch_and_save_schema(subject, schema_registry_url, auth, output_dir, timeout=30):
    """Download the latest version of *subject* and save it as ``<output_dir>/<subject>.json``.

    The registry's JSON response from ``<schema_registry_url>/subjects/<subject>/versions/latest``
    is written unchanged (pretty-printed with ``indent=2``).

    Args:
        subject: Subject name as listed by the registry.
        schema_registry_url: Base URL of the registry; a trailing ``/`` is tolerated.
        auth: Passed unchanged as ``auth=`` to ``requests.get`` (``main`` supplies an
            ``HTTPBasicAuth``).
        output_dir: Existing directory in which the JSON file is written.
        timeout: Seconds to wait for the registry before the request fails. Without a
            timeout, ``requests.get`` can block indefinitely.

    Returns:
        None. Failures are logged rather than raised. This covers network or HTTP errors,
        a response body that is not valid JSON, and errors writing the output file.
    """
    logging.info("Processing subject: %s", subject)

    # Percent-encode the subject so characters such as '/', ':' or '?' cannot alter
    # the request path.
    base_url = schema_registry_url.rstrip('/')
    url = f"{base_url}/subjects/{quote(subject, safe='')}/versions/latest"
    try:
        schema_response = requests.get(url, auth=auth, timeout=timeout)
        schema_response.raise_for_status()
        schema_data = schema_response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError: a non-JSON body on requests versions whose JSON decode error
        # is not a RequestException subclass.
        logging.error("Failed to process subject %s: %s", subject, e)
        return
    logging.info("Fetched schema for %s", subject)

    # The subject name comes from the remote registry. Replace path separators so
    # the file always lands directly inside output_dir. Without this, a '/' points
    # at a missing sub-directory and '../' would escape output_dir.
    safe_name = subject.replace(os.sep, '_')
    if os.altsep:
        safe_name = safe_name.replace(os.altsep, '_')
    schema_file = os.path.join(output_dir, f"{safe_name}.json")

    try:
        with open(schema_file, 'w', encoding='utf-8') as f:
            json.dump(schema_data, f, indent=2)
    except OSError as e:
        logging.error("Failed to process subject %s: %s", subject, e)
        return
    logging.info("Schema for %s saved in JSON format at %s", subject, schema_file)
|
|
|
def main():
    """Command-line entry point: save the latest schema of every registry subject.

    Positional arguments, in order: ``schema_registry_url``, ``username``,
    ``password`` and ``output_dir``. ``output_dir`` is created if it does not exist.
    The subject list is read from ``<schema_registry_url>/subjects``. Each subject is
    then passed to ``fetch_and_save_schema`` on a pool of 5 worker threads.

    Returns:
        int: 0 on success. 1 if the subject list could not be retrieved, or if a
        worker raised an exception that ``fetch_and_save_schema`` did not handle
        itself. Failures that ``fetch_and_save_schema`` only logs do not affect
        this value.
    """
    parser = argparse.ArgumentParser(description="Fetch and save schemas from a schema registry.")
    parser.add_argument('schema_registry_url', type=str, help='The URL of the schema registry.')
    parser.add_argument('username', type=str, help='Username for authentication.')
    # NOTE: a password passed as a command-line argument can be seen by other local
    # users (process listings) and ends up in shell history.
    parser.add_argument('password', type=str, help='Password for authentication.')
    parser.add_argument('output_dir', type=str, help='Directory to save the schemas.')
    args = parser.parse_args()

    # Tolerate a trailing '/' so request URLs never contain '//subjects'.
    base_url = args.schema_registry_url.rstrip('/')
    request_timeout = 30  # seconds; without a timeout requests.get can block forever

    # Ensure the output directory exists
    os.makedirs(args.output_dir, exist_ok=True)
    auth = HTTPBasicAuth(args.username, args.password)

    logging.info("Fetching subjects from Schema Registry")
    try:
        subjects_response = requests.get(f"{base_url}/subjects", auth=auth, timeout=request_timeout)
        subjects_response.raise_for_status()
        subjects = subjects_response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError: a non-JSON body on requests versions whose JSON decode error
        # is not a RequestException subclass.
        logging.error("Failed to retrieve subjects: %s", e)
        return 1
    logging.info("Subjects retrieved: %s", subjects)

    exit_code = 0
    with ThreadPoolExecutor(max_workers=5) as executor:
        pending = [
            (subject, executor.submit(fetch_and_save_schema, subject, base_url, auth, args.output_dir))
            for subject in subjects
        ]
        # Collect every result explicitly. Exceptions raised inside executor.map()
        # workers are silently discarded unless its result iterator is consumed.
        for subject, future in pending:
            try:
                future.result()
            except Exception:
                logging.exception("Unexpected error while processing subject %s", subject)
                exit_code = 1
    return exit_code


if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the exit status.
    raise SystemExit(main())