Last active
August 8, 2018 09:05
-
-
Save robinsax/e8334b8332383eefb219623704a8d97b to your computer and use it in GitHub Desktop.
Lightweight, unoptimized wrap of googleapiclient basic instance control
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A small script that wraps the basic functionality of the
# (in my opinion somewhat confusing) googleapiclient features
# for instance control.
import time | |
from googleapiclient import discovery | |
# Project and zone that are unpacked as keyword arguments into every
# Compute Engine API call made in this file.
# NOTE(review): the values look like placeholders -- replace them with a
# real project ID and zone before use.
cloud_config = {
    'project': 'My-Project',
    'zone': 'My-Zone',
}
class GCloudInstance:
    """Minimal wrapper around a single Compute Engine instance.

    The instance is looked up by name in the project and zone given by the
    module-level ``cloud_config``. The wrapper exposes the instance's
    status, its external IP address and blocking ``start``/``stop`` calls.
    """

    # Seconds to wait between two polls of a pending zone operation.
    POLL_INTERVAL = 3

    def __init__(self, name):
        """Create the Compute Engine v1 client and load the instance data.

        Args:
            name: Name of the instance to control.

        Raises:
            ValueError: If no instance with that name is listed for the
                configured project and zone.
        """
        self.name = name
        self.compute = discovery.build('compute', 'v1')
        self._fetch_data()

    def _fetch_data(self):
        """Refresh ``self.instance_data`` from the ``instances.list`` call.

        ``self.instance_data`` is reset to ``None`` first, so it stays
        ``None`` when the lookup fails.

        Raises:
            ValueError: If no listed instance is named ``self.name``.
        """
        self.instance_data = None
        instances = self.compute.instances()
        request = instances.list(**cloud_config)
        # Follow the pagination so that instances beyond the first page of
        # results are found as well; list_next returns None after the last
        # page.
        while request is not None:
            response = request.execute()
            # The 'items' key may be absent when there is nothing to list.
            for candidate in response.get('items', []):
                if candidate['name'] == self.name:
                    self.instance_data = candidate
                    return
            request = instances.list_next(
                previous_request=request, previous_response=response)
        raise ValueError('No instance %s' % self.name)

    @property
    def status_label(self):
        """The raw ``status`` string of the last fetched instance data."""
        return self.instance_data['status']

    @property
    def is_available(self):
        """True when the instance is stopped (status ``TERMINATED``)."""
        return self.status_label == 'TERMINATED'

    @property
    def is_running(self):
        """True when the instance status is ``RUNNING``."""
        return self.status_label == 'RUNNING'

    @property
    def is_transitional(self):
        """True when the instance is neither stopped nor running."""
        return not (self.is_available or self.is_running)

    @property
    def ip_address(self):
        """The first external (NAT) IP of the instance, or ``None``.

        Access configs are nested inside each entry of
        ``networkInterfaces`` in the instance resource, and ``natIP`` is
        only present while an external address is assigned.
        """
        for interface in self.instance_data.get('networkInterfaces', []):
            for access_conf in interface.get('accessConfigs', []):
                if 'natIP' in access_conf:
                    return access_conf['natIP']
        return None

    def _instances_call(self, op_name):
        """Run an ``instances`` API method and block until it has finished.

        Args:
            op_name: Name of the method on the ``instances`` resource to
                call for this instance, e.g. ``'start'`` or ``'stop'``.

        Raises:
            RuntimeError: If the finished operation reports an error.
            ValueError: If the instance can no longer be found when its
                data is refreshed afterwards.
        """
        method = getattr(self.compute.instances(), op_name)
        operation = method(instance=self.name, **cloud_config).execute()
        while True:
            status = self.compute.zoneOperations().get(
                operation=operation['name'], **cloud_config).execute()
            if status['status'] == 'DONE':
                break
            # Pause between polls instead of hammering the API.
            time.sleep(self.POLL_INTERVAL)
        # Reload the instance so status and IP reflect the finished
        # operation.
        self._fetch_data()
        # A finished operation can still have failed; surface that instead
        # of silently continuing.
        if 'error' in status:
            raise RuntimeError('Operation %s on instance %s failed: %s' % (
                op_name, self.name, status['error']))

    def start(self):
        """Start the instance and wait for the operation to complete."""
        self._instances_call('start')

    def stop(self):
        """Stop the instance and wait for the operation to complete."""
        self._instances_call('stop')
# Example usage: start a stopped instance, report its address, then stop
# it again.
import sys

if __name__ == '__main__':
    instance = GCloudInstance('my-instance')
    if instance.is_available:
        instance.start()
        print('Instance available at %s' % instance.ip_address)
        instance.stop()
    else:
        # Anything other than a stopped instance is refused.
        print('The instance is already running')
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment