Last active
October 1, 2024 09:27
-
-
Save SmartManoj/791e7aa0f504eb1e23afc300b0cb627b to your computer and use it in GitHub Desktop.
This script converts an OpenAPI 3.X specification into an OpenAPI 2.0 specification, handling changes like `requestBody`, `$ref`, and security schemes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import json | |
from pyperclip import copy | |
debug = 0 | |
HOST_URL = 'https://default-host.com' | |
def convert_to_openapi_2(spec): | |
global debug | |
# Initialize the OpenAPI 2.0 specification | |
openapi2_spec = { | |
'swagger': '2.0', | |
'info': spec.get('info', {}), | |
'host': 'default-host.com', | |
'basePath': '/', | |
'schemes': ['https'], | |
'paths': {}, | |
'definitions': {}, | |
'securityDefinitions': {}, | |
} | |
# Convert servers to host and basePath | |
servers = spec.get('servers', []) | |
if servers: | |
url = servers[0].get('url', '') | |
# Check if the URL is an absolute URL with a scheme | |
if '://' in url: | |
scheme, rest = url.split('://', 1) | |
openapi2_spec['schemes'] = [scheme] | |
if '/' in rest: | |
host, basePath = rest.split('/', 1) | |
openapi2_spec['host'] = host if host else HOST_URL | |
openapi2_spec['basePath'] = '/' + basePath | |
else: | |
openapi2_spec['host'] = rest if rest else HOST_URL | |
openapi2_spec['basePath'] = '/' | |
else: | |
# Handle relative URLs (e.g., /api/DataImport/v1) | |
openapi2_spec['host'] = "default-host.com" | |
openapi2_spec['basePath'] = url if url else '/' | |
# Convert components to definitions and securityDefinitions | |
components = spec.get('components', {}) | |
openapi2_spec['definitions'] = convert_definitions(components.get('schemas', {})) | |
openapi2_spec['parameters'] = convert_parameters(components.get('parameters', {})) | |
openapi2_spec['responses'] = convert_responses(components.get('responses', {})) | |
# Convert security schemes | |
security_schemes = components.get('securitySchemes', {}) | |
for key, scheme in security_schemes.items(): | |
openapi2_spec['securityDefinitions'][key] = convert_security_scheme(scheme) | |
# Convert paths | |
paths = spec.get('paths', {}) | |
for path, methods in paths.items(): | |
openapi2_spec['paths'][path] = {} | |
for method, operation in methods.items(): | |
if method.startswith('x-'): | |
continue # Skip extensions | |
openapi2_operation = convert_operation(operation, path) | |
openapi2_spec['paths'][path][method] = openapi2_operation | |
return openapi2_spec | |
def convert_responses(responses): | |
converted_responsed = {} | |
for key, response in responses.items(): | |
if 'content' in response: | |
del response['content'] | |
if 'headers' in response: | |
del response['headers'] | |
converted_responsed[key] = response | |
return converted_responsed | |
def convert_ref(ref): | |
return ref.replace('#/components/schemas/', '#/definitions/') | |
def convert_definitions(schemas): | |
definitions = {} | |
for key, schema in schemas.items(): | |
# Handle $ref conversion for schemas | |
if '$ref' in schema: | |
schema['$ref'] = schema['$ref'].replace('#/components/schemas/', '#/definitions/') | |
# remove nullable | |
for property in schema.get('properties', {}).values(): | |
if 'nullable' in property: | |
del property['nullable'] | |
if '$ref' in property: | |
property['$ref'] = property['$ref'].replace('#/components/schemas/', '#/definitions/') | |
if 'items' in property: | |
property['items'] = convert_schema(property['items']) | |
definitions[key] = schema | |
return definitions | |
def convert_items(items): | |
converted_items = {} | |
for key, item in items.items(): | |
if 'schema' in item: | |
item['schema'] = convert_schema(item['schema']) | |
converted_items[key] = item | |
return converted_items | |
def convert_security_scheme(scheme): | |
type_mapping = { | |
'http': 'basic', | |
'apiKey': 'apiKey', | |
'oauth2': 'oauth2', | |
'openIdConnect': 'oauth2', # OpenAPI 2.0 doesn't support OpenID Connect directly | |
} | |
converted_scheme = { | |
'type': type_mapping.get(scheme.get('type'), 'apiKey'), | |
'description': scheme.get('description', ''), | |
} | |
if converted_scheme['type'] == 'apiKey': | |
converted_scheme['name'] = scheme.get('name') | |
converted_scheme['in'] = scheme.get('in') | |
elif converted_scheme['type'] == 'oauth2': | |
# Simplified conversion; OAuth flows differ between versions | |
converted_scheme['flow'] = 'accessCode' | |
flows = scheme.get('flows', {}).get('authorizationCode', {}) | |
converted_scheme['authorizationUrl'] = HOST_URL + flows.get('authorizationUrl', '') | |
converted_scheme['tokenUrl'] = HOST_URL + flows.get('tokenUrl', '') | |
converted_scheme['scopes'] = flows.get('scopes', {}) | |
return converted_scheme | |
def convert_operation(operation, path): | |
openapi2_operation = { | |
'summary': operation.get('summary', ''), | |
'description': operation.get('description', ''), | |
'operationId': operation.get('operationId', ''), | |
'produces': ['application/json'], # Default value; adjust as needed | |
'parameters': [], | |
'responses': {}, | |
'security': operation.get('security', []), | |
'tags': operation.get('tags', []), | |
} | |
# Convert parameters | |
for param in operation.get('parameters', []): | |
openapi2_param = convert_path_parameters(param, path) | |
if openapi2_param: | |
openapi2_operation['parameters'].append(openapi2_param) | |
# Convert requestBody to parameters | |
if 'requestBody' in operation: | |
request_body = operation['requestBody'] | |
content = request_body.get('content', {}) | |
# Initialize consumes for this operation | |
consumes = [] | |
# Handle 'multipart/form-data' for file uploads | |
if 'multipart/form-data' in content: | |
media_details = content['multipart/form-data'] | |
schema = media_details.get('schema', {}) | |
properties = schema.get('properties', {}) | |
# Set consumes to 'multipart/form-data' for file uploads | |
consumes.append('multipart/form-data') | |
# Convert each property into formData parameters | |
for prop_name, prop_details in properties.items(): | |
openapi2_param = { | |
'name': prop_name, | |
'in': 'formData', # In OpenAPI 2.0, file uploads are handled via formData | |
'description': request_body.get('description', ''), | |
'required': request_body.get('required', False), | |
'type': 'file' if prop_details.get('format') == 'binary' else prop_details.get('type', 'string') | |
} | |
openapi2_operation['parameters'].append(openapi2_param) | |
# Handle 'application/json' as a body parameter | |
elif 'application/json' in content: | |
media_details = content['application/json'] | |
schema = media_details.get('schema', {}) | |
# Set consumes to 'application/json' for JSON content | |
consumes.append('application/json') | |
openapi2_param = { | |
'name': 'body', # In OpenAPI 2.0, the body param is called 'body' | |
'in': 'body', | |
'description': request_body.get('description', ''), | |
'required': request_body.get('required', False), | |
'schema': convert_schema(schema), # Ensure schema conversion with $ref fixes | |
} | |
# Append the body parameter to the operation parameters (only one body allowed) | |
openapi2_operation['parameters'].append(openapi2_param) | |
# Fallback to the first available media type if none of the above is present | |
else: | |
media_type, media_details = next(iter(content.items()), (None, {})) | |
if media_details: | |
schema = media_details.get('schema', {}) | |
consumes.append(media_type) # Add the media type to consumes | |
openapi2_param = { | |
'name': 'body', | |
'in': 'body', | |
'description': request_body.get('description', ''), | |
'required': request_body.get('required', False), | |
'schema': convert_schema(schema), | |
} | |
openapi2_operation['parameters'].append(openapi2_param) | |
# Add consumes to the OpenAPI 2.0 operation if applicable | |
if consumes: | |
openapi2_operation['consumes'] = consumes | |
# Convert responses | |
responses = operation.get('responses', {}) | |
for code, response in responses.items(): | |
openapi2_response = { | |
'description': response.get('description', ''), | |
'schema': {}, | |
} | |
content = response.get('content', {}) | |
if 'application/json' in content: | |
schema = content['application/json'].get('schema', {}) | |
openapi2_response['schema'] = convert_schema(schema) | |
if 'schema' in openapi2_response: | |
openapi2_response['schema'] = convert_schema(openapi2_response['schema']) | |
openapi2_operation['responses'][code] = openapi2_response | |
return openapi2_operation | |
def convert_schema(schema): | |
if schema is None or 'oneOf' in schema: | |
return {} | |
# Handle $ref conversion | |
if '$ref' in schema: | |
schema['$ref'] = schema['$ref'].replace('#/components/schemas/', '#/definitions/') | |
if 'items' in schema: | |
schema['items'] = convert_schema(schema['items']) | |
return schema | |
def get_value_from_ref(ref): | |
ref = ref[2:] | |
new_spec = spec | |
for i in ref.split('/'): | |
new_spec = new_spec[i] | |
return new_spec | |
def convert_parameters(parameters): | |
openapi2_params = {} | |
for key,param in parameters.items(): | |
openapi2_param = convert_path_parameters(param, '') | |
if openapi2_param: | |
openapi2_params[key] = openapi2_param | |
return openapi2_params | |
def convert_path_parameters(param, path): | |
if '$ref' in param: | |
param = (get_value_from_ref(param['$ref'])) | |
# print(spec['components']['schemas']) | |
openapi2_param = { | |
'name': param.get('name'), | |
'in': param.get('in'), | |
'description': param.get('description', ''), | |
'required': param.get('required', False), | |
} | |
is_path = param.get('in') == 'path' | |
if is_path and path: | |
name = param.get('name') | |
if f'{name}' not in path: | |
# print(f'Parameter {name} in path {path} is missing in the operation.') | |
return None | |
if 'schema' in param: | |
schema = param['schema'] | |
openapi2_param['type'] = schema.get('type', 'string') | |
if 'format' in schema: | |
openapi2_param['format'] = schema['format'] | |
else: | |
openapi2_param['type'] = 'string' # Default type | |
return openapi2_param | |
# Declared path parameter "orgUnitCode" needs to be defined within every operation in the path (missing in "put", "delete"), or moved to the path-level parameters object | |
# iterate over paths and check if the parameter is missing in the operations | |
def check_path_parameters(spec): | |
for path, methods in spec['paths'].items(): | |
for method, operation in methods.items(): | |
parameters = operation.get('parameters', []) | |
for parameter in parameters: | |
if parameter.get('name') == 'orgUnitCode' and parameter.get('in') == 'path': | |
print(f"Parameter 'orgUnitCode' in path '{path}' with method '{method}' is missing in the operation.") | |
if __name__ == '__main__': | |
if 0: | |
url='https://basic-curd-fast-api.vercel.app/openapi.json' | |
dir_name = '.' | |
file_name = 'openapi.json' | |
response = requests.get(url) | |
spec = response.json() | |
else: | |
path = r'c:\Users\smart\Downloads\64Gram Desktop\yooz.json' | |
dir_name = os.path.dirname(path) | |
file_name = os.path.basename(path).split('.')[0] | |
with open(path, 'r') as file: | |
spec = json.load(file) | |
openapi2_spec = convert_to_openapi_2(spec) | |
copy(json.dumps(openapi2_spec, indent=2)) | |
with open(os.path.join(dir_name, file_name+'_2.0.json'), 'w') as file: | |
json.dump(openapi2_spec, file, indent=2) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment