Skip to content

Instantly share code, notes, and snippets.

@bearlike
Created September 21, 2024 07:33
Show Gist options
  • Save bearlike/30a2be0a7a7288d8dbee208caaf0fc1f to your computer and use it in GitHub Desktop.
Save bearlike/30a2be0a7a7288d8dbee208caaf0fc1f to your computer and use it in GitHub Desktop.
OpenWebUI Docker Tool
"""
title: Get Information on Docker Containers
author: KK
funding_url: https://github.com/bearlike
version: 0.1.0
license: MIT
"""
from datetime import timedelta, datetime
from typing import Callable, Any, Tuple
import asyncio
import json
from pydantic import BaseModel, Field
import docker
class EventEmitter:
    """Forward status events to an optional async callback.

    When no callback was supplied, every ``emit`` call is a no-op.
    """

    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        # Awaitable callback that receives each event dict, or None.
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Send a single ``status`` event to the callback, if one is set.

        :param description: Human-readable text describing the current step.
        :param status: Status label placed in the event payload.
        :param done: Whether this event marks the end of the operation.
        """
        if not self.event_emitter:
            return

        payload = {
            "status": status,
            "description": description,
            "done": done,
        }
        event = {"type": "status", "data": payload}
        await self.event_emitter(event)
class Utils:
    """Stateless formatting and extraction helpers for Docker container data."""

    def __init__(self):
        pass

    @staticmethod
    def unix_to_human_ts(timestamp: int) -> str:
        """Format a Unix timestamp (seconds) as UTC, e.g. ``2024-Sep-21 06:11:05``."""
        # Local import: the module header only brings in datetime/timedelta.
        from datetime import timezone

        # datetime.utcfromtimestamp() is deprecated since Python 3.12; the
        # timezone-aware call renders the same UTC wall-clock fields.
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return moment.strftime("%Y-%b-%d %H:%M:%S")

    @staticmethod
    def _parse_docker_timestamp(value: str) -> datetime:
        """Parse a Docker timestamp string into a naive UTC ``datetime``.

        Accepts the shape ``2024-09-21T06:11:05.256103146Z``, with or without
        the fractional part and the trailing ``Z``.

        :raises ValueError: if the string does not match that shape.
        """
        text = value.strip()
        if text.endswith("Z"):
            text = text[:-1]
        head, _, fraction = text.partition(".")
        parsed = datetime.strptime(head, "%Y-%m-%dT%H:%M:%S")
        # Docker emits nanoseconds (9 digits) while datetime only stores
        # microseconds, so keep the first six digits of the fraction.
        microseconds = int(fraction[:6].ljust(6, "0")) if fraction else 0
        return parsed.replace(microsecond=microseconds)

    @staticmethod
    def unix_to_human_delta(timestamp):
        """Render a duration as text such as ``1:02:03`` or ``2 days, 3:00:00``.

        :param timestamp: Either a number of seconds, or a Docker timestamp
            string (example: ``2024-09-21T06:11:05.256103146Z``), in which
            case the duration is the time elapsed from that instant until now.
        :return: The formatted duration, or ``"N/A"`` when the string cannot
            be parsed or an integer duration is <= 0 or >= 8640000 seconds
            (100 days).
        """
        if isinstance(timestamp, str):
            from datetime import timezone

            try:
                started = Utils._parse_docker_timestamp(timestamp)
            except ValueError:
                return "N/A"
            # Compare naive UTC with naive UTC; whole seconds are enough.
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            timestamp = int((now - started).total_seconds())
        if isinstance(timestamp, int) and (timestamp <= 0 or timestamp >= 8640000):
            return "N/A"
        # An exact number of days prints as "N days, 0:00:00"; drop the zero time.
        return str(timedelta(seconds=timestamp)).replace(", 0:00:00", "")

    @staticmethod
    def bytes_to_human_size(size: int) -> str:
        """Format a byte count with a binary (1024-based) suffix, e.g. ``1.50 KB``."""
        suffixes = ["B", "KB", "MB", "GB", "TB"]
        index = 0
        # Stop at the largest suffix so huge values stay expressed in TB.
        while size >= 1024 and index < len(suffixes) - 1:
            size /= 1024
            index += 1
        return f"{size:.2f} {suffixes[index]}"

    @staticmethod
    def get_docker_container_info(container_ids) -> Tuple[list, set]:
        """Summarise container objects into plain dictionaries.

        :param container_ids: Iterable of container objects exposing
            ``short_id``, ``name``, ``status``, ``attrs`` and ``labels``.
        :return: ``(summaries, stacks)`` where ``summaries`` is a list with one
            dict per distinct ``short_id`` and ``stacks`` is the set of Docker
            Compose project names found in the container labels.
        """
        stacks = set()
        container_map = {}
        for container in container_ids:
            attrs = container.attrs
            labels = container.labels
            project = labels.get("com.docker.compose.project", None)
            if project is not None:
                stacks.add(project)
            # Keyed by short_id, so a repeated container overwrites its entry.
            container_map[container.short_id] = {
                "name": container.name,
                "status": container.status,
                "created": attrs["Created"],
                "image": attrs["Config"]["Image"],
                # NOTE(review): despite the key name this is the raw StartedAt
                # timestamp, not a duration; kept for output compatibility.
                "uptime": attrs["State"]["StartedAt"],
                "ports": attrs["NetworkSettings"]["Ports"],
                "labels": {
                    "Compose Project Name": project,
                    "Service ID": labels.get("com.docker.compose.service", "N/A"),
                },
            }
        return list(container_map.values()), stacks
class Tools:
    """Tool class exposing Docker container information as JSON text."""

    class Valves(BaseModel):
        # Address of the Docker daemon passed to ``docker.DockerClient``.
        DAEMON_URL: str = Field(
            default="unix://var/run/docker.sock",
            description="Docker Daemon URL.",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def get_docker_container_info(
        self, container_id: str = None, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        Retrieve the status, metadata and other known information for docker containers.
        :param container_id: Optional ID or name of a specific container. If not provided, retrieves all running containers.
        :return: A JSON-encoded list of container statuses, followed by the available Docker Compose stacks.
        """
        emitter = EventEmitter(__event_emitter__)
        client = None
        try:
            await emitter.emit("Connecting to Docker Daemon")
            # NOTE(review): the Docker SDK calls below are synchronous and run
            # on the event loop thread; confirm that is acceptable for the host.
            client = docker.DockerClient(base_url=self.valves.DAEMON_URL)
            wanted = container_id.strip() if isinstance(container_id, str) else ""
            if wanted:
                # Honour the documented filter; an unknown ID raises and is
                # reported through the error path below.
                containers = [client.containers.get(wanted)]
            else:
                containers = client.containers.list()
            results_json, stacks = Utils.get_docker_container_info(containers)
            await emitter.emit(
                status="complete",
                description=f"Retrieved {len(results_json)} containers and {len(stacks)} stacks",
                done=True,
            )
            # Add stacks to the results
            results_json.append({"Available Docker Compose Stacks": list(stacks)})
        except Exception as err:
            # Tool boundary: report the failure to the caller instead of raising.
            await emitter.emit(
                status="error",
                description=f"An error occurred while fetching containers: {str(err)}",
                done=True,
            )
            results_json = [{"error": str(err)}]
        finally:
            # Release the client's connections on success and on failure.
            if client is not None:
                client.close()
        return json.dumps(results_json, ensure_ascii=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment