Skip to content

Instantly share code, notes, and snippets.

@komeda-shinji
Created July 22, 2015 09:25
Show Gist options
  • Save komeda-shinji/331e86877425ba61707e to your computer and use it in GitHub Desktop.
Save komeda-shinji/331e86877425ba61707e to your computer and use it in GitHub Desktop.
aws cli のドライバーを直接呼び出す
#!/usr/bin/env python
from botocore.compat import OrderedDict
from botocore import xform_name
from botocore.hooks import HierarchicalEmitter
from botocore.paginate import PageIterator
import botocore.session
import awscli.clidriver
from awscli import EnvironmentVariables
from awscli.plugin import load_plugins
from awscli.clidriver import CLIDriver, ServiceCommand, ServiceOperation, CLIOperationCaller
from awscli.formatter import is_response_paginated
class MyCLIOperationCaller(CLIOperationCaller):
    """Operation caller whose ``invoke`` returns the response data."""

    def invoke(self, service_name, operation_name, parameters, parsed_globals):
        """Call a service operation and return its response.

        A client for ``service_name`` is created from the session using the
        ``region``, ``endpoint_url`` and ``verify_ssl`` values found on
        ``parsed_globals``.  The operation is run through a paginator when
        the client can paginate it and ``parsed_globals.paginate`` is truthy;
        otherwise the client method is called directly with ``parameters``.

        Returns the merged result (``build_full_result()``) for a paginated
        response, or the response itself when it is not paginated.
        """
        client = self._session.create_client(
            service_name,
            region_name=parsed_globals.region,
            endpoint_url=parsed_globals.endpoint_url,
            verify=parsed_globals.verify_ssl,
        )
        # Client methods use the snake_case form of the operation name.
        method_name = xform_name(operation_name)

        if client.can_paginate(method_name) and parsed_globals.paginate:
            response = client.get_paginator(method_name).paginate(**parameters)
        else:
            response = getattr(client, method_name)(**parameters)

        if not is_response_paginated(response):
            return response
        # Collapse every page into a single result before returning it.
        return response.build_full_result()
class MyServiceCommand(ServiceCommand):
    """Service command whose operations use ``MyCLIOperationCaller``."""

    def _create_command_table(self):
        """Build and return the ordered table of operation commands.

        One ``ServiceOperation`` is registered per operation name of the
        service model, keyed by the dash-separated form of that name.  After
        the table is filled, a ``building-command-table.<name>`` event is
        emitted on the session and ``_add_lineage`` is applied to the table.
        """
        operations = OrderedDict()
        model = super(MyServiceCommand, self)._get_service_model()

        for op_name in model.operation_names:
            dashed_name = xform_name(op_name, '-')
            operations[dashed_name] = ServiceOperation(
                name=dashed_name,
                parent_name=self._name,
                session=self.session,
                operation_model=model.operation_model(op_name),
                operation_caller=MyCLIOperationCaller(self.session),
            )

        event_name = 'building-command-table.%s' % self._name
        self.session.emit(
            event_name,
            command_table=operations,
            session=self.session,
            command_object=self,
        )
        self._add_lineage(operations)
        return operations
class MyCLIDriver(CLIDriver):
    """CLI driver whose built-in commands are ``MyServiceCommand`` objects."""

    def _build_builtin_commands(self, session):
        """Return an ordered mapping of service name to ``MyServiceCommand``.

        The service names come from ``session.get_available_services()``;
        each command itself is constructed with ``self.session``.
        """
        return OrderedDict(
            (
                name,
                MyServiceCommand(
                    cli_name=name,
                    session=self.session,
                    service_name=name,
                ),
            )
            for name in session.get_available_services()
        )
def create_my_clidriver():
    """Create a session, load its plugins, and return a ``MyCLIDriver``.

    The session is built from ``EnvironmentVariables`` with a fresh
    ``HierarchicalEmitter``; the same emitter is passed to ``load_plugins``
    as the event hooks together with the ``plugins`` section of the
    session's full config (an empty dict when that section is absent).
    """
    event_emitter = HierarchicalEmitter()
    cli_session = botocore.session.Session(EnvironmentVariables, event_emitter)
    awscli.clidriver._set_user_agent_for_session(cli_session)

    plugin_config = cli_session.full_config.get('plugins', {})
    load_plugins(plugin_config, event_hooks=event_emitter)

    return MyCLIDriver(session=cli_session)
if __name__ == '__main__':
    import json
    import sys

    driver = create_my_clidriver()

    def describe_instances():
        """Run ``ec2 describe-instances`` through the driver; return its result."""
        cli_args = ['ec2', 'describe-instances']
        return driver.main(cli_args)

    result = describe_instances()
    # sys.__stdout__ is the interpreter's original stdout object, so the JSON
    # is written there even if sys.stdout has been rebound.
    json.dump(result, sys.__stdout__)
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment