Skip to content

Instantly share code, notes, and snippets.

@max-arnold
Created May 5, 2024 09:51
Show Gist options
  • Save max-arnold/d059a74bf48fb9be9741f60ea79c5208 to your computer and use it in GitHub Desktop.
Save max-arnold/d059a74bf48fb9be9741f60ea79c5208 to your computer and use it in GitHub Desktop.
YMQ TUI
"""
1. Install the following dependencies into a virtualenv
pip install textual==0.57.0 textual-dev==1.5.1 requests==2.31.0 requests-aws4auth==1.2.3 yandexcloud==0.267.0
2. Create an env.json file
{
"YC_REGION": "ru-central1",
"YC_FOLDER_ID": "FFF",
"YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS": "yc-service-account-key.json",
"YMQ_ACCESS_KEY": "XXX",
"YMQ_SECRET_KEY": "YYY"
}
3. Run
python ymq_tui.py --env env.json
"""
import argparse
import json
import os
from xml.etree import ElementTree
from textual import on, work
from textual.app import App
from textual.app import ComposeResult
from textual.containers import Grid, Container
from textual.message import Message
from textual.screen import ModalScreen
from textual.widgets import Button, DataTable, Footer, Label
import requests
from requests_aws4auth import AWS4Auth
import yandexcloud
from yandex.cloud.serverless.triggers.v1.trigger_pb2 import _TRIGGER_STATUS
from yandex.cloud.serverless.triggers.v1.trigger_service_pb2_grpc import TriggerServiceStub
from yandex.cloud.serverless.triggers.v1.trigger_service_pb2 import ListTriggersRequest, PauseTriggerRequest, ResumeTriggerRequest
# Base URL that every YMQ request in this module is POSTed to.
YMQ_ENDPOINT_URL = "https://message-queue.api.cloud.yandex.net"
# Lazily created requests.Session, populated on first use by get_session().
session = None
def get_session():
    """Return the module-wide ``requests.Session``, creating it on first use.

    The session is stored in the module-level ``session`` variable so that
    later calls return the same object.
    """
    global session
    if session is None:
        session = requests.Session()
    return session
def export_variables(env):
    """Export the key/value pairs of a JSON file into ``os.environ``.

    Args:
        env: Path of a JSON file whose top-level value is an object mapping
            variable names to values.

    Both names and values are converted with ``str()`` before being stored,
    so non-string JSON values (numbers, booleans) are exported as their
    Python string representation.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(env, encoding="utf-8") as json_file:
        env_vars = json.load(json_file)
    os.environ.update({str(name): str(value) for name, value in env_vars.items()})
def check_response(res):
    """Raise ``requests.HTTPError`` unless *res* has status code 200.

    Args:
        res: A response object exposing ``status_code`` and ``content``.

    The exception message has the form ``"<status>: <message>"``. The message
    is taken from the ``Error/Message`` element of the XML body when present.
    If the body is not well-formed XML or has no such element (for example an
    HTML error page from an intermediary), the raw body text is used instead
    so that the HTTP failure is still reported as ``HTTPError`` rather than
    being masked by a parsing error.

    Raises:
        requests.HTTPError: If the status code is not 200.
    """
    if res.status_code == 200:
        return
    # Undecodable bytes must not hide the HTTP error either.
    body = res.content.decode("utf-8", errors="replace")
    try:
        msg = ElementTree.fromstring(body).findtext("Error/Message")
    except ElementTree.ParseError:
        msg = None
    if not msg:
        msg = body.strip() or "no error details in response body"
    raise requests.HTTPError(f"{res.status_code}: {msg}", response=res)
def queue_list():
    """Return the queue URLs reported by the ``ListQueues`` action.

    Credentials and region are read from the ``YMQ_ACCESS_KEY``,
    ``YMQ_SECRET_KEY`` and ``YC_REGION`` environment variables. The request is
    POSTed to ``YMQ_ENDPOINT_URL`` with a 5 second timeout and validated with
    ``check_response``.

    Returns:
        A list with the text of every ``QueueUrl`` element found under
        ``ListQueuesResult`` in the XML response.
    """
    credentials = AWS4Auth(
        os.environ["YMQ_ACCESS_KEY"],
        os.environ["YMQ_SECRET_KEY"],
        os.environ["YC_REGION"],
        "sqs",
    )
    response = get_session().post(
        YMQ_ENDPOINT_URL,
        data={"Action": "ListQueues"},
        auth=credentials,
        timeout=5,
    )
    check_response(response)

    root = ElementTree.fromstring(response.content.decode("utf-8"))
    result = root.find("ListQueuesResult")
    urls = []
    for node in result.findall("QueueUrl"):
        urls.append(node.text)
    return urls
def queue_purge(queue):
    """Send a ``PurgeQueue`` action for the queue identified by *queue*.

    Args:
        queue: Value sent as the ``QueueUrl`` request parameter.

    Credentials and region are read from the ``YMQ_ACCESS_KEY``,
    ``YMQ_SECRET_KEY`` and ``YC_REGION`` environment variables. The response
    is validated with ``check_response``; nothing is returned.
    """
    credentials = AWS4Auth(
        os.environ["YMQ_ACCESS_KEY"],
        os.environ["YMQ_SECRET_KEY"],
        os.environ["YC_REGION"],
        "sqs",
    )
    payload = {"Action": "PurgeQueue", "QueueUrl": queue}
    response = get_session().post(
        YMQ_ENDPOINT_URL,
        data=payload,
        auth=credentials,
        timeout=5,
    )
    check_response(response)
def queue_get_attributes(queue):
    """Fetch the message counters of a queue via ``GetQueueAttributes``.

    Args:
        queue: Value sent as the ``QueueUrl`` request parameter.

    The request asks for ``ApproximateNumberOfMessages``,
    ``ApproximateNumberOfMessagesDelayed`` and
    ``ApproximateNumberOfMessagesNotVisible``. Credentials and region are read
    from the ``YMQ_ACCESS_KEY``, ``YMQ_SECRET_KEY`` and ``YC_REGION``
    environment variables.

    Returns:
        A dict mapping the text of each returned ``Attribute/Name`` element to
        the text of its ``Attribute/Value`` element.
    """
    credentials = AWS4Auth(
        os.environ["YMQ_ACCESS_KEY"],
        os.environ["YMQ_SECRET_KEY"],
        os.environ["YC_REGION"],
        "sqs",
    )
    payload = {
        "Action": "GetQueueAttributes",
        "AttributeName.1": "ApproximateNumberOfMessages",
        "AttributeName.2": "ApproximateNumberOfMessagesDelayed",
        "AttributeName.3": "ApproximateNumberOfMessagesNotVisible",
        "QueueUrl": queue,
    }
    response = get_session().post(
        YMQ_ENDPOINT_URL,
        data=payload,
        auth=credentials,
        timeout=5,
    )
    check_response(response)

    root = ElementTree.fromstring(response.content.decode("utf-8"))
    result = root.find("GetQueueAttributesResult")
    attributes = {}
    for node in result.findall("Attribute"):
        attr_name = node.find("Name").text
        attr_value = node.find("Value").text
        attributes[attr_name] = attr_value
    return attributes
class ConfirmModal(ModalScreen):
    """Modal dialog showing a question with "Cancel" and "Confirm" buttons.

    The screen is dismissed with ``True`` when the "Confirm" button is pressed
    and with ``False`` when any other button is pressed. The escape key is
    bound to ``app.pop_screen``.
    """

    BINDINGS = [
        ("escape", "app.pop_screen", "Cancel"),
    ]

    # The stylesheet text is kept exactly as it is in the file (including the
    # column-0 layout) so the runtime string is unchanged.
    DEFAULT_CSS = """
ConfirmModal {
align: center middle;
}
#dialog {
grid-size: 2;
grid-gutter: 1 2;
grid-rows: 1fr 3;
padding: 0 1;
width: 60;
height: 11;
border: thick $background 80%;
background: $surface;
}
#question {
column-span: 2;
height: 1fr;
width: 1fr;
content-align: center middle;
}
Button {
width: 100%;
}
Container {
align: center middle;
}
"""

    def __init__(self, question, name=None, id=None, classes=None):
        """Store the question text and initialise the underlying screen.

        Args:
            question: Text displayed in the dialog's label.
            name: Forwarded to ``ModalScreen.__init__``.
            id: Forwarded to ``ModalScreen.__init__``.
            classes: Forwarded to ``ModalScreen.__init__``.
        """
        self.question = question
        super().__init__(name=name, id=id, classes=classes)

    def compose(self):
        """Yield the dialog grid: the question label and the two buttons."""
        question_label = Label(self.question, id="question")
        cancel_cell = Container(Button("Cancel", id="cancel"))
        confirm_cell = Container(Button("Confirm", variant="primary", id="confirm"))
        yield Grid(question_label, cancel_cell, confirm_cell, id="dialog")

    def on_button_pressed(self, event):
        """Dismiss the dialog, reporting whether "Confirm" was the button pressed."""
        self.dismiss(event.button.id == "confirm")
class QueueManagerApp(App):
    """Textual app that lists YMQ queues and controls their triggers.

    The main screen is a row-cursor ``DataTable`` with one row per queue
    (name, type, message counters, trigger status). Key bindings refresh the
    table, resume or pause the trigger attached to the selected queue, purge
    the selected queue after confirmation, and save a screenshot.
    """

    BINDINGS = [
        ("escape", "quit()", "Quit"),
        ("ctrl+r", "refresh", "Refresh"),
        ("e", "enable_trigger", "Enable"),
        ("d", "disable_trigger", "Disable"),
        ("p", "purge_queue", "Purge"),
        ("f12", "take_screenshot()", "Screenshot"),
    ]

    # Optional mapping from a raw queue name to the name shown in the table;
    # queues that are not listed are shown under their raw name.
    QMAP = {}

    # Markup colour used for a trigger status in the "Trigger" column;
    # statuses that are not listed are rendered without markup.
    _STATUS_COLORS = {
        "PAUSED": "red",
        "ACTIVE": "green",
    }

    class ReturnData(Message):
        """Message carrying the queue data collected by ``get_queues``."""

        def __init__(self, data):
            self.data = data
            super().__init__()

    def __init__(self):
        """Parse ``--env``, export its variables and create the trigger client.

        The service account key is loaded from the file named by the
        ``YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS`` environment variable.
        """
        super().__init__()
        parser = argparse.ArgumentParser(description="QueueManager")
        parser.add_argument("--env", required=True, help="Select env")
        options = parser.parse_args()
        export_variables(options.env)
        key_path = os.environ["YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"]
        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with open(key_path, encoding="utf-8") as json_file:
            sa_key = json.load(json_file)
        self.yc_sdk = yandexcloud.SDK(service_account_key=sa_key)
        self.yc_triggers = self.yc_sdk.client(TriggerServiceStub)
        # Queue URL -> {"name", "type", "attrs", "trigger"}; replaced wholesale
        # by update_table() each time fresh data arrives.
        self.queues = {}

    def compose(self) -> ComposeResult:
        """Yield the queue table and the footer."""
        yield DataTable(cursor_type="row")
        yield Footer()

    def on_mount(self):
        """Create the table columns and start the first refresh."""
        table = self.query_one(DataTable)
        table.add_columns("Name", "Type", "Messages", "Delayed", "Invisible", "Trigger")
        self.action_refresh()

    def action_refresh(self):
        """Start a background reload of the queue data (bound to ctrl+r)."""
        self.get_queues()

    @work(exclusive=True, thread=True)
    def get_queues(self):
        """Collect trigger and queue data and post it as ``ReturnData``.

        Declared as a thread worker (``thread=True``), so the network calls
        here are not made from the caller of ``get_queues``. The result is
        handed to ``update_table`` via ``post_message``.
        """
        tlist = self.yc_triggers.List(ListTriggersRequest(folder_id=os.environ["YC_FOLDER_ID"]))
        triggers = {}
        for trigger in tlist.triggers:
            # Key triggers by the last ":"-separated segment of the queue id so
            # they can be matched against the queue name below.
            qname = trigger.rule.message_queue.queue_id.rsplit(":", 1)[-1]
            triggers[qname] = {
                "id": trigger.id,
                "status": _TRIGGER_STATUS.values_by_number[trigger.status].name,
            }

        queues = {}
        for queue in queue_list():
            # The queue name is the last path segment of the queue URL.
            name = queue.rsplit("/", 1)[-1]
            attrs = queue_get_attributes(queue)
            queues[queue] = {
                "name": self.QMAP.get(name, name),
                "type": "FIFO" if name.endswith(".fifo") else "Standard",
                "attrs": attrs,
                "trigger": triggers.get(name, {"id": None, "status": "N/A"}),
            }
        self.post_message(self.ReturnData(queues))

    def _status_markup(self, status):
        """Return *status* wrapped in colour markup when a colour is defined."""
        color = self._STATUS_COLORS.get(status)
        if color is None:
            return status
        return f"[{color}]{status}[/{color}]"

    @on(ReturnData)
    async def update_table(self, return_data):
        """Rebuild the table from ``return_data.data``, sorted by display name.

        The cursor coordinate read before clearing the table is assigned back
        after the rows have been re-added.
        """
        table = self.query_one(DataTable)
        coordinate = table.cursor_coordinate
        table.clear()
        self.queues = return_data.data
        ordered = sorted(self.queues.items(), key=lambda item: item[1]["name"])
        for queue_url, info in ordered:
            attrs = info["attrs"]
            table.add_row(
                info["name"],
                info["type"],
                attrs["ApproximateNumberOfMessages"],
                attrs["ApproximateNumberOfMessagesDelayed"],
                attrs["ApproximateNumberOfMessagesNotVisible"],
                self._status_markup(info["trigger"]["status"]),
                key=queue_url,  # row key is the queue URL, used by the actions
            )
        table.cursor_coordinate = coordinate
        table.loading = False
        table.focus()

    def _selected_queue_key(self):
        """Return the row key under the table cursor, or ``None`` if there is none."""
        table = self.query_one(DataTable)
        if not table.is_valid_coordinate(table.cursor_coordinate):
            return None
        cell_key = table.coordinate_to_cell_key(table.cursor_coordinate)
        return cell_key.row_key.value

    def _change_trigger_state(self, rpc, request_type, title):
        """Call *rpc* for the selected queue's trigger, notify and refresh.

        Args:
            rpc: Bound trigger-service method to call (``Resume`` or ``Pause``).
            request_type: Request class constructed with ``trigger_id=...``.
            title: Title of the notification shown after the call.

        Does nothing when no row is selected or the selected queue has no
        trigger id.
        """
        row_key = self._selected_queue_key()
        if row_key is None:
            return
        queue = self.queues[row_key]
        trigger_id = queue["trigger"]["id"]
        if trigger_id is None:
            return
        # NOTE(review): this RPC is called synchronously from the action
        # handler; consider moving it to a worker if UI stalls are observed.
        rpc(request_type(trigger_id=trigger_id))
        self.notify(queue["name"], title=title)
        self.action_refresh()

    def action_purge_queue(self):
        """Ask for confirmation, then purge the selected queue (bound to "p")."""
        row_key = self._selected_queue_key()
        if row_key is None:
            return
        # Capture the name now so the callback does not depend on self.queues
        # still containing this key when the dialog is answered.
        queue_name = self.queues[row_key]["name"]

        def check_confirm(confirm):
            if confirm:
                # NOTE(review): synchronous HTTP call made from the dialog
                # callback; see the note in _change_trigger_state.
                queue_purge(row_key)
                self.notify(queue_name, title="Purged")
                self.action_refresh()

        self.app.push_screen(
            ConfirmModal(f"Purge queue {queue_name}?"),
            check_confirm,
        )

    def action_enable_trigger(self):
        """Resume the trigger of the selected queue (bound to "e")."""
        self._change_trigger_state(self.yc_triggers.Resume, ResumeTriggerRequest, "Enabled")

    def action_disable_trigger(self):
        """Pause the trigger of the selected queue (bound to "d")."""
        self._change_trigger_state(self.yc_triggers.Pause, PauseTriggerRequest, "Disabled")

    def action_take_screenshot(self):
        """Save a screenshot and show where it was written (bound to f12)."""
        filename = self.save_screenshot()
        self.notify(f"Saved in {filename}", title="Screenshot saved")
# The application object is created when the module is imported, so the
# command-line arguments are parsed by QueueManagerApp.__init__ at that point.
app = QueueManagerApp()


def main():
    """Run the module-level application."""
    app.run()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment