Created
May 5, 2024 09:51
-
-
Save max-arnold/d059a74bf48fb9be9741f60ea79c5208 to your computer and use it in GitHub Desktop.
YMQ TUI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
1. Install the following dependencies into a virtualenv

    pip install textual==0.57.0 textual-dev==1.5.1 requests==2.31.0 requests-aws4auth==1.2.3 yandexcloud==0.267.0

2. Create an env.json file

    {
        "YC_REGION": "ru-central1",
        "YC_FOLDER_ID": "FFF",
        "YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS": "yc-service-account-key.json",
        "YMQ_ACCESS_KEY": "XXX",
        "YMQ_SECRET_KEY": "YYY"
    }

3. Run

    python ymq_tui.py --env env.json
""" | |
import argparse | |
import json | |
import os | |
from xml.etree import ElementTree | |
from textual import on, work | |
from textual.app import App | |
from textual.app import ComposeResult | |
from textual.containers import Grid, Container | |
from textual.message import Message | |
from textual.screen import ModalScreen | |
from textual.widgets import Button, DataTable, Footer, Label | |
import requests | |
from requests_aws4auth import AWS4Auth | |
import yandexcloud | |
from yandex.cloud.serverless.triggers.v1.trigger_pb2 import _TRIGGER_STATUS | |
from yandex.cloud.serverless.triggers.v1.trigger_service_pb2_grpc import TriggerServiceStub | |
from yandex.cloud.serverless.triggers.v1.trigger_service_pb2 import ListTriggersRequest, PauseTriggerRequest, ResumeTriggerRequest | |
# URL that the queue_* helpers in this file POST their requests to.
YMQ_ENDPOINT_URL = "https://message-queue.api.cloud.yandex.net"
# Module-wide requests.Session; stays None until get_session() creates it.
session = None
def get_session():
    """Return the module-wide ``requests.Session``, creating it on first call."""
    global session
    if session is None:
        session = requests.Session()
    return session
def export_variables(env):
    """Copy the key/value pairs of a JSON file into ``os.environ``.

    Args:
        env: Path to a JSON file whose top-level value is an object.

    Every key and value is converted with ``str()`` before being exported,
    so non-string JSON values (numbers, booleans) end up as their Python
    string representation.
    """
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding when decoding the file.
    with open(env, encoding="utf-8") as json_file:
        env_vars = json.load(json_file)
    for env_name, env_value in env_vars.items():
        os.environ[str(env_name)] = str(env_value)
def check_response(res):
    """Raise ``requests.HTTPError`` unless *res* has HTTP status 200.

    Args:
        res: Response object exposing ``status_code`` and ``content``.

    Raises:
        requests.HTTPError: For any status other than 200. The message is
            ``"<status>: <detail>"``, where the detail is the text of the
            ``Error/Message`` element of the XML body when present, and a
            snippet of the raw body otherwise.
    """
    if res.status_code == 200:
        return
    body = res.content.decode("utf-8", errors="replace")
    try:
        msg = ElementTree.fromstring(body).findtext("Error/Message")
    except ElementTree.ParseError:
        # Body is not XML at all (e.g. an HTML or plain-text error page).
        msg = None
    if not msg:
        # Keep the exception readable even when the body has no usable
        # <Error><Message> element; cap the snippet so it stays short.
        msg = body.strip()[:200] or "no error details in response"
    raise requests.HTTPError(f"{res.status_code}: {msg}", response=res)
def queue_list():
    """Return the text of every ``QueueUrl`` element from a ``ListQueues`` call.

    Credentials and region are read from the ``YMQ_ACCESS_KEY``,
    ``YMQ_SECRET_KEY`` and ``YC_REGION`` environment variables. The response
    is validated with ``check_response`` before it is parsed.
    """
    credentials = AWS4Auth(
        os.environ["YMQ_ACCESS_KEY"],
        os.environ["YMQ_SECRET_KEY"],
        os.environ["YC_REGION"],
        "sqs",
    )
    response = get_session().post(
        YMQ_ENDPOINT_URL,
        data={"Action": "ListQueues"},
        auth=credentials,
        timeout=5,
    )
    check_response(response)
    root = ElementTree.fromstring(response.content.decode("utf-8"))
    urls = []
    for node in root.find("ListQueuesResult").findall("QueueUrl"):
        urls.append(node.text)
    return urls
def queue_purge(queue):
    """Send a ``PurgeQueue`` request for the queue URL *queue*.

    Credentials and region are read from the ``YMQ_ACCESS_KEY``,
    ``YMQ_SECRET_KEY`` and ``YC_REGION`` environment variables. Returns
    nothing; ``check_response`` raises if the status is not 200.
    """
    credentials = AWS4Auth(
        os.environ["YMQ_ACCESS_KEY"],
        os.environ["YMQ_SECRET_KEY"],
        os.environ["YC_REGION"],
        "sqs",
    )
    payload = {"Action": "PurgeQueue", "QueueUrl": queue}
    response = get_session().post(
        YMQ_ENDPOINT_URL, data=payload, auth=credentials, timeout=5
    )
    check_response(response)
def queue_get_attributes(queue):
    """Fetch the three approximate message counters of the queue URL *queue*.

    Issues a ``GetQueueAttributes`` request and returns a dict that maps each
    ``Attribute`` element's ``Name`` text to its ``Value`` text (values are
    returned as strings, exactly as they appear in the XML).
    """
    credentials = AWS4Auth(
        os.environ["YMQ_ACCESS_KEY"],
        os.environ["YMQ_SECRET_KEY"],
        os.environ["YC_REGION"],
        "sqs",
    )
    payload = {
        "Action": "GetQueueAttributes",
        "AttributeName.1": "ApproximateNumberOfMessages",
        "AttributeName.2": "ApproximateNumberOfMessagesDelayed",
        "AttributeName.3": "ApproximateNumberOfMessagesNotVisible",
        "QueueUrl": queue,
    }
    response = get_session().post(
        YMQ_ENDPOINT_URL, data=payload, auth=credentials, timeout=5
    )
    check_response(response)
    root = ElementTree.fromstring(response.content.decode("utf-8"))
    result = {}
    for node in root.find("GetQueueAttributesResult").findall("Attribute"):
        result[node.find("Name").text] = node.find("Value").text
    return result
class ConfirmModal(ModalScreen):
    """Modal dialog that shows a question with Cancel and Confirm buttons.

    The screen is dismissed with ``True`` when the button whose id is
    ``"confirm"`` is pressed and with ``False`` for any other button.
    The escape key is bound to ``app.pop_screen``.
    """

    BINDINGS = [("escape", "app.pop_screen", "Cancel")]

    DEFAULT_CSS = """
    ConfirmModal {
        align: center middle;
    }
    #dialog {
        grid-size: 2;
        grid-gutter: 1 2;
        grid-rows: 1fr 3;
        padding: 0 1;
        width: 60;
        height: 11;
        border: thick $background 80%;
        background: $surface;
    }
    #question {
        column-span: 2;
        height: 1fr;
        width: 1fr;
        content-align: center middle;
    }
    Button {
        width: 100%;
    }
    Container {
        align: center middle;
    }
    """

    def __init__(self, question, name=None, id=None, classes=None):
        """Store the text to display and forward the rest to the base screen."""
        self.question = question
        super().__init__(name=name, id=id, classes=classes)

    def compose(self):
        """Yield a grid holding the question label and the two buttons."""
        prompt = Label(self.question, id="question")
        cancel_cell = Container(Button("Cancel", id="cancel"))
        confirm_cell = Container(Button("Confirm", variant="primary", id="confirm"))
        yield Grid(prompt, cancel_cell, confirm_cell, id="dialog")

    def on_button_pressed(self, event):
        """Dismiss with True only when the pressed button is "confirm"."""
        self.dismiss(event.button.id == "confirm")
class QueueManagerApp(App):
    """Terminal UI listing message queues with their counters and trigger state.

    The table shows one row per queue URL returned by ``queue_list``. Key
    bindings allow refreshing the table, pausing/resuming the trigger
    attached to the selected queue, purging the selected queue and saving a
    screenshot.
    """

    BINDINGS = [
        ("escape", "quit()", "Quit"),
        ("ctrl+r", "refresh", "Refresh"),
        ("e", "enable_trigger", "Enable"),
        ("d", "disable_trigger", "Disable"),
        ("p", "purge_queue", "Purge"),
        ("f12", "take_screenshot()", "Screenshot"),
    ]

    # Optional raw-queue-name -> display-name overrides; empty by default.
    QMAP = {}

    # Markup colour per trigger status; statuses not listed are shown plain.
    TRIGGER_STATUS_COLORS = {"PAUSED": "red", "ACTIVE": "green"}

    class ReturnData(Message):
        """Message that hands the queue data collected by the worker to the app."""

        def __init__(self, data):
            self.data = data
            super().__init__()

    def __init__(self):
        """Parse ``--env``, export its variables and build the trigger client."""
        super().__init__()
        parser = argparse.ArgumentParser(description="QueueManager")
        parser.add_argument("--env", required=True, help="Select env")
        options = parser.parse_args()
        export_variables(options.env)
        key_path = os.environ["YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"]
        # The key file is JSON, so decode it as UTF-8 regardless of locale.
        with open(key_path, encoding="utf-8") as json_file:
            sa_key = json.load(json_file)
        self.yc_sdk = yandexcloud.SDK(service_account_key=sa_key)
        self.yc_triggers = self.yc_sdk.client(TriggerServiceStub)
        # Queue URL -> {"name", "type", "attrs", "trigger"}; filled by update_table.
        self.queues = {}

    def compose(self):
        """Yield the queue table (row cursor) and the footer."""
        yield DataTable(cursor_type="row")
        yield Footer()

    def on_mount(self):
        """Create the table columns and start the first data load."""
        table = self.query_one(DataTable)
        table.add_columns("Name", "Type", "Messages", "Delayed", "Invisible", "Trigger")
        self.action_refresh()

    def action_refresh(self):
        """Start the background worker that reloads queue data."""
        self.get_queues()

    @work(exclusive=True, thread=True)
    def get_queues(self):
        """Collect triggers and queue attributes, then post a ``ReturnData``.

        Declared as a thread worker via ``@work(thread=True)``, so the
        blocking gRPC and HTTP calls happen outside the handler that
        started the refresh.
        """
        tlist = self.yc_triggers.List(ListTriggersRequest(folder_id=os.environ["YC_FOLDER_ID"]))
        triggers = {}
        for trigger in tlist.triggers:
            # The queue name is whatever follows the last ":" of queue_id.
            qname = trigger.rule.message_queue.queue_id.rsplit(":", 1)[-1]
            triggers[qname] = {
                "id": trigger.id,
                "status": _TRIGGER_STATUS.values_by_number[trigger.status].name,
            }
        queues = {}
        for queue in queue_list():
            # The queue name is the last path segment of the queue URL.
            name = queue.rsplit("/", 1)[-1]
            queues[queue] = {
                "name": self.QMAP.get(name, name),
                "type": "FIFO" if name.endswith(".fifo") else "Standard",
                "attrs": queue_get_attributes(queue),
                "trigger": triggers.get(name, {"id": None, "status": "N/A"}),
            }
        self.post_message(self.ReturnData(queues))

    def _format_trigger_status(self, status):
        """Wrap *status* in colour markup when a colour is configured for it."""
        color = self.TRIGGER_STATUS_COLORS.get(status)
        if color is None:
            return status
        return f"[{color}]{status}[/{color}]"

    @on(ReturnData)
    async def update_table(self, return_data):
        """Rebuild the table from *return_data*, keeping the cursor position."""
        table = self.query_one(DataTable)
        coordinate = table.cursor_coordinate
        table.clear()
        self.queues = return_data.data
        # Rows are ordered by display name and keyed by queue URL.
        for queue_url, info in sorted(self.queues.items(), key=lambda item: item[1]["name"]):
            attrs = info["attrs"]
            table.add_row(
                info["name"],
                info["type"],
                attrs["ApproximateNumberOfMessages"],
                attrs["ApproximateNumberOfMessagesDelayed"],
                attrs["ApproximateNumberOfMessagesNotVisible"],
                self._format_trigger_status(info["trigger"]["status"]),
                key=queue_url,
            )
        table.cursor_coordinate = coordinate
        table.loading = False
        table.focus()

    def _selected_queue_key(self):
        """Return the row key (queue URL) under the cursor, or None if there is none."""
        table = self.query_one(DataTable)
        if not table.is_valid_coordinate(table.cursor_coordinate):
            return None
        return table.coordinate_to_cell_key(table.cursor_coordinate).row_key.value

    def _change_trigger_state(self, rpc, request_cls, title):
        """Call *rpc* for the selected queue's trigger, notify, and refresh.

        Does nothing when no row is selected or the queue has no trigger.
        """
        row_key = self._selected_queue_key()
        if row_key is None:
            return
        trigger_id = self.queues[row_key]["trigger"]["id"]
        if trigger_id is None:
            return
        rpc(request_cls(trigger_id=trigger_id))
        self.notify(self.queues[row_key]["name"], title=title)
        self.action_refresh()

    def action_purge_queue(self):
        """Ask for confirmation and, if confirmed, purge the selected queue."""
        row_key = self._selected_queue_key()
        if row_key is None:
            return
        queue_name = self.queues[row_key]["name"]

        def check_confirm(confirm):
            if not confirm:
                return
            try:
                queue_purge(row_key)
            except requests.RequestException as err:
                # Report HTTP errors and timeouts in the UI instead of
                # letting the exception escape from the modal callback.
                self.notify(str(err), title="Purge failed", severity="error")
                return
            self.notify(queue_name, title="Purged")
            self.action_refresh()

        self.app.push_screen(
            ConfirmModal(f"Purge queue {queue_name}?"),
            check_confirm,
        )

    def action_enable_trigger(self):
        """Resume the trigger attached to the selected queue."""
        self._change_trigger_state(self.yc_triggers.Resume, ResumeTriggerRequest, "Enabled")

    def action_disable_trigger(self):
        """Pause the trigger attached to the selected queue."""
        self._change_trigger_state(self.yc_triggers.Pause, PauseTriggerRequest, "Disabled")

    def action_take_screenshot(self):
        """Save a screenshot and show the resulting file name."""
        filename = self.save_screenshot()
        self.notify(f"Saved in {filename}", title="Screenshot saved")
# The app object is created at module level, i.e. also when this file is
# imported; QueueManagerApp.__init__ parses the command line, so the
# required --env option must be present in either case.
app = QueueManagerApp()

# Start the UI only when the file is executed as a script.
if __name__ == "__main__":
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment