Skip to content

Instantly share code, notes, and snippets.

@uint0
Last active October 24, 2019 23:48
Show Gist options
  • Save uint0/ab4ef4e735de7ff68417ce83406dea58 to your computer and use it in GitHub Desktop.
Save uint0/ab4ef4e735de7ff68417ce83406dea58 to your computer and use it in GitHub Desktop.
A script for converting yaml Cloudformation templates to a https://github.com/cloudtools/troposphere format

Tropopause

A script for converting yaml Cloudformation templates to a troposphere format.

A simple adaption of the troposphere cfn2py.py script to ingest CFN yaml files.

Outputs the converted file to stdout.

Installation

$ pip install -r requirements.txt

Usage

$ python3 tropopause.py <CFNtemplate.yaml | CFNTemplate.json>
cfn-flip==1.2.1
Click==7.0
PyYAML==5.1.2
six==1.12.0
import argparse
import json
import cfn_tools
import pprint
try:
getattr(__builtins__, 'basestring')
except AttributeError:
basestring = str
class object_registry(object):
"""Keep track of objects being created as Parameters or Resources
in order to map back to due to use in intrinsic functions like
Ref() and GetAtt().
"""
def __init__(self):
self.objects = {}
def add(self, o):
new_name = o.replace('-', '_')
self.objects[o] = new_name
return new_name
def lookup(self, o):
if o in self.objects:
return self.objects[o]
else:
return output_value(o)
objects = object_registry()
object_functions = {
"Table": ["ProvisionedThroughput", "PrimaryKey", "Element"],
"LoadBalancer": ["HealthCheck", "ConnectionDrainingPolicy",
"AccessLoggingPolicy"],
"Queue": ["RedrivePolicy"],
"Bucket": ["WebsiteConfiguration"],
"User": ["LoginProfile"],
"Topic": ["Subscription"],
"Instance": ["NetworkInterfaceProperty",
"PrivateIpAddressSpecification"],
"RecordSet": ["RecordSetType"],
"Policy": ["PolicyType"],
}
def additional_imports(o):
if o in object_functions:
return ", ".join([o] + object_functions[o])
else:
return o
def do_header(d):
"""Output a stock header for the new Python script and also try to
figure out the Resource imports needed by the template.
"""
print('from troposphere import Base64, Select, FindInMap, GetAtt')
print('from troposphere import GetAZs, Join, Output, If, And, Not')
print('from troposphere import Or, Equals, Condition')
print('from troposphere import Parameter, Ref, Tags, Template')
print('from troposphere.cloudformation import Init')
print('from troposphere.cloudfront import Distribution')
print('from troposphere.cloudfront import DistributionConfig')
print('from troposphere.cloudfront import Origin, DefaultCacheBehavior')
print('from troposphere.ec2 import PortRange')
# Loop over the resources to find imports
if 'Resources' in d:
seen = []
resources = d['Resources']
for k, v in resources.items():
(mod, tropo_object) = generate_troposphere_object(v['Type'])
if tropo_object not in seen:
seen.append(tropo_object)
print('from troposphere.{} import {}'
.format(mod, additional_imports(tropo_object)))
print()
print()
print("t = Template()")
print()
def do_awstemplateformatversion(d):
"""Output the template version"""
print('t.add_version("{}")'.format(d['AWSTemplateFormatVersion']))
print()
def do_description(d):
"""Output the template Description"""
print('t.add_description("""\\\n{}""")'.format(d['Description']))
def do_parameters(d):
"""Output the template Parameters"""
params = d['Parameters']
for k, v in params.items():
object_name = objects.add(k)
print('{} = t.add_parameter(Parameter('.format(object_name))
print(' "{}",'.format(k))
for pk, pv in v.items():
print(' {}={},'.format(pk, output_value(pv)))
print("))")
print()
def do_conditions(d):
"""Output the template Conditions"""
conditions = d['Conditions']
for k, v in conditions.items():
print('t.add_condition("{}",'.format(k))
print(' {}'.format(output_value(v)))
print(")")
print()
def do_mappings(d):
"""Output the template Mappings"""
mappings = d['Mappings']
for k, v in mappings.items():
print('t.add_mapping("{}",'.format(k))
pprint.pprint(v)
print(")")
print()
def map_module(mod):
"""Map module names as needed"""
if mod == "lambda":
return "awslambda"
return mod
def generate_troposphere_object(typename):
"""Try to determine the troposphere object to create from the Type
specification from the Resource being converted.
"""
t = typename.split(':')
if len(t) == 5:
return (map_module(t[2].lower()), t[4])
else:
return ('', typename)
def output_dict(d):
out = []
for k, v in d.items():
out.append("{}={}".format(k.replace('\\', '\\\\'), output_value(v)))
return ", ".join(out)
known_functions = {
"DistributionConfig": 1,
"DefaultCacheBehavior": 1,
"ProvisionedThroughput": 1,
"NetworkInterfaces": 1,
"WebsiteConfiguration": 1,
"RedrivePolicy": 1,
"Subscription": 1,
"KeySchema": 1,
"HashKeyElement": 1,
"HealthCheck": 1,
"LoginProfile": 1,
"ConnectionDrainingPolicy": 1,
"AccessLoggingPolicy": 1,
"AWS::CloudFormation::Init": 1,
"PrivateIpAddresses": 1,
"ContainerDefinitions": 1,
}
function_quirks = {
"KeySchema": "PrimaryKey",
"HashKeyElement": {"Element": ["AttributeName", "AttributeType"]},
"NetworkInterfaces": ["NetworkInterfaceProperty"],
"Subscription": ["Subscription"],
"LoginProfile": {"LoginProfile": ["Password"]},
"AWS::CloudFormation::Init": {"Init": []},
"PrivateIpAddresses": ["PrivateIpAddressSpecification"],
"ContainerDefinitions": ["ContainerDefinition"],
}
def do_output_function(k, f, v):
print(' {}={}('.format(k, f))
for pk, pv in v.items():
if pk in known_functions:
do_resources_content(pk, pv, "")
else:
print(" {}={},".format(pk, output_value(pv)))
print(' ),')
def do_output_quirk_list(k, f, v):
print(' {}=['.format(k))
for e in v:
print(' {}('.format(f))
for pk, pv in e.items():
if pk in known_functions:
do_resources_content(pk, pv)
else:
print(" {}={},".format(pk, output_value(pv)))
print(' ),')
print(' ],')
def do_output_quirk_mapping(k, v):
m = function_quirks[k]
for pk in m.keys():
print(' {}={}('.format(k, pk))
for e in m[pk]:
print(" {},".format(output_value(v[e])))
print(' ),')
def do_output_quirk_metadata(k, v):
m = function_quirks[k]
for pk in m.keys():
print(' Metadata={}('.format(pk))
print(" {},".format(output_value(v)))
print(' ),')
def do_resources_content(k, v, p=""):
if k in function_quirks:
x = function_quirks[k]
if(isinstance(x, dict)):
if(p == "Metadata"):
do_output_quirk_metadata(k, v)
else:
do_output_quirk_mapping(k, v)
elif(isinstance(x, list)):
do_output_quirk_list(k, x[0], v)
else:
do_output_function(k, x, v)
else:
do_output_function(k, k, v)
top_level_aliases = {
"RecordSet": "RecordSetType",
"Policy": "PolicyType",
}
def do_resources(d):
"""Output the template Resources"""
resources = d['Resources']
for k, v in resources.items():
object_name = objects.add(k)
(_, tropo_object) = generate_troposphere_object(v['Type'])
if tropo_object in top_level_aliases:
tropo_object = top_level_aliases[tropo_object]
print('{} = t.add_resource({}('.format(object_name, tropo_object))
print(' "{}",'.format(k))
for p in filter(lambda x: x in v, ['Metadata', 'Properties']):
for pk, pv in v[p].items():
if pk == "Tags":
print(' Tags=Tags(')
for d in pv:
print(" {}={},".format(
d['Key'], output_value(d['Value'])))
print(' ),')
elif pk == 'PortRange':
print(" {}={}({}),".format(pk, pk, output_dict(pv)))
elif pk in known_functions:
do_resources_content(pk, pv, p)
elif isinstance(pv, basestring):
print(' {}="{}",'.format(pk, pv))
else:
print(' {}={},'.format(pk, output_value(pv)))
if "DependsOn" in v:
print(' {}={},'
.format("DependsOn", output_value(v['DependsOn'])))
if "Condition" in v:
print(' {}={},'
.format("Condition", output_value(v['Condition'])))
print("))")
print()
def handle_no_objects(name, values):
"""Handle intrinsic functions which do not have a named resource"""
return name + "(" + ", ".join(map(output_value, values)) + ")"
def handle_one_object(name, values):
"""Handle intrinsic functions which have a single named resource"""
ret = name + "("
for i, param in enumerate(values):
if i > 0:
ret += ", "
# First parameter might be an object name or pseudo parameter
if i == 0:
ret += objects.lookup(param)
else:
ret += output_value(param)
return ret + ")"
function_map = {
'Fn::Base64': ("Base64", handle_no_objects),
'Fn::And': ("And", handle_no_objects),
'Fn::Or': ("Or", handle_no_objects),
'Fn::Not': ("Not", handle_no_objects),
'Fn::If': ("If", handle_one_object),
'Fn::Equals': ("Equals", handle_no_objects),
'Fn::FindInMap': ("FindInMap", handle_no_objects),
'Fn::GetAtt': ("GetAtt", handle_one_object),
'Fn::GetAZs': ("GetAZs", handle_no_objects),
'Fn::Join': ("Join", handle_no_objects),
'Fn::Select': ("Select", handle_one_object),
'Ref': ("Ref", handle_one_object),
'Condition': ("Condition", handle_one_object),
}
def output_value(v):
"""Output a value which may be a string or a set of function calls."""
if isinstance(v, basestring):
return '"{}"'.format(v.replace('\\', '\\\\')
.replace('\n', '\\n')
.replace("\"", "\\\""))
elif isinstance(v, bool):
return '{}'.format(str(v))
elif isinstance(v, int):
return '{}'.format(v)
elif isinstance(v, float):
return '{}'.format(v)
elif isinstance(v, list):
return "[" + ", ".join(map(output_value, v)) + "]"
out = []
# Should only be one of these...
for fk, fv in v.items():
if fk in function_map:
(shortname, handler) = function_map[fk]
if not isinstance(fv, list):
fv = [fv]
return handler(shortname, fv)
else:
out.append('"' + fk + '": ' + output_value(fv))
return "{ " + ", ".join(out) + " }"
def do_outputs(d):
"""Output the template Outputs"""
outputs = d['Outputs']
for k, v in outputs.items():
print('{} = t.add_output(Output('.format(k))
print(' "{}",'.format(k))
for pk, pv in v.items():
if isinstance(pv, basestring):
print(' {}="{}",'.format(pk, pv))
else:
print(' {}={},'.format(pk, output_value(pv)))
print("))")
print()
def do_trailer(d):
"""Output a trailer section for the new Python script."""
print('print(t.to_json())')
def run(filename):
f = open(filename).read()
try:
d = json.loads(f)
except json.JSONDecodeError:
d = cfn_tools.load_yaml(f)
except:
print('Failed to load file')
return 0
do_header(d)
sections = [
'AWSTemplateFormatVersion',
'Description',
'Parameters',
'Conditions',
'Mappings',
'Resources',
'Outputs',
]
for s in sections:
if s in d.keys():
globals()["do_" + s.lower()](d)
do_trailer(d)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("filename", help="template to convert")
args = parser.parse_args()
run(args.filename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment