|
import argparse |
|
import json |
|
import cfn_tools |
|
import pprint |
|
|
|
try: |
|
getattr(__builtins__, 'basestring') |
|
except AttributeError: |
|
basestring = str |
|
|
|
|
|
class object_registry(object): |
|
"""Keep track of objects being created as Parameters or Resources |
|
in order to map back to due to use in intrinsic functions like |
|
Ref() and GetAtt(). |
|
""" |
|
def __init__(self): |
|
self.objects = {} |
|
|
|
def add(self, o): |
|
new_name = o.replace('-', '_') |
|
self.objects[o] = new_name |
|
return new_name |
|
|
|
def lookup(self, o): |
|
if o in self.objects: |
|
return self.objects[o] |
|
else: |
|
return output_value(o) |
|
|
|
|
|
objects = object_registry() |
|
|
|
object_functions = { |
|
"Table": ["ProvisionedThroughput", "PrimaryKey", "Element"], |
|
"LoadBalancer": ["HealthCheck", "ConnectionDrainingPolicy", |
|
"AccessLoggingPolicy"], |
|
"Queue": ["RedrivePolicy"], |
|
"Bucket": ["WebsiteConfiguration"], |
|
"User": ["LoginProfile"], |
|
"Topic": ["Subscription"], |
|
"Instance": ["NetworkInterfaceProperty", |
|
"PrivateIpAddressSpecification"], |
|
"RecordSet": ["RecordSetType"], |
|
"Policy": ["PolicyType"], |
|
} |
|
|
|
|
|
def additional_imports(o): |
|
if o in object_functions: |
|
return ", ".join([o] + object_functions[o]) |
|
else: |
|
return o |
|
|
|
|
|
def do_header(d): |
|
"""Output a stock header for the new Python script and also try to |
|
figure out the Resource imports needed by the template. |
|
""" |
|
print('from troposphere import Base64, Select, FindInMap, GetAtt') |
|
print('from troposphere import GetAZs, Join, Output, If, And, Not') |
|
print('from troposphere import Or, Equals, Condition') |
|
print('from troposphere import Parameter, Ref, Tags, Template') |
|
print('from troposphere.cloudformation import Init') |
|
print('from troposphere.cloudfront import Distribution') |
|
print('from troposphere.cloudfront import DistributionConfig') |
|
print('from troposphere.cloudfront import Origin, DefaultCacheBehavior') |
|
print('from troposphere.ec2 import PortRange') |
|
|
|
# Loop over the resources to find imports |
|
if 'Resources' in d: |
|
seen = [] |
|
resources = d['Resources'] |
|
for k, v in resources.items(): |
|
(mod, tropo_object) = generate_troposphere_object(v['Type']) |
|
if tropo_object not in seen: |
|
seen.append(tropo_object) |
|
print('from troposphere.{} import {}' |
|
.format(mod, additional_imports(tropo_object))) |
|
print() |
|
print() |
|
print("t = Template()") |
|
print() |
|
|
|
|
|
def do_awstemplateformatversion(d): |
|
"""Output the template version""" |
|
print('t.add_version("{}")'.format(d['AWSTemplateFormatVersion'])) |
|
print() |
|
|
|
|
|
def do_description(d): |
|
"""Output the template Description""" |
|
print('t.add_description("""\\\n{}""")'.format(d['Description'])) |
|
|
|
|
|
def do_parameters(d): |
|
"""Output the template Parameters""" |
|
params = d['Parameters'] |
|
for k, v in params.items(): |
|
object_name = objects.add(k) |
|
print('{} = t.add_parameter(Parameter('.format(object_name)) |
|
print(' "{}",'.format(k)) |
|
for pk, pv in v.items(): |
|
print(' {}={},'.format(pk, output_value(pv))) |
|
print("))") |
|
print() |
|
|
|
|
|
def do_conditions(d): |
|
"""Output the template Conditions""" |
|
conditions = d['Conditions'] |
|
for k, v in conditions.items(): |
|
print('t.add_condition("{}",'.format(k)) |
|
print(' {}'.format(output_value(v))) |
|
print(")") |
|
print() |
|
|
|
|
|
def do_mappings(d): |
|
"""Output the template Mappings""" |
|
mappings = d['Mappings'] |
|
for k, v in mappings.items(): |
|
print('t.add_mapping("{}",'.format(k)) |
|
pprint.pprint(v) |
|
print(")") |
|
print() |
|
|
|
|
|
def map_module(mod): |
|
"""Map module names as needed""" |
|
if mod == "lambda": |
|
return "awslambda" |
|
return mod |
|
|
|
|
|
def generate_troposphere_object(typename): |
|
"""Try to determine the troposphere object to create from the Type |
|
specification from the Resource being converted. |
|
""" |
|
t = typename.split(':') |
|
if len(t) == 5: |
|
return (map_module(t[2].lower()), t[4]) |
|
else: |
|
return ('', typename) |
|
|
|
|
|
def output_dict(d): |
|
out = [] |
|
for k, v in d.items(): |
|
out.append("{}={}".format(k.replace('\\', '\\\\'), output_value(v))) |
|
return ", ".join(out) |
|
|
|
|
|
known_functions = { |
|
"DistributionConfig": 1, |
|
"DefaultCacheBehavior": 1, |
|
"ProvisionedThroughput": 1, |
|
"NetworkInterfaces": 1, |
|
"WebsiteConfiguration": 1, |
|
"RedrivePolicy": 1, |
|
"Subscription": 1, |
|
"KeySchema": 1, |
|
"HashKeyElement": 1, |
|
"HealthCheck": 1, |
|
"LoginProfile": 1, |
|
"ConnectionDrainingPolicy": 1, |
|
"AccessLoggingPolicy": 1, |
|
"AWS::CloudFormation::Init": 1, |
|
"PrivateIpAddresses": 1, |
|
"ContainerDefinitions": 1, |
|
} |
|
|
|
function_quirks = { |
|
"KeySchema": "PrimaryKey", |
|
"HashKeyElement": {"Element": ["AttributeName", "AttributeType"]}, |
|
"NetworkInterfaces": ["NetworkInterfaceProperty"], |
|
"Subscription": ["Subscription"], |
|
"LoginProfile": {"LoginProfile": ["Password"]}, |
|
"AWS::CloudFormation::Init": {"Init": []}, |
|
"PrivateIpAddresses": ["PrivateIpAddressSpecification"], |
|
"ContainerDefinitions": ["ContainerDefinition"], |
|
} |
|
|
|
|
|
def do_output_function(k, f, v): |
|
print(' {}={}('.format(k, f)) |
|
for pk, pv in v.items(): |
|
if pk in known_functions: |
|
do_resources_content(pk, pv, "") |
|
else: |
|
print(" {}={},".format(pk, output_value(pv))) |
|
print(' ),') |
|
|
|
|
|
def do_output_quirk_list(k, f, v): |
|
print(' {}=['.format(k)) |
|
for e in v: |
|
print(' {}('.format(f)) |
|
for pk, pv in e.items(): |
|
if pk in known_functions: |
|
do_resources_content(pk, pv) |
|
else: |
|
print(" {}={},".format(pk, output_value(pv))) |
|
print(' ),') |
|
print(' ],') |
|
|
|
|
|
def do_output_quirk_mapping(k, v): |
|
m = function_quirks[k] |
|
for pk in m.keys(): |
|
print(' {}={}('.format(k, pk)) |
|
for e in m[pk]: |
|
print(" {},".format(output_value(v[e]))) |
|
print(' ),') |
|
|
|
|
|
def do_output_quirk_metadata(k, v): |
|
m = function_quirks[k] |
|
for pk in m.keys(): |
|
print(' Metadata={}('.format(pk)) |
|
print(" {},".format(output_value(v))) |
|
print(' ),') |
|
|
|
|
|
def do_resources_content(k, v, p=""): |
|
if k in function_quirks: |
|
x = function_quirks[k] |
|
if(isinstance(x, dict)): |
|
if(p == "Metadata"): |
|
do_output_quirk_metadata(k, v) |
|
else: |
|
do_output_quirk_mapping(k, v) |
|
elif(isinstance(x, list)): |
|
do_output_quirk_list(k, x[0], v) |
|
else: |
|
do_output_function(k, x, v) |
|
else: |
|
do_output_function(k, k, v) |
|
|
|
|
|
top_level_aliases = { |
|
"RecordSet": "RecordSetType", |
|
"Policy": "PolicyType", |
|
} |
|
|
|
|
|
def do_resources(d): |
|
"""Output the template Resources""" |
|
|
|
resources = d['Resources'] |
|
for k, v in resources.items(): |
|
object_name = objects.add(k) |
|
(_, tropo_object) = generate_troposphere_object(v['Type']) |
|
if tropo_object in top_level_aliases: |
|
tropo_object = top_level_aliases[tropo_object] |
|
print('{} = t.add_resource({}('.format(object_name, tropo_object)) |
|
print(' "{}",'.format(k)) |
|
for p in filter(lambda x: x in v, ['Metadata', 'Properties']): |
|
for pk, pv in v[p].items(): |
|
if pk == "Tags": |
|
print(' Tags=Tags(') |
|
for d in pv: |
|
print(" {}={},".format( |
|
d['Key'], output_value(d['Value']))) |
|
print(' ),') |
|
elif pk == 'PortRange': |
|
print(" {}={}({}),".format(pk, pk, output_dict(pv))) |
|
elif pk in known_functions: |
|
do_resources_content(pk, pv, p) |
|
elif isinstance(pv, basestring): |
|
print(' {}="{}",'.format(pk, pv)) |
|
else: |
|
print(' {}={},'.format(pk, output_value(pv))) |
|
if "DependsOn" in v: |
|
print(' {}={},' |
|
.format("DependsOn", output_value(v['DependsOn']))) |
|
if "Condition" in v: |
|
print(' {}={},' |
|
.format("Condition", output_value(v['Condition']))) |
|
print("))") |
|
print() |
|
|
|
|
|
def handle_no_objects(name, values): |
|
"""Handle intrinsic functions which do not have a named resource""" |
|
return name + "(" + ", ".join(map(output_value, values)) + ")" |
|
|
|
|
|
def handle_one_object(name, values): |
|
"""Handle intrinsic functions which have a single named resource""" |
|
ret = name + "(" |
|
for i, param in enumerate(values): |
|
if i > 0: |
|
ret += ", " |
|
# First parameter might be an object name or pseudo parameter |
|
if i == 0: |
|
ret += objects.lookup(param) |
|
else: |
|
ret += output_value(param) |
|
return ret + ")" |
|
|
|
|
|
function_map = { |
|
'Fn::Base64': ("Base64", handle_no_objects), |
|
'Fn::And': ("And", handle_no_objects), |
|
'Fn::Or': ("Or", handle_no_objects), |
|
'Fn::Not': ("Not", handle_no_objects), |
|
'Fn::If': ("If", handle_one_object), |
|
'Fn::Equals': ("Equals", handle_no_objects), |
|
'Fn::FindInMap': ("FindInMap", handle_no_objects), |
|
'Fn::GetAtt': ("GetAtt", handle_one_object), |
|
'Fn::GetAZs': ("GetAZs", handle_no_objects), |
|
'Fn::Join': ("Join", handle_no_objects), |
|
'Fn::Select': ("Select", handle_one_object), |
|
'Ref': ("Ref", handle_one_object), |
|
'Condition': ("Condition", handle_one_object), |
|
} |
|
|
|
|
|
def output_value(v): |
|
"""Output a value which may be a string or a set of function calls.""" |
|
|
|
if isinstance(v, basestring): |
|
return '"{}"'.format(v.replace('\\', '\\\\') |
|
.replace('\n', '\\n') |
|
.replace("\"", "\\\"")) |
|
elif isinstance(v, bool): |
|
return '{}'.format(str(v)) |
|
elif isinstance(v, int): |
|
return '{}'.format(v) |
|
elif isinstance(v, float): |
|
return '{}'.format(v) |
|
elif isinstance(v, list): |
|
return "[" + ", ".join(map(output_value, v)) + "]" |
|
|
|
out = [] |
|
# Should only be one of these... |
|
for fk, fv in v.items(): |
|
if fk in function_map: |
|
(shortname, handler) = function_map[fk] |
|
if not isinstance(fv, list): |
|
fv = [fv] |
|
return handler(shortname, fv) |
|
else: |
|
out.append('"' + fk + '": ' + output_value(fv)) |
|
return "{ " + ", ".join(out) + " }" |
|
|
|
|
|
def do_outputs(d): |
|
"""Output the template Outputs""" |
|
|
|
outputs = d['Outputs'] |
|
for k, v in outputs.items(): |
|
print('{} = t.add_output(Output('.format(k)) |
|
print(' "{}",'.format(k)) |
|
for pk, pv in v.items(): |
|
if isinstance(pv, basestring): |
|
print(' {}="{}",'.format(pk, pv)) |
|
else: |
|
print(' {}={},'.format(pk, output_value(pv))) |
|
print("))") |
|
print() |
|
|
|
|
|
def do_trailer(d): |
|
"""Output a trailer section for the new Python script.""" |
|
print('print(t.to_json())') |
|
|
|
def run(filename): |
|
f = open(filename).read() |
|
try: |
|
d = json.loads(f) |
|
except json.JSONDecodeError: |
|
d = cfn_tools.load_yaml(f) |
|
except: |
|
print('Failed to load file') |
|
return 0 |
|
|
|
do_header(d) |
|
|
|
sections = [ |
|
'AWSTemplateFormatVersion', |
|
'Description', |
|
'Parameters', |
|
'Conditions', |
|
'Mappings', |
|
'Resources', |
|
'Outputs', |
|
] |
|
|
|
for s in sections: |
|
if s in d.keys(): |
|
globals()["do_" + s.lower()](d) |
|
|
|
do_trailer(d) |
|
|
|
if __name__ == '__main__': |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("filename", help="template to convert") |
|
args = parser.parse_args() |
|
|
|
run(args.filename) |