Last active
December 4, 2024 07:44
-
-
Save simeneide/a24179c5ec75848fc927f44f7c80f00e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from datacrunch import DataCrunchClient | |
import time | |
import re | |
import toml | |
import argparse | |
import inquirer | |
# ---------------------------------------------------------------------------
# Secrets and static configuration
# ---------------------------------------------------------------------------
# NOTE(review): the '[email protected]' path segment looks like a redacted
# user name left behind by a web scraper -- confirm the real home directory.
CONFIG_PATH = '/Users/[email protected]/configs/'

# API credentials are read from the [datacrunch] table of secrets.toml.
secrets = toml.load(os.path.join(CONFIG_PATH, 'secrets.toml'))
_datacrunch_secrets = secrets['datacrunch']
CLIENT_SECRET = _datacrunch_secrets['CLIENT_SECRET']
CLIENT_ID = _datacrunch_secrets['CLIENT_ID']

# Local SSH files that are rewritten whenever an instance gets a new IP.
SSH_CONFIG_FILE = '/Users/[email protected]/.ssh/config'
KNOWN_HOSTS_FILE = '/Users/[email protected]/.ssh/known_hosts'

# Instance types listed first in the interactive instance-type picker.
preferred_instance_types = [
    '1A100.22V',
    '2A100.44V',
    '1V100.6V',
    '1H100.80S.30V',
    '8A100.176V',
    '8V100.48V',
]

# One row per machine: (key, disk volume id, SSH host alias, workspace file).
_MACHINE_SPECS = (
    ('le-simendisk-fin1', '7745835c-d6aa-4d85-ae36-70c41794eeaa',
     'datacrunch-fin1', 'vscode-datacrunch-fin1.code-workspace'),
    ('le-simendisk-fin2', '1cba546c-15b6-4167-827e-5facc03a283c',
     'datacrunch-fin2', 'vscode-datacrunch-fin2.code-workspace'),
    ('simen-ice', '9ab876ed-410e-439d-921c-fdc5ba985f4f',
     'datacrunch-ice', 'vscode-datacrunch-ice-workspace.code-workspace'),
    ('le-disk', '879ff633-b377-496e-a6d5-2de681beee85',
     'datacrunch-le-disk', 'vscode-datacrunch-ledisk.code-workspace'),
)

# Maps the user-facing disk key to its volume id, SSH host alias and the
# VSCode workspace file (resolved inside CONFIG_PATH).
MACHINES = {
    machine_key: {
        'disk_id': disk_id,
        'host_name': ssh_alias,
        'workspace_file': os.path.join(CONFIG_PATH, workspace_name),
    }
    for machine_key, disk_id, ssh_alias, workspace_name in _MACHINE_SPECS
}
def find_instance_by_disk(datacrunch, volume_id):
    """Return the first instance whose OS volume is ``volume_id``.

    Args:
        datacrunch: Client object exposing ``instances.get()``.
        volume_id: Volume id compared against each instance's
            ``os_volume_id``.

    Returns:
        The first matching instance, or ``None`` when no instance matches.
    """
    matching = (
        candidate
        for candidate in datacrunch.instances.get()
        if candidate.os_volume_id == volume_id
    )
    return next(matching, None)
def update_ssh_config_to_new_ip(instance, host_name):
    """Point the SSH config entry for ``host_name`` at ``instance.ip``.

    The ``HostName`` value inside the ``Host <host_name>`` block of
    ``SSH_CONFIG_FILE`` is replaced with the instance's IP. Only that block
    is searched: the scan stops at the next ``Host``/``Match`` line so a
    block without a ``HostName`` line never causes another host's entry to
    be rewritten. Afterwards any stale key for the IP is removed from
    ``KNOWN_HOSTS_FILE`` and the host's current keys are appended to it.

    Args:
        instance: Object with an ``ip`` attribute.
        host_name: The alias used on the ``Host`` line of the SSH config.

    Returns:
        ``True`` if a ``HostName`` line was rewritten, ``False`` if no
        matching entry was found (the config file is then left untouched).
    """
    # Local import: the file's top-level import block does not provide it.
    import subprocess

    new_ip = str(instance.ip)

    with open(SSH_CONFIG_FILE, 'r') as file:
        lines = file.readlines()

    updated = False
    inside_host_block = False
    for index, line in enumerate(lines):
        # ssh_config accepts both "Keyword value" and "Keyword=value".
        tokens = line.replace('=', ' ').split()
        if not tokens or tokens[0].startswith('#'):
            continue
        keyword = tokens[0].lower()  # ssh_config keywords are case-insensitive
        if keyword in ('host', 'match'):
            if inside_host_block:
                # Left the target block without finding a HostName line.
                break
            patterns = [token.strip('"') for token in tokens[1:]]
            inside_host_block = keyword == 'host' and host_name in patterns
        elif inside_host_block and keyword == 'hostname':
            # A callable replacement keeps new_ip free of backslash-escape
            # interpretation by re.sub.
            lines[index] = re.sub(
                r'^(\s*hostname\s*=?\s*)\S+',
                lambda match: match.group(1) + new_ip,
                line,
                count=1,
                flags=re.IGNORECASE,
            )
            updated = True
            break

    if updated:
        with open(SSH_CONFIG_FILE, 'w') as file:
            file.writelines(lines)
    else:
        print(f"Warning: no HostName entry for host '{host_name}' found in {SSH_CONFIG_FILE}")

    # Refreshing known_hosts is best-effort: a missing ssh tool or an
    # unwritable file must not abort the caller.
    try:
        # Remove any old host key for this IP from the same file we append to.
        subprocess.run(['ssh-keygen', '-f', KNOWN_HOSTS_FILE, '-R', new_ip], check=False)
        # Append the host's current keys (hashed) to known_hosts.
        with open(KNOWN_HOSTS_FILE, 'a') as known_hosts:
            subprocess.run(['ssh-keyscan', '-H', new_ip], stdout=known_hosts, check=False)
    except OSError as error:
        print(f"Warning: could not refresh {KNOWN_HOSTS_FILE}: {error}")

    return updated
def _str_to_bool(value):
    """Convert a yes/no style command-line string to a bool (argparse ``type``)."""
    normalized = str(value).strip().lower()
    if normalized in ('yes', 'y', 'true', 't', '1', 'on'):
        return True
    if normalized in ('no', 'n', 'false', 'f', '0', 'off'):
        return False
    raise argparse.ArgumentTypeError(f"expected yes/no, got {value!r}")


def _prompt(questions):
    """Run an inquirer prompt; exit the script if the user cancels it."""
    answers = inquirer.prompt(questions)
    if answers is None:
        # inquirer returns None when the prompt is interrupted.
        raise SystemExit("Aborted.")
    return answers


def _choose_instance_type(datacrunch, disk_location, is_spot):
    """Let the user pick an instance type available at ``disk_location``."""
    available_instances = next(
        (
            entry['availabilities']
            for entry in datacrunch.instances.get_availabilities(is_spot=is_spot)
            if entry['location_code'] == disk_location
        ),
        [],
    )
    if not available_instances:
        raise SystemExit(
            f"No instance types are currently available in {disk_location} (spot: {is_spot})."
        )

    preferred_instances = [t for t in available_instances if t in preferred_instance_types]
    other_instances = [t for t in available_instances if t not in preferred_instance_types]
    instance_descriptions = {
        instance_type.instance_type: instance_type.gpu['description']
        for instance_type in datacrunch.instance_types.get()
    }

    def as_choice(instance_type):
        # (label, value) pair: the answer is the instance type itself, so two
        # types sharing one description can no longer be confused. Types
        # without a known description are shown under their own name.
        return (instance_descriptions.get(instance_type, instance_type), instance_type)

    separator = '---'
    choices = [as_choice(t) for t in preferred_instances]
    if preferred_instances and other_instances:
        # Visual divider between preferred and other instance types.
        choices.append(separator)
    choices += [as_choice(t) for t in other_instances]

    while True:
        answers = _prompt([
            inquirer.List(
                'instance_type',
                message="Select an available instance type:",
                choices=choices,
            ),
        ])
        selected = answers['instance_type']
        if selected != separator:
            print(instance_descriptions.get(selected, selected))
            return selected
        print("You can't choose the separator as instance type. Please pick again.")


def main():
    """Start or stop the DataCrunch instance attached to one of MACHINES' disks.

    ``start`` creates an instance for the selected disk (or reuses a running
    one), updates the SSH config with its IP and optionally opens the VSCode
    workspace. ``stop`` deletes the instance that uses the selected disk.
    """
    # Local import: the file's top-level import block does not provide it.
    import subprocess

    # Set up argument parsing
    parser = argparse.ArgumentParser()
    parser.add_argument('command', choices=['start', 'stop'], default='start', nargs='?')
    parser.add_argument("--type", help="Instance type")
    parser.add_argument('--disk', choices=list(MACHINES.keys()), help="Specifies which disk to use")
    # Parsed into a real bool so that "--spot no" is not treated as truthy.
    parser.add_argument("--spot", type=_str_to_bool,
                        help="Whether to use spot instance or not (yes/no)")
    args = parser.parse_args()
    print(f'Running command: {args.command}')

    datacrunch = DataCrunchClient(CLIENT_ID, CLIENT_SECRET)

    # Resolve which disk to operate on.
    if not MACHINES:
        raise SystemExit("No machines are configured.")
    if args.disk:
        selected_disk_key = args.disk
    elif len(MACHINES) == 1:
        selected_disk_key = next(iter(MACHINES))
    else:
        # If no disk is specified, use inquirer to prompt the user
        answers = _prompt([
            inquirer.List(
                'disk',
                message="Select a disk to use:",
                choices=list(MACHINES.keys()),
            ),
        ])
        selected_disk_key = answers['disk']

    selected_machine = MACHINES[selected_disk_key]
    selected_disk = selected_machine['disk_id']
    host_name = selected_machine['host_name']
    workspace_file = selected_machine['workspace_file']
    disk_location = datacrunch.volumes.get_by_id(selected_disk).location
    print(f'Selected disk: {selected_disk_key}, {disk_location}')

    if args.command == 'stop':
        print("Stopping instance..")
        instance = find_instance_by_disk(datacrunch, selected_disk)
        if instance is None:
            # Nothing was stopped, so do not claim otherwise.
            print(f'Warning: No instance with {selected_disk_key} found')
            return
        datacrunch.instances.action(id_list=instance.id,
                                    action=datacrunch.constants.instance_actions.DELETE,
                                    volume_ids=[])
        print(f"Instance stopped. Disk: {selected_disk_key}")
        return

    # argparse restricts `command` to 'start'/'stop', so this is 'start'.
    instance = find_instance_by_disk(datacrunch, selected_disk)
    if instance is not None:
        # If disk is already running, update the ssh config:
        print(f"An instance with {selected_disk_key} is already running.")
        print(f"Instance ID: {instance.id}, IP: {instance.ip}, Status: {instance.status}")
        if update_ssh_config_to_new_ip(instance, host_name):
            print("SSH config updated!")
        print(f"Done! You already have a {instance.instance_type} instance with {selected_disk_key} running. spot: {instance.is_spot}")
        workspace_answer = {'open_workspace': True}
    else:
        # If disk is not running, start a new instance.
        ### SPOT INSTANCE
        is_spot = args.spot
        if is_spot is None:
            spot_answer = _prompt([
                inquirer.List(
                    'spot',
                    message="Do you want to use a spot instance?",
                    choices=['no', 'yes'],
                ),
            ])
            is_spot = (spot_answer['spot'] == 'yes')

        ### SELECT INSTANCE TYPE
        if args.type is None:
            instance_type = _choose_instance_type(datacrunch, disk_location, is_spot)
        else:
            instance_type = args.type

        print("Starting a new instance...")
        # Use the resolved disk key: args.disk is None when the disk was
        # picked interactively.
        instance = datacrunch.instances.create(
            instance_type=instance_type,
            image=selected_disk,
            hostname=f'{selected_disk_key}-instance',
            is_spot=is_spot,
            description=f'GPU machine with {selected_disk_key}'
        )

        ### OPEN WORKSPACE CHOICE
        workspace_answer = _prompt([
            inquirer.Confirm(
                'open_workspace',
                message='Do you want to open the VSCode working environment when it has spun up?',
                default=True,
            ),
        ])

        # Poll until the instance for this disk has left 'provisioning'.
        while True:
            instance = find_instance_by_disk(datacrunch, selected_disk)
            if instance and instance.status != 'provisioning':
                break
            time.sleep(5)
            print(f"Status: {instance.status if instance else 'Unknown'} - Waiting for provisioning...")

        print(f"Instance created with IP: {instance.ip}!")
        if update_ssh_config_to_new_ip(instance, host_name):
            print("SSH config updated!")
        print(f"Done! You have a {instance_type} instance with {selected_disk_key} running. Spot status: {instance.is_spot}")

    if workspace_answer['open_workspace']:
        # Argument list instead of a shell string: safe for paths with spaces.
        try:
            subprocess.run(['code', workspace_file], check=False)
        except OSError as error:
            print(f"Warning: could not launch VSCode: {error}")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment