Created
July 22, 2015 09:25
-
-
Save komeda-shinji/331e86877425ba61707e to your computer and use it in GitHub Desktop.
aws cli のドライバーを直接呼び出す
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from botocore.compat import OrderedDict | |
from botocore import xform_name | |
from botocore.hooks import HierarchicalEmitter | |
from botocore.paginate import PageIterator | |
import botocore.session | |
import awscli.clidriver | |
from awscli import EnvironmentVariables | |
from awscli.plugin import load_plugins | |
from awscli.clidriver import CLIDriver, ServiceCommand, ServiceOperation, CLIOperationCaller | |
from awscli.formatter import is_response_paginated | |
class MyCLIOperationCaller(CLIOperationCaller):
    """Operation caller whose ``invoke`` returns the response data."""

    def invoke(self, service_name, operation_name, parameters, parsed_globals):
        """Call one service operation and return its response data.

        A client is created from the session using the region, endpoint
        URL and SSL verification setting found on ``parsed_globals``.
        When the operation can be paginated and ``parsed_globals.paginate``
        is truthy, a paginator is used and the full result is built from
        it; otherwise the client method is called directly and its
        response is returned unchanged.
        """
        client = self._session.create_client(
            service_name,
            region_name=parsed_globals.region,
            endpoint_url=parsed_globals.endpoint_url,
            verify=parsed_globals.verify_ssl,
        )
        method_name = xform_name(operation_name)
        use_paginator = (client.can_paginate(method_name)
                         and parsed_globals.paginate)
        if use_paginator:
            response = client.get_paginator(method_name).paginate(**parameters)
        else:
            response = getattr(client, method_name)(**parameters)

        # Plain responses are handed back untouched; paginated ones are
        # collapsed into a single result first.
        if not is_response_paginated(response):
            return response
        return response.build_full_result()
class MyServiceCommand(ServiceCommand):
    """Service command whose operations use ``MyCLIOperationCaller``."""

    def _create_command_table(self):
        """Build the ordered mapping of CLI operation names to commands.

        Every operation of the service model becomes a ``ServiceOperation``
        keyed by its dash-separated name, each with its own
        ``MyCLIOperationCaller``. The ``building-command-table.<name>``
        event is emitted before lineage is added and the table returned.
        """
        model = super(MyServiceCommand, self)._get_service_model()
        operations = OrderedDict()
        for op_name in model.operation_names:
            dashed_name = xform_name(op_name, '-')
            operations[dashed_name] = ServiceOperation(
                name=dashed_name,
                parent_name=self._name,
                session=self.session,
                operation_model=model.operation_model(op_name),
                operation_caller=MyCLIOperationCaller(self.session),
            )

        event_name = 'building-command-table.%s' % self._name
        self.session.emit(
            event_name,
            command_table=operations,
            session=self.session,
            command_object=self,
        )
        self._add_lineage(operations)
        return operations
class MyCLIDriver(CLIDriver):
    """CLI driver that registers ``MyServiceCommand`` for each service."""

    def _build_builtin_commands(self, session):
        """Return an ordered mapping of service name to ``MyServiceCommand``.

        The service names come from ``session.get_available_services()``;
        each command itself is constructed with ``self.session``.
        """
        return OrderedDict(
            (
                name,
                MyServiceCommand(
                    cli_name=name,
                    session=self.session,
                    service_name=name,
                ),
            )
            for name in session.get_available_services()
        )
def create_my_clidriver():
    """Create a ``MyCLIDriver`` backed by a fresh botocore session.

    The session is built with the awscli environment variables and a new
    hierarchical emitter, gets the awscli user agent applied, and has the
    plugins listed under ``plugins`` in its full config loaded against
    that same emitter.
    """
    hooks = HierarchicalEmitter()
    session = botocore.session.Session(EnvironmentVariables, hooks)
    awscli.clidriver._set_user_agent_for_session(session)
    plugin_config = session.full_config.get('plugins', {})
    load_plugins(plugin_config, event_hooks=hooks)
    return MyCLIDriver(session=session)
def json_default(value):
    """Serialize date/datetime values that ``json`` cannot handle itself.

    Intended as the ``default=`` hook for ``json.dump``. Dates and
    datetimes are rendered as ISO 8601 strings; any other unsupported
    type still raises ``TypeError`` so unexpected objects are not
    silently stringified.
    """
    import datetime

    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    raise TypeError(
        'Object of type %s is not JSON serializable' % type(value).__name__)


if __name__ == '__main__':
    import json
    import sys

    driver = create_my_clidriver()

    def describe_instances():
        """Run ``ec2 describe-instances`` through the custom driver."""
        return driver.main(['ec2', 'describe-instances'])

    result = describe_instances()
    # botocore parses timestamp fields (e.g. LaunchTime) into datetime
    # objects, which json.dump rejects unless a default hook is supplied.
    json.dump(result, sys.__stdout__, default=json_default)
    sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment