Last active
March 9, 2025 04:27
-
-
Save unbracketed/16ea9d57fea1ac51e8602e238351f374 to your computer and use it in GitHub Desktop.
Single-file nanodjango llm chat which can render highlighted code blocks for nice display and easy copy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "nanodjango", | |
# "channels", | |
# "daphne", | |
# "htpy", | |
# "markdown", | |
# "markupsafe", | |
# "llm" | |
# ] | |
# /// | |
import json | |
import uuid | |
from channels.generic.websocket import WebsocketConsumer | |
from channels.routing import ProtocolTypeRouter, URLRouter | |
from channels.auth import AuthMiddlewareStack | |
from django.http import HttpResponse | |
from django.urls import path | |
from markupsafe import Markup | |
from markdown import markdown | |
from htpy import ( | |
body, | |
button, | |
div, | |
form, | |
h1, | |
head, | |
html, | |
input, | |
meta, | |
script, | |
link, | |
title, | |
main, | |
style, | |
fieldset, | |
article, | |
) | |
from nanodjango import Django | |
import llm | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐ | |
# │ ├┤ │││├─┘│ ├─┤ │ ├┤ | |
# ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘ | |
def html_template(): | |
return html[ | |
head[ | |
meta(charset="utf-8"), | |
meta(name="viewport", content="width=device-width, initial-scale=1"), | |
title["llm chat"], | |
script(src="https://unpkg.com/[email protected]"), | |
script(src="https://unpkg.com/[email protected]"), | |
script(src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"), | |
link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/default.min.css"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/python.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/javascript.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/bash.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/sql.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/yaml.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/css.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/dockerfile.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/http.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/json.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/makefile.min.js"), | |
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/pgsql.min.js"), | |
link( | |
rel="stylesheet", | |
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css", | |
), | |
style[ | |
Markup(""" | |
.message { padding: .5rem; } | |
.user-message { | |
border: 1px solid #999; | |
border-radius: 0.5rem; | |
margin-bottom: 0.5rem; | |
} | |
.response-message { | |
font-weight: bold; | |
background-color: #333; | |
border: 1px solid green; | |
border-radius: 0.5rem; | |
margin-bottom: 1rem; | |
} | |
.markdown-content { | |
display: none; | |
} | |
""") | |
], | |
script[ | |
Markup(""" | |
// Create a MutationObserver to watch for content changes in hidden elements | |
const observer = new MutationObserver((mutations) => { | |
mutations.forEach((mutation) => { | |
if (mutation.type === 'childList' && mutation.target.classList.contains('markdown-content')) { | |
// Get the visible container (sibling of the hidden content) | |
const visibleContainer = mutation.target.nextElementSibling; | |
if (visibleContainer) { | |
visibleContainer.innerHTML = marked.parse(mutation.target.textContent); | |
visibleContainer.querySelectorAll('pre code').forEach((block) => { | |
hljs.highlightElement(block); | |
}); | |
} | |
} | |
}); | |
}); | |
// Start observing the message list for changes | |
document.addEventListener('DOMContentLoaded', () => { | |
const messageList = document.getElementById('message-list'); | |
if (messageList) { | |
observer.observe(messageList, { | |
childList: true, | |
subtree: true, | |
characterData: true | |
}); | |
} | |
}); | |
""") | |
], | |
], | |
body[ | |
main(class_="container")[ | |
article[ | |
h1["♚ code king"], | |
div(hx_ext="ws", ws_connect="/ws/echo/")[ | |
div("#message-list"), | |
form(ws_send=True)[ | |
fieldset(role="group")[ | |
input( | |
name="message", | |
type="text", | |
placeholder="Type your message...", | |
autocomplete="off", | |
), | |
button( | |
class_="primary outline", | |
type="submit", | |
onclick="setTimeout(() => this.closest('form').querySelector('input[name=message]').value = '', 0)", | |
)["↩"], | |
] | |
], | |
], | |
], | |
], | |
], | |
] | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┌─┐┌─┐┌┬┐┌─┐┌─┐┌┐┌┌─┐┌┐┌┌┬┐┌─┐ | |
# │ │ ││││├─┘│ ││││├┤ │││ │ └─┐ | |
# └─┘└─┘┴ ┴┴ └─┘┘└┘└─┘┘└┘ ┴ └─┘ | |
def response_message(message_text, id): | |
return div("#message-list", hx_swap_oob=f"beforeend:{id} .markdown-content")[message_text] | |
def formatted_response_message(message_text, id): | |
return div(id, hx_swap_oob="outerHTML")[ | |
div(data_theme="dark", class_="message response-message")[ | |
Markup(markdown(message_text, extensions=['fenced_code'])) | |
] | |
] | |
def response_container(id, classname="response-message"): | |
return div("#message-list", hx_swap_oob="beforeend")[ | |
div(id, class_=["message", classname], data_theme="dark")[ | |
div(class_="markdown-content")[""], # Hidden element for raw markdown | |
div(class_="rendered-content")[""] # Visible element for rendered HTML | |
] | |
] | |
def user_message(message_text): | |
return div("#message-list", hx_swap_oob="beforeend")[ | |
div(class_=["message", "user-message"])[ | |
message_text | |
] | |
] | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┬ ┬┬┌─┐┬ ┬┌─┐ | |
# └┐┌┘│├┤ │││└─┐ | |
# └┘ ┴└─┘└┴┘└─┘ | |
def index(request): | |
return HttpResponse(html_template()) | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┬ ┬┌─┐┌┐ ┌─┐┌─┐┌─┐┬┌─┌─┐┌┬┐ | |
# │││├┤ ├┴┐└─┐│ ││ ├┴┐├┤ │ | |
# └┴┘└─┘└─┘└─┘└─┘└─┘┴ ┴└─┘ ┴ | |
def is_code_fence(line): | |
return line.strip().startswith("```") | |
def get_code_language(line): | |
if is_code_fence(line): | |
lang = line.strip()[3:].strip() | |
return lang if lang else None | |
return None | |
class EchoConsumer(WebsocketConsumer): | |
def receive(self, text_data): | |
text_data_json = json.loads(text_data) | |
message_text = text_data_json.get("message", "") | |
if not message_text.strip(): | |
return | |
user_message_html = user_message(message_text) | |
self.send(text_data=user_message_html) | |
response = get_model().prompt(message_text) | |
current_container_id = f"#response-message-{str(uuid.uuid4())}" | |
current_container_html = response_container(current_container_id) | |
self.send(text_data=current_container_html) | |
buffer = "" | |
in_code_block = False | |
code_language = None | |
for chunk in response: | |
buffer += chunk | |
lines = buffer.split('\n') | |
buffer = lines[-1] # Keep the last partial line in the buffer | |
for line in lines[:-1]: # Process all complete lines | |
if is_code_fence(line): | |
if not in_code_block: | |
# Start of code block | |
in_code_block = True | |
code_language = get_code_language(line) | |
# Create new code block container | |
code_container_id = f"#code-block-{str(uuid.uuid4())}" | |
code_container_html = response_container(code_container_id, "code-block") | |
self.send(text_data=code_container_html) | |
# Send the fence line | |
self.send(text_data=response_message(line + '\n', code_container_id)) | |
else: | |
# End of code block | |
in_code_block = False | |
code_language = None | |
# Send the closing fence | |
self.send(text_data=response_message(line + '\n', code_container_id)) | |
# Create new response container for any following text | |
current_container_id = f"#response-message-{str(uuid.uuid4())}" | |
current_container_html = response_container(current_container_id) | |
self.send(text_data=current_container_html) | |
else: | |
# Regular content | |
container_id = code_container_id if in_code_block else current_container_id | |
self.send(text_data=response_message(line + '\n', container_id)) | |
# Handle any remaining content in the buffer | |
if buffer: | |
container_id = code_container_id if in_code_block else current_container_id | |
self.send(text_data=response_message(buffer, container_id)) | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ╔═╗╔═╗╔═╗ | |
# ╠═╣╠═╝╠═╝ | |
# ╩ ╩╩ ╩ | |
app = Django( | |
# EXTRA_APPS=[ | |
# "channels", | |
# ], | |
# | |
# Nanodjango doesn't yet support prepending "priority" apps to INSTALLED_APPS, | |
# and `daphne` must be the first app in INSTALLED_APPS. | |
INSTALLED_APPS=[ | |
"daphne", | |
"django.contrib.admin", | |
"django.contrib.auth", | |
"django.contrib.contenttypes", | |
"django.contrib.sessions", | |
"django.contrib.messages", | |
"whitenoise.runserver_nostatic", | |
"django.contrib.staticfiles", | |
"channels", | |
], | |
CHANNEL_LAYERS={ | |
"default": { | |
"BACKEND": "channels.layers.InMemoryChannelLayer", | |
}, | |
}, | |
ASGI_APPLICATION="__main__.htmx_websocket_interface", | |
) | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┬ ┬┬─┐┬ ┌─┐ | |
# │ │├┬┘│ └─┐ | |
# └─┘┴└─┴─┘└─┘ | |
app.route("/")(index) | |
websocket_urlpatterns = [ | |
path("ws/echo/", EchoConsumer.as_asgi()), | |
] | |
htmx_websocket_interface = ProtocolTypeRouter( | |
{ | |
"http": app.asgi, | |
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)), | |
} | |
) | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┬ ┬ ┌┬┐ | |
# │ │ │││ | |
# ┴─┘┴─┘┴ ┴ | |
_model = None | |
def get_model(): | |
global _model | |
if _model is None: | |
active_model = llm.get_model() | |
_model = active_model.conversation() | |
return _model | |
# ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ | |
# | |
if __name__ == "__main__": | |
app.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from datetime import datetime | |
from htpy import div, table, tr, td, a, thead, th, tbody | |
from markdown import markdown | |
from markupsafe import Markup | |
def formatted_response_message(message_text: str, id: str): | |
return div( | |
f"#response-message-{id}", data_theme="dark", class_="message response-message" | |
)[Markup(markdown(message_text, extensions=["fenced_code"]))] | |
def pretty_printed_response_message(message_text: str, model_name: str): | |
return div[ | |
Markup( | |
markdown( | |
f"```json\n{json.dumps(json.loads(message_text), indent=2)}\n```", | |
extensions=["fenced_code"], | |
) | |
) | |
] | |
def formatted_response_message_swap_wrapper(message_text, id): | |
return div(id, hx_swap_oob="outerHTML")[ | |
formatted_response_message(message_text, id) | |
] | |
def response_container(id, classname="response-message"): | |
return div("#message-list", hx_swap_oob="beforeend")[ | |
div(id, class_=["message", classname], data_theme="dark")[ | |
div(class_="markdown-content")[""], # Hidden element for raw markdown | |
div(class_="rendered-content")[""], # Visible element for rendered HTML | |
] | |
] | |
def user_message(message_text, label=None): | |
return div(class_=["message", "user-message"])[ | |
[label, message_text] if label else message_text | |
] | |
def user_message_swap_wrapper(message_text): | |
return div("#message-list", hx_swap_oob="beforeend")[user_message(message_text)] | |
def system_message(message_text: str, label=None): | |
return div(class_=["message", "system-message"])[ | |
[label, message_text] if label else message_text | |
] | |
def message_label(label: str, label_class: str = "message-label-response"): | |
return div(class_=["message-label", label_class])[label] | |
def message_label_swap_wrapper(label): | |
return div("#message-list", hx_swap_oob="beforeend")[message_label(label)] | |
def prompt_attachment(attachment): | |
return div(class_=["message", "prompt-attachment"])[attachment] | |
def conversations_list(conversations): | |
return table[ | |
thead[ | |
tr[ | |
th["Conversation"], | |
th["Most Recent"], | |
th["Models"], | |
th["Schemas Used"], | |
th["System Prompt"], | |
th["Input Tokens"], | |
th["Output Tokens"], | |
] | |
], | |
tbody[ | |
[ | |
tr[ | |
td[a(href=f"/conversation/{row['id']}")[row["name"]]], | |
td[ | |
( | |
datetime.strptime( | |
row["most_recent"].replace("+00:00", ""), | |
"%Y-%m-%dT%H:%M:%S.%f", | |
).strftime("%b %-d, %Y") | |
if row["most_recent"] | |
else "" | |
) | |
], | |
td[row["models"]], | |
td[row["schemas_used"]], | |
td[row["has_system_prompt"]], | |
td[row["input_tokens"]], | |
td[row["output_tokens"]], | |
] | |
for row in conversations | |
] | |
], | |
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "nanodjango", | |
# "htpy", | |
# "sqlite-utils", | |
# "markdown", | |
# "markupsafe", | |
# ] | |
# /// | |
import json | |
from django.http import HttpResponse | |
from htpy import ( | |
body, | |
div, | |
h1, | |
head, | |
script, | |
html, | |
meta, | |
link, | |
title, | |
main, | |
article, | |
ul, | |
li, | |
a, | |
img, | |
style, | |
) | |
from nanodjango import Django | |
from markdown import markdown | |
from markupsafe import Markup | |
from sqlite_utils import Database | |
from sqlite_utils.db import NotFoundError | |
from fullmoon.ui.components import ( | |
formatted_response_message, | |
system_message, | |
user_message, | |
pretty_printed_response_message, | |
message_label, | |
prompt_attachment, | |
conversations_list, | |
) | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐ | |
# │ ├┤ │││├─┘│ ├─┤ │ ├┤ | |
# ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘ | |
def html_template(content, page_title="llm conversations"): | |
return html[ | |
head[ | |
meta(charset="utf-8"), | |
meta(name="viewport", content="width=device-width, initial-scale=1"), | |
title[page_title], | |
link( | |
rel="stylesheet", | |
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css", | |
), | |
link( | |
rel="stylesheet", | |
href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/default.min.css", | |
), | |
script( | |
src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js" | |
), | |
script( | |
src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/python.min.js" | |
), | |
style[ | |
Markup( | |
""" | |
.message { padding: .5rem; } | |
.system-message { | |
font-weight: bold; | |
background-color: #333; | |
color: #fff; | |
border: 1px solid red; | |
border-radius: 0.5rem; | |
margin-bottom: 0.5rem; | |
} | |
.user-message { | |
border: 1px solid #999; | |
border-radius: 0.5rem; | |
margin-bottom: 0.5rem; | |
} | |
.response-message { | |
font-weight: bold; | |
background-color: #333; | |
border: 1px solid green; | |
border-radius: 0.5rem; | |
margin-bottom: 1rem; | |
} | |
.message-label { | |
margin-left: 0.5rem; | |
} | |
.message-label-system { | |
font-weight: bold; | |
font-size: 0.7rem; | |
color: red; | |
} | |
.message-label-user, .message-label-attachment { | |
font-weight: bold; | |
font-size: 0.7rem; | |
color: blue; | |
} | |
.message-label-model { | |
font-weight: bold; | |
font-size: 0.7rem; | |
color: green; | |
} | |
.prompt-attachment { | |
border: 1px solid #999; | |
border-radius: 0.5rem; | |
margin-bottom: 0.5rem; | |
} | |
.markdown-content { | |
display: none; | |
} | |
""" | |
) | |
], | |
], | |
body[ | |
main(class_="container")[article[h1[page_title], div[content or "begin"]],], | |
script[ | |
Markup( | |
""" | |
hljs.highlightAll(); | |
""" | |
) | |
], | |
], | |
] | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┬ ┬┬┌─┐┬ ┬┌─┐ | |
# └┐┌┘│├┤ │││└─┐ | |
# └┘ ┴└─┘└┴┘└─┘ | |
def index(request): | |
conversations = [] | |
for convo in conversations_table().rows: | |
models_used = set() | |
# tokens_used = defaultdict(int) | |
schemas_used = set() | |
most_recent_response_timestamp = None | |
input_tokens = 0 | |
output_tokens = 0 | |
has_system_prompt = False | |
has_attachments = False | |
responses = responses_table().rows_where( | |
select="id, model, input_tokens, output_tokens, schema_id, datetime_utc, system", | |
where="conversation_id = ?", | |
where_args=[convo["id"]], | |
) | |
for response in responses: | |
print(response) | |
# Tokens aren't tracked for all models (llama3, for example) | |
input_tokens += response["input_tokens"] or 0 | |
output_tokens += response["output_tokens"] or 0 | |
if response["schema_id"]: | |
schemas_used.add(response["schema_id"]) | |
if response["model"]: | |
models_used.add(response["model"]) | |
if response["system"]: | |
has_system_prompt = "True" | |
# if response["attachments"]: | |
# has_attachments = True | |
if ( | |
most_recent_response_timestamp is None | |
or response["datetime_utc"] > most_recent_response_timestamp | |
): | |
most_recent_response_timestamp = response["datetime_utc"] | |
print(schemas_used) | |
conversations.append( | |
{ | |
"id": convo["id"], | |
"models": models_used, | |
"name": convo["name"], | |
"most_recent": most_recent_response_timestamp, | |
"input_tokens": input_tokens, | |
"output_tokens": output_tokens, | |
"schemas_used": "✓" if len(schemas_used) > 0 else "", | |
"has_system_prompt": "✓" if has_system_prompt else "", | |
} | |
) | |
return HttpResponse(html_template(conversations_list(reversed(conversations)))) | |
def prompt_attachments(response_id): | |
_attachments = list( | |
prompt_attachments_table().rows_where( | |
select="attachment_id", where="response_id = ?", where_args=[response_id] | |
) | |
) | |
if _attachments: | |
att_list = [] | |
for _attachment in _attachments: | |
attachment = attachments_table().get(_attachment["attachment_id"]) | |
if attachment["type"] in ["image/jpeg", "image/png", "image/gif"]: | |
if attachment["url"]: | |
att_list.append( | |
[ | |
message_label("Attachment", "message-label-attachment"), | |
prompt_attachment( | |
a(href=attachment["url"], target="_blank")[ | |
img(src=attachment["url"]) | |
] | |
), | |
] | |
) | |
if attachment["path"]: | |
att_list.append( | |
[ | |
message_label("Attachment", "message-label-attachment"), | |
prompt_attachment(f"Attachment: {attachment['path']}"), | |
] | |
) | |
else: | |
att_list.append( | |
div[f"Unsupported attachment type: {attachment['type']}"] | |
) | |
return div[att_list] | |
return None | |
def conversation_detail(request, conversation_id): | |
conversation = conversations_table().get(conversation_id) | |
def _system(row): | |
return ( | |
[ | |
message_label("System", "message-label-system"), | |
system_message(row["system"]), | |
] | |
if row["system"] | |
else None | |
) | |
def _prompt(row): | |
return ( | |
[message_label("Prompt", "message-label-user"), user_message(row["prompt"])] | |
if row["prompt"] | |
else None | |
) | |
def _response(row): | |
_schema = None | |
try: | |
_schema = schemas_table().get(row["schema_id"]) | |
except NotFoundError as e: | |
pass | |
if _schema: | |
formatted_response = pretty_printed_response_message( | |
row["response"], row["model"] | |
) | |
schema = [message_label("Schema", "message-label-schema"), pretty_printed_response_message( | |
_schema["content"], "Schema" | |
)] | |
else: | |
formatted_response = formatted_response_message(row["response"], row["id"]) | |
schema = None | |
return [ | |
[message_label(row["model"], "message-label-model"), formatted_response], | |
schema, | |
] | |
responses = [ | |
div(id=row["id"])[ | |
div[_system(row)] if _system(row) else None, | |
div[_response(row)[1]] if _response(row)[1] else None, | |
( | |
div[_prompt(row), prompt_attachments(row["id"])] | |
if prompt_attachments(row["id"]) | |
else _prompt(row) | |
), | |
div[_response(row)[0]], | |
] | |
for row in responses_table().rows | |
if row["conversation_id"] == conversation_id | |
] | |
return HttpResponse( | |
html_template(responses, page_title=f"Conversation: {conversation['name']}") | |
) | |
def _get_schema_display_text(schema): | |
properties = schema.get("properties", {}) | |
if list(properties.keys())[0] == "items": | |
# This is a schema-multi, get the nested properties | |
nested_props = properties["items"].get("items", {}).get("properties", {}) | |
return "Multi: " + ", ".join(nested_props.keys()) | |
else: | |
# Regular schema | |
return ", ".join(properties.keys()) | |
def schema_list(request): | |
schemas = [ | |
ul[ | |
li[ | |
a(href=f"/schemas/{row['id']}")[ | |
_get_schema_display_text(json.loads(row["content"])) | |
] | |
] | |
] | |
for row in schemas_table().rows | |
] | |
return HttpResponse(html_template(schemas, page_title="Schemas")) | |
def schema_detail(request, schema_id): | |
schema = schemas_table().get(schema_id) | |
return HttpResponse( | |
html_template( | |
div[ | |
Markup( | |
markdown( | |
f"```json\n{json.dumps(json.loads(schema['content']), indent=2)}\n```", | |
extensions=["fenced_code"], | |
) | |
) | |
], | |
page_title=f"schema {schema['id']}", | |
) | |
) | |
# | |
# API | |
# | |
def list_conversations(request): | |
conversations = [ | |
{ | |
"id": row["id"], | |
"model": row["model"], | |
"name": row["name"], | |
} | |
for row in conversations_table().rows | |
] | |
return conversations | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ╔═╗╔═╗╔═╗ | |
# ╠═╣╠═╝╠═╝ | |
# ╩ ╩╩ ╩ | |
app = Django() | |
_db_path = None | |
def get_llm_db(): | |
import subprocess | |
global _db_path | |
if _db_path is None: | |
_db_path = subprocess.check_output(["llm", "logs", "path"]).decode().strip() | |
return Database(_db_path) | |
def conversations_table(): | |
db = get_llm_db() | |
return db["conversations"] | |
def responses_table(): | |
db = get_llm_db() | |
return db["responses"] | |
def schemas_table(): | |
db = get_llm_db() | |
return db["schemas"] | |
def prompt_attachments_table(): | |
db = get_llm_db() | |
return db["prompt_attachments"] | |
def attachments_table(): | |
db = get_llm_db() | |
return db["attachments"] | |
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . | |
# | |
# ┬ ┬┬─┐┬ ┌─┐ | |
# │ │├┬┘│ └─┐ | |
# └─┘┴└─┴─┘└─┘ | |
app.route("/")(index) | |
app.route("/conversation/<conversation_id>/")(conversation_detail) | |
app.route("/schemas/")(schema_list) | |
app.route("/schemas/<schema_id>/")(schema_detail) | |
# app.api.get("/list-conversations")(list_conversations) | |
# ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ | |
# | |
if __name__ == "__main__": | |
app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment