Skip to content

Instantly share code, notes, and snippets.

@valegui
Last active August 9, 2024 14:47
Show Gist options
  • Save valegui/959c52dbc2cdce518dd00190448ff4ae to your computer and use it in GitHub Desktop.
Save valegui/959c52dbc2cdce518dd00190448ff4ae to your computer and use it in GitHub Desktop.
Script to test the CLIRAI API
from pathlib import Path
from random import choice
from string import ascii_letters, digits
from typing import Dict
import click
import requests
import yaml
from boto3 import Session
# Alphabet that random_string() draws its characters from.
LETTERSDIGITS = "".join((ascii_letters, digits))

# Input paths sent to the API, keyed by dataset size label.
DATASETS = dict(
    small="input/small/HD100546_SB17_cont_OOframe_large_mask_15aug22ms_p0_p1_p2_ap0.ms.selfcal",
    medium="input/medium/HD100546_SB17_IB17_cont_OOframe_p0_p1_p2.ms.selfcal.selfcal",
    large="input/large/HD100546_SB17_IB17_LB19_cont_OOframe_p0_p1_p2.ms.selfcal",
)
@click.group()
def clirai():
    # Root command group: it does no work itself, the sub-commands and
    # sub-groups registered on it below carry the behaviour.
    return None
def print_response(response):
    """Print the status code and the body text of *response* to stdout.

    Args:
        response: Object exposing ``status_code`` and ``text`` attributes
            (the callers in this file pass ``requests`` responses).
    """
    labelled_attributes = (
        ("Response status:", "status_code"),
        ("Response text:", "text"),
    )
    for label, attribute in labelled_attributes:
        print(label, getattr(response, attribute))
def parse_config_file(config: str) -> Dict:
    """Load the connection settings from *config*, creating the file if needed.

    When the file does not exist, the user is prompted for every setting and
    the answers are written to *config* as YAML. Otherwise the existing file
    is parsed.

    Args:
        config: Path of the YAML configuration file.

    Returns:
        Mapping with the connection settings. A freshly created file holds
        the keys ``user``, ``pw``, ``client_id``, ``api_url`` and
        ``profile_name``.

    Raises:
        click.ClickException: If an existing file does not contain a YAML
            mapping (for example because it is empty).
    """
    filepath = Path(config)
    click.echo(f"Using {config} as configuration file.")

    if filepath.exists():
        with filepath.open(mode="r") as yaml_file:
            click.echo("Reading config file.")
            connection_info = yaml.safe_load(yaml_file)
        # An empty or malformed file would otherwise surface much later as an
        # obscure TypeError/KeyError while building the request.
        if not isinstance(connection_info, dict):
            raise click.ClickException(
                f"Configuration file {config} does not contain a YAML mapping."
            )
        return connection_info

    click.echo("Configuration file does not exist.")
    click.echo("Starting file creation.")
    connection_info = {
        "user": click.prompt("Username", type=str),
        "pw": click.prompt(
            "Password", type=str, hide_input=True, confirmation_prompt=True
        ),
        "client_id": click.prompt("Cognito App Client ID", type=str),
        "api_url": click.prompt("API url", type=str),
        "profile_name": click.prompt(
            "Session profile name (in ~/.aws/config)", type=str
        ),
    }
    # The password is stored in plain text, so create the file with
    # owner-only permissions before anything secret is written to it.
    filepath.touch(mode=0o600)
    with filepath.open(mode="w") as yaml_file:
        yaml.safe_dump(connection_info, yaml_file)
    click.echo("Wrote config file.")
    return connection_info
def get_headers(connection_info: Dict) -> Dict:
    """Authenticate against Cognito and build the headers for API requests.

    Args:
        connection_info: Settings providing ``profile_name``, ``client_id``,
            ``user`` and ``pw``.

    Returns:
        Headers carrying the returned ID token under ``Authorization`` plus a
        JSON ``Content-Type``.
    """
    idp_client = Session(profile_name=connection_info["profile_name"]).client(
        "cognito-idp"
    )
    auth_response = idp_client.initiate_auth(
        ClientId=connection_info["client_id"],
        AuthFlow="USER_PASSWORD_AUTH",
        AuthParameters={
            "USERNAME": connection_info["user"],
            "PASSWORD": connection_info["pw"],
        },
    )
    return {
        "Authorization": auth_response["AuthenticationResult"]["IdToken"],
        "Content-Type": "application/json",
    }
"""
sftp
"""
@clirai.command()
@click.option(
    "--config", default=".config.yml", help="file with configuration to connect to api"
)
def sftp(**kwargs):
    # Authenticated POST to the "sftp" endpoint; the response is printed.
    # (Kept as comments: a docstring would change the --help output.)
    settings = parse_config_file(kwargs["config"])
    auth_headers = get_headers(settings)
    endpoint = settings["api_url"] + "sftp"
    print_response(requests.post(endpoint, headers=auth_headers))
"""
contact
"""
@clirai.group()
@click.option(
    "--config", default=".config.yml", help="file with configuration to connect to api"
)
@click.pass_context
def contact(ctx, **kwargs):
    # Group callback: load the connection settings once and share them with
    # the sub-commands through the click context object.
    settings = parse_config_file(kwargs["config"])
    ctx.ensure_object(dict)
    ctx.obj.update({"CONN_INFO": settings})
@contact.command("unsigned")
@click.argument("name")
@click.argument("email")
@click.argument("subject")
@click.argument("message")
@click.pass_context
def _contact(ctx, name, email, subject, message):
    # Sends the contact form without an Authorization header.
    # NOTE(review): this posts to the same "contact/assist" path as the
    # authenticated `assist` command -- confirm that is the intended endpoint.
    settings = ctx.obj["CONN_INFO"]
    form_fields = ("name", "email", "subject", "message")
    body = dict(zip(form_fields, (name, email, subject, message)))
    response = requests.post(
        settings["api_url"] + "contact/assist",
        headers={"Content-Type": "application/json"},
        json=body,
    )
    print_response(response)
@contact.command()
@click.argument("name")
@click.argument("email")
@click.argument("subject")
@click.argument("message")
@click.pass_context
def assist(ctx, name, email, subject, message):
    # Sends the contact form with the Cognito-authenticated headers.
    click.echo("Assist")
    settings = ctx.obj["CONN_INFO"]
    auth_headers = get_headers(settings)
    form_fields = ("name", "email", "subject", "message")
    body = dict(zip(form_fields, (name, email, subject, message)))
    response = requests.post(
        settings["api_url"] + "contact/assist",
        headers=auth_headers,
        json=body,
    )
    print_response(response)
"""
job
"""
def random_string(n):
    """Return a string of *n* characters drawn at random from LETTERSDIGITS."""
    return "".join(choice(LETTERSDIGITS) for _ in range(n))
@clirai.group()
@click.option(
    "--config", default=".config.yml", help="file with configuration to connect to api"
)
@click.pass_context
def job(ctx, **kwargs):
    # Group callback: load the settings and authenticate once, then hand both
    # to the sub-commands through the click context object.
    settings = parse_config_file(kwargs["config"])
    auth_headers = get_headers(settings)
    ctx.ensure_object(dict)
    ctx.obj.update({"CONN_INFO": settings, "HEADERS": auth_headers})
def _resolve_indices(option_name, raw_indices, choices):
    """Translate the raw index values of one ``run`` option into *choices* entries.

    Raises ``click.BadParameter`` when a value is not an integer or does not
    index into *choices*.
    """
    resolved = []
    for raw in raw_indices:
        try:
            index = int(raw)
        except (TypeError, ValueError):
            raise click.BadParameter(
                f"{raw!r} is not a valid index", param_hint=option_name
            ) from None
        if not 0 <= index < len(choices):
            raise click.BadParameter(
                f"index {index} is out of range (0-{len(choices) - 1})",
                param_hint=option_name,
            )
        resolved.append(choices[index])
    return resolved


@job.command()
@click.option(
    "-e",
    "--environments",
    default=[0, 1, 2],
    type=list,
    help="index of environments to tests",
)
@click.option(
    "-d", "--datasets", default=[0, 1, 2], type=list, help="index of datasets to tests"
)
@click.option(
    "-g", "--gridding", default=[0, 1], type=list, help="index of griddings to tests"
)
@click.option(
    "-s",
    "--sizes",
    default=[3, 4, 5, 6],
    type=list,
    help="index of image sizes to tests",
)
@click.pass_context
def run(ctx, **kwargs):
    """Launch one job per selected environment/dataset/size/gridding combination.
    \f
    Args:
        ctx: Click context; ``ctx.obj`` must hold ``CONN_INFO`` and ``HEADERS``.
        **kwargs: ``environments``, ``datasets``, ``gridding`` and ``sizes``,
            each an iterable of indices into the corresponding list below.

    Raises:
        click.BadParameter: If any index is not an integer or is out of range.
    """
    connection_info = ctx.obj["CONN_INFO"]
    headers = ctx.obj["HEADERS"]
    environments = ["low", "medium", "high"]
    datasets = ["small", "medium", "large"]
    gridding = [0, 1]
    sizes = [128, 256, 512, 1024, 2048, 4096, 8192]

    # Resolve every selection before the first request so that a bad index
    # cannot abort the run after part of the jobs has already been submitted.
    # NOTE(review): with type=list click presumably splits an option value
    # into single characters (e.g. "-e 02" -> indices 0 and 2) -- confirm.
    chosen_environments = _resolve_indices(
        "--environments", kwargs["environments"], environments
    )
    chosen_datasets = _resolve_indices("--datasets", kwargs["datasets"], datasets)
    chosen_sizes = _resolve_indices("--sizes", kwargs["sizes"], sizes)
    chosen_gridding = _resolve_indices("--gridding", kwargs["gridding"], gridding)

    jobs_launched = 0
    jobs_failed = 0
    for environment in chosen_environments:
        for dataset in chosen_datasets:
            dataset_base = DATASETS[dataset]
            for image_size in chosen_sizes:
                output_name = dataset_base.replace(".", "_") + f"__{image_size}.fits"
                for use_gridding in chosen_gridding:
                    payload = {
                        "v": "1",
                        "P": "1",
                        "environment": environment,
                        "i": dataset_base,
                        "m": output_name,
                        "gridding": use_gridding,
                        "job_alias": (
                            f"env-{environment.capitalize()}_"
                            f"data-{dataset.capitalize()}_"
                            f"img-{image_size}_"
                            f"{'gridding_' if use_gridding else ''}{random_string(3)}"
                        ),
                    }
                    r = requests.post(
                        connection_info["api_url"] + "job/run",
                        headers=headers,
                        json=payload,
                    )
                    print_response(r)
                    # Only requests the API accepted count as launched jobs.
                    if r.ok:
                        jobs_launched += 1
                    else:
                        jobs_failed += 1
    click.echo(f"{jobs_launched} Jobs Launched.")
    if jobs_failed:
        click.echo(f"{jobs_failed} job requests failed.")
@job.command()
@click.option("-u", "--user-id", "user", type=str, help="user id")
@click.option(
    "-s",
    "--starting-time",
    "stime",
    type=int,
    help="unix time from which recovered jobs start",
)
@click.option(
    "-e",
    "--end-time",
    "etime",
    type=int,
    help="unix time until which recovered jobs start",
)
@click.option("-l", "--limit", "limit", type=int, help="number of jobs to recover")
@click.option(
    "-n",
    "--next-token",
    "nexttoken",
    type=str,
    help="token used for pagination",
)
@click.pass_context
def list_user(ctx, **kwargs):
    # GET "job/list_user"; only the options that were actually supplied are
    # forwarded as query parameters.
    settings = ctx.obj["CONN_INFO"]
    auth_headers = ctx.obj["HEADERS"]
    # (click option name, query parameter name), in the order they are sent.
    option_to_param = (
        ("user", "user_id"),
        ("stime", "starting_time"),
        ("etime", "end_time"),
        ("limit", "limit"),
        ("nexttoken", "nextToken"),
    )
    query = {
        param: kwargs[option]
        for option, param in option_to_param
        if kwargs[option] is not None
    }
    response = requests.get(
        settings["api_url"] + "job/list_user", headers=auth_headers, params=query
    )
    print_response(response)
# Script entry point: hand control to the root click group.
if __name__ == "__main__":
    clirai()
boto3==1.34.23
click==8.1.7
PyYAML==6.0.1
requests==2.31.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment