Skip to content

Instantly share code, notes, and snippets.

@openstacker
Created January 12, 2018 00:41
Show Gist options
  • Select an option

  • Save openstacker/cf699c7de7171d2d53d78dc273fc40bc to your computer and use it in GitHub Desktop.

Select an option

Save openstacker/cf699c7de7171d2d53d78dc273fc40bc to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#
# Copyright 2017 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import bz2
import datetime
import json
import os
import prettytable
import subprocess
import sys
import time
import traceback
import urllib
import uuid
from cinderclient.v1 import client as cinder_client
import eventlet
from glanceclient import client as glance_client
from oslo_utils import importutils
from oslo_utils import encodeutils
from keystoneclient.v2_0 import client as keystone_client
from novaclient import client as nova_client
from ceilometerclient import client as ceilometer_client
from neutronclient.v2_0 import client as neutron_client
PING_TIMEOUT = 300
def arg(*args, **kwargs):
    """Decorator factory that records an argparse argument spec on a callable.

    The positional and keyword arguments are stored untouched as an
    ``(args, kwargs)`` tuple in the decorated callable's ``arguments``
    list, ready to be forwarded to ``add_argument``.
    """
    def _decorator(func):
        spec = (args, kwargs)
        registered = func.__dict__.setdefault('arguments', [])
        # Decorators run bottom-up, so inserting at the front keeps the
        # recorded specs in the order they are written in the source.
        registered.insert(0, spec)
        return func

    return _decorator
class Shell(object):
    """Command-line front end that maps ``do_*`` callables to subcommands.

    ``main`` parses the global options, builds one sub-parser per ``do_*``
    callable, creates the OpenStack clients and then invokes the selected
    subcommand.
    """

    def get_base_parser(self):
        """Return a parser holding only the global options.

        Each ``OS_*`` option defaults to the environment variable of the
        same name.
        """
        parser = argparse.ArgumentParser(
            prog='test_image',
            description='Testing script used for any OpenStack test.',
            add_help=False,
        )

        # Global arguments
        # -h is stored as a plain flag (not action='help') so that main()
        # decides which help text to print.
        parser.add_argument('-h', '--help',
                            action='store_true',
                            help=argparse.SUPPRESS,
                            )

        parser.add_argument('-a', '--os-auth-url', metavar='OS_AUTH_URL',
                            type=str, required=False, dest='OS_AUTH_URL',
                            default=os.environ.get('OS_AUTH_URL', None),
                            help='Keystone Authentication URL')

        parser.add_argument('-u', '--os-username', metavar='OS_USERNAME',
                            type=str, required=False, dest='OS_USERNAME',
                            default=os.environ.get('OS_USERNAME', None),
                            help='Username for authentication')

        parser.add_argument('-p', '--os-password', metavar='OS_PASSWORD',
                            type=str, required=False, dest='OS_PASSWORD',
                            default=os.environ.get('OS_PASSWORD', None),
                            help='Password for authentication')

        parser.add_argument('-t', '--os-tenant-name',
                            metavar='OS_TENANT_NAME',
                            type=str, required=False,
                            dest='OS_TENANT_NAME',
                            default=os.environ.get('OS_TENANT_NAME', None),
                            help='Tenant name for authentication')

        parser.add_argument('-r', '--os-region-name',
                            metavar='OS_REGION_NAME',
                            type=str, required=False,
                            dest='OS_REGION_NAME',
                            default=os.environ.get('OS_REGION_NAME', None),
                            help='Region for authentication')

        parser.add_argument('-c', '--os-cacert', metavar='OS_CACERT',
                            dest='OS_CACERT',
                            default=os.environ.get('OS_CACERT'),
                            help='Path of CA TLS certificate(s) used to '
                                 'verify the remote server\'s certificate. '
                                 'Without this option glance looks for the '
                                 'default system CA certificates.')

        parser.add_argument('-k', '--insecure',
                            default=False,
                            action='store_true', dest='OS_INSECURE',
                            help='Explicitly allow script to perform '
                                 '\"insecure SSL\" (https) requests. '
                                 'The server\'s certificate will not be '
                                 'verified against any certificate authorities.'
                                 ' This option should be used with caution.')

        return parser

    def get_subcommand_parser(self):
        """Return the base parser extended with one sub-parser per command.

        Commands are collected from the ``test_image`` module and from this
        instance; ``self.subcommands`` is reset and repopulated as a side
        effect.
        """
        parser = self.get_base_parser()
        self.subcommands = {}
        subparsers = parser.add_subparsers(metavar='<subcommand>')
        # NOTE(review): the module is looked up by the literal name
        # 'test_image', so this presumably requires the script to be
        # importable under that name -- confirm.
        submodule = importutils.import_module('test_image')
        self._find_actions(subparsers, submodule)
        self._find_actions(subparsers, self)
        return parser

    def _find_actions(self, subparsers, actions_module):
        """Register every ``do_*`` attribute of *actions_module* as a command.

        ``do_foo_bar`` becomes the command ``foo-bar``. The callable's
        docstring supplies the description (its first line is the short
        help) and its ``arguments`` list, if any, supplies the options.
        """
        for attr in (a for a in dir(actions_module) if a.startswith('do_')):
            command = attr[3:].replace('_', '-')
            callback = getattr(actions_module, attr)
            desc = callback.__doc__ or ''
            summary = desc.strip().split('\n')[0]
            arguments = getattr(callback, 'arguments', [])

            subparser = subparsers.add_parser(command,
                                              help=summary,
                                              description=desc,
                                              add_help=False,
                                              formatter_class=HelpFormatter
                                              )
            subparser.add_argument('-h', '--help',
                                   action='help',
                                   help=argparse.SUPPRESS,
                                   )
            self.subcommands[command] = subparser
            for (args, kwargs) in arguments:
                subparser.add_argument(*args, **kwargs)
            subparser.set_defaults(func=callback)

    @arg('command', metavar='<subcommand>', nargs='?',
         help='Display help for <subcommand>.')
    def do_help(self, args):
        """Display help about this program or one of its subcommands."""
        if getattr(args, 'command', None):
            if args.command in self.subcommands:
                self.subcommands[args.command].print_help()
            else:
                raise Exception("'%s' is not a valid subcommand" %
                                args.command)
        else:
            self.parser.print_help()

    def init_client(self, args):
        """Create the keystone, nova, glance and neutron clients.

        All clients share one keystoneauth session built from the
        ``OS_*`` values in *args*. The clients are stored on ``self`` as
        ``keystone``, ``nova``, ``glance`` and ``neutron``. Any exception
        raised while building them propagates unchanged.
        """
        from keystoneauth1.identity import v2
        from keystoneauth1 import session

        auth = v2.Password(auth_url=args.OS_AUTH_URL,
                           username=args.OS_USERNAME,
                           password=args.OS_PASSWORD,
                           tenant_name=args.OS_TENANT_NAME)

        # Honour --insecure / --os-cacert, which were previously parsed
        # but never applied: False disables verification, a path selects
        # a CA bundle, True uses the system certificates.
        if getattr(args, 'OS_INSECURE', False):
            verify = False
        else:
            verify = getattr(args, 'OS_CACERT', None) or True
        sess = session.Session(auth=auth, verify=verify)

        self.keystone = keystone_client.Client(session=sess)
        self.nova = nova_client.Client('2', session=sess,
                                       region_name=args.OS_REGION_NAME)
        self.glance = glance_client.Client('1', session=sess,
                                           region_name=args.OS_REGION_NAME)
        self.neutron = neutron_client.Client(session=sess,
                                             region_name=args.OS_REGION_NAME)

    def main(self, argv):
        """Parse *argv* and run the selected subcommand.

        Returns 0 when help was shown or the subcommand completed, and 1
        when no subcommand was given or the subcommand raised (a short
        traceback is printed to stdout in that case).
        """
        parser = self.get_base_parser()
        (options, _unknown) = parser.parse_known_args(argv)

        subcommand_parser = self.get_subcommand_parser()
        self.parser = subcommand_parser

        if options.help or not argv:
            self.do_help(options)
            return 0

        args = subcommand_parser.parse_args(argv)

        func = getattr(args, 'func', None)
        if func is None:
            # On Python 3 argparse does not require a subcommand by
            # default, so only global options may have been supplied.
            self.parser.print_help()
            return 1

        if func == self.do_help:
            self.do_help(args)
            return 0

        try:
            self.init_client(args)
            func(self, args)
        except Exception:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_exception(exc_type, exc_value, exc_traceback,
                                      limit=2, file=sys.stdout)
            return 1
        return 0
class HelpFormatter(argparse.HelpFormatter):
    """Help formatter that upper-cases the first letter of section headings."""

    def start_section(self, heading):
        """Start a help section, capitalising the first character of *heading*.

        A ``None`` or empty heading is forwarded unchanged; argparse passes
        ``None`` for argument groups created without a title, and indexing
        it would raise.
        """
        if heading:
            # Only the first character changes; the rest keeps its case.
            heading = '%s%s' % (heading[0].upper(), heading[1:])
        super(HelpFormatter, self).start_section(heading)
@arg('--name', metavar='IMAGE_NAME',
     dest='IMAGE_NAME', required=True,
     help='Image name to be created')
@arg('--path', metavar='IMAGE_PATH', dest='IMAGE_PATH',
     help='The path of image data', required=True)
def do_create(shell, args):
    """Create an image from a local file and print the result."""
    image = _create_image(shell, args.IMAGE_PATH, args.IMAGE_NAME)
    # Function-call form works on both Python 2 and Python 3; the bare
    # ``print image`` statement is a syntax error on Python 3.
    print(image)
def _create_image(shell, image_path, image_name):
kwargs = {}
image = shell.glance.images.create(name=image_name,
container_format='bare',
disk_format='raw',
min_disk=10,
min_ram=1024,
data=open(image_path, 'r'),
**kwargs)
print('Created new image %s successfully.' % image.name)
return image
def print_list(objs, fields, formatters=None):
    """Print *objs* as a left-aligned table with one column per field.

    :param objs: iterable of objects or dicts, one per table row
    :param fields: column titles; a title is lower-cased and its spaces
                   replaced by underscores to get the attribute/key name
    :param formatters: optional mapping of column title to a callable
                       that takes the row object and returns the cell value
    """
    # Avoid a shared mutable default argument.
    formatters = formatters or {}

    pt = prettytable.PrettyTable(list(fields), caching=False)
    pt.align = 'l'

    for o in objs:
        row = []
        for field in fields:
            if field in formatters:
                row.append(formatters[field](o))
                continue

            field_name = field.lower().replace(' ', '_')
            if isinstance(o, dict):
                # Prefer the normalised key, fall back to the raw column
                # title; the old code tested one key but indexed the
                # other, which could raise KeyError.
                if field_name in o:
                    data = o[field_name]
                else:
                    data = o.get(field, '')
            else:
                data = getattr(o, field_name, None) or ''
            row.append(data)
        pt.add_row(row)

    print(encodeutils.safe_encode(pt.get_string()))
if __name__ == '__main__':
    # Script entry point: run the shell and hand its result to sys.exit so
    # the process exit status reflects the outcome. SystemExit is not an
    # Exception subclass, so it passes through the handlers below.
    try:
        sys.exit(Shell().main(sys.argv[1:]))
    except KeyboardInterrupt:
        print("Terminating...")
        sys.exit(1)
    except Exception:
        # Same short traceback on stdout as before, but exit non-zero
        # instead of falling through and reporting success.
        traceback.print_exc(limit=2, file=sys.stdout)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment