@unbracketed · Last active March 9, 2025
Single-file nanodjango llm chat app that renders highlighted code blocks for nice display and easy copying
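
To try it, save the script locally (chat.py is just an example name) and make sure the llm library has a model and API key configured (for example via "llm keys set openai"). uv reads the inline script metadata block below and installs the dependencies on the fly:

    uv run chat.py

Then open the printed local URL; replies stream in over the websocket and are rendered client-side as markdown with highlighted code blocks.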
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "nanodjango",
# "channels",
# "daphne",
# "htpy",
# "markdown",
# "markupsafe",
# "llm"
# ]
# ///
import json
import uuid
from channels.generic.websocket import WebsocketConsumer
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.http import HttpResponse
from django.urls import path
from markupsafe import Markup
from markdown import markdown
from htpy import (
body,
button,
div,
form,
h1,
head,
html,
input,
meta,
script,
link,
title,
main,
style,
fieldset,
article,
)
from nanodjango import Django
import llm
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐
# │ ├┤ │││├─┘│ ├─┤ │ ├┤
# ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘
def html_template():
return html[
head[
meta(charset="utf-8"),
meta(name="viewport", content="width=device-width, initial-scale=1"),
title["llm chat"],
script(src="https://unpkg.com/[email protected]"),
script(src="https://unpkg.com/[email protected]"),
script(src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"),
link(rel="stylesheet", href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/default.min.css"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/python.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/javascript.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/bash.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/sql.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/yaml.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/css.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/dockerfile.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/http.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/json.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/makefile.min.js"),
script(src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/pgsql.min.js"),
link(
rel="stylesheet",
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css",
),
style[
Markup("""
.message { padding: .5rem; }
.user-message {
border: 1px solid #999;
border-radius: 0.5rem;
margin-bottom: 0.5rem;
}
.response-message {
font-weight: bold;
background-color: #333;
border: 1px solid green;
border-radius: 0.5rem;
margin-bottom: 1rem;
}
.markdown-content {
display: none;
}
""")
],
script[
Markup("""
// Create a MutationObserver to watch for content changes in hidden elements
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
if (mutation.type === 'childList' && mutation.target.classList.contains('markdown-content')) {
// Get the visible container (sibling of the hidden content)
const visibleContainer = mutation.target.nextElementSibling;
if (visibleContainer) {
visibleContainer.innerHTML = marked.parse(mutation.target.textContent);
visibleContainer.querySelectorAll('pre code').forEach((block) => {
hljs.highlightElement(block);
});
}
}
});
});
// Start observing the message list for changes
document.addEventListener('DOMContentLoaded', () => {
const messageList = document.getElementById('message-list');
if (messageList) {
observer.observe(messageList, {
childList: true,
subtree: true,
characterData: true
});
}
});
""")
],
],
body[
main(class_="container")[
article[
h1["♚ code king"],
div(hx_ext="ws", ws_connect="/ws/echo/")[
div("#message-list"),
form(ws_send=True)[
fieldset(role="group")[
input(
name="message",
type="text",
placeholder="Type your message...",
autocomplete="off",
),
button(
class_="primary outline",
type="submit",
onclick="setTimeout(() => this.closest('form').querySelector('input[name=message]').value = '', 0)",
)["↩"],
]
],
],
],
],
],
]
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┌─┐┌─┐┌┬┐┌─┐┌─┐┌┐┌┌─┐┌┐┌┌┬┐┌─┐
# │ │ ││││├─┘│ ││││├┤ │││ │ └─┐
# └─┘└─┘┴ ┴┴ └─┘┘└┘└─┘┘└┘ ┴ └─┘
def response_message(message_text, id):
return div("#message-list", hx_swap_oob=f"beforeend:{id} .markdown-content")[message_text]
def formatted_response_message(message_text, id):
return div(id, hx_swap_oob="outerHTML")[
div(data_theme="dark", class_="message response-message")[
Markup(markdown(message_text, extensions=['fenced_code']))
]
]
def response_container(id, classname="response-message"):
return div("#message-list", hx_swap_oob="beforeend")[
div(id, class_=["message", classname], data_theme="dark")[
div(class_="markdown-content")[""], # Hidden element for raw markdown
div(class_="rendered-content")[""] # Visible element for rendered HTML
]
]
def user_message(message_text):
return div("#message-list", hx_swap_oob="beforeend")[
div(class_=["message", "user-message"])[
message_text
]
]
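
# Roughly what these fragments look like on the wire (an illustrative sketch, not part
# of the app). htmx's websocket extension inspects hx-swap-oob on each incoming
# fragment and swaps it relative to whatever the selector matches, so:
#
#   response_container("#response-message-abc") renders to something like
#       <div id="message-list" hx-swap-oob="beforeend">
#         <div id="response-message-abc" class="message response-message" data-theme="dark">
#           <div class="markdown-content"></div>
#           <div class="rendered-content"></div>
#         </div>
#       </div>
#
#   response_message("hello\n", "#response-message-abc") renders to something like
#       <div id="message-list" hx-swap-oob="beforeend:#response-message-abc .markdown-content">hello</div>
#
# i.e. new containers are appended to #message-list, streamed text is appended to a
# container's hidden .markdown-content div, and the MutationObserver in the template
# above re-renders it into .rendered-content with marked + highlight.js.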
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┬ ┬┬┌─┐┬ ┬┌─┐
# └┐┌┘│├┤ │││└─┐
# └┘ ┴└─┘└┴┘└─┘
def index(request):
return HttpResponse(html_template())
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┬ ┬┌─┐┌┐ ┌─┐┌─┐┌─┐┬┌─┌─┐┌┬┐
# │││├┤ ├┴┐└─┐│ ││ ├┴┐├┤ │
# └┴┘└─┘└─┘└─┘└─┘└─┘┴ ┴└─┘ ┴
def is_code_fence(line):
return line.strip().startswith("```")
def get_code_language(line):
if is_code_fence(line):
lang = line.strip()[3:].strip()
return lang if lang else None
return None
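
# For example (illustrative only):
#   is_code_fence("```python")      -> True
#   get_code_language("```python")  -> "python"
#   get_code_language("```")        -> None
#   is_code_fence("print('hi')")    -> False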
class EchoConsumer(WebsocketConsumer):
def receive(self, text_data):
text_data_json = json.loads(text_data)
message_text = text_data_json.get("message", "")
if not message_text.strip():
return
user_message_html = user_message(message_text)
self.send(text_data=user_message_html)
response = get_model().prompt(message_text)
current_container_id = f"#response-message-{str(uuid.uuid4())}"
current_container_html = response_container(current_container_id)
self.send(text_data=current_container_html)
buffer = ""
in_code_block = False
code_language = None
for chunk in response:
buffer += chunk
lines = buffer.split('\n')
buffer = lines[-1] # Keep the last partial line in the buffer
for line in lines[:-1]: # Process all complete lines
if is_code_fence(line):
if not in_code_block:
# Start of code block
in_code_block = True
code_language = get_code_language(line)
# Create new code block container
code_container_id = f"#code-block-{str(uuid.uuid4())}"
code_container_html = response_container(code_container_id, "code-block")
self.send(text_data=code_container_html)
# Send the fence line
self.send(text_data=response_message(line + '\n', code_container_id))
else:
# End of code block
in_code_block = False
code_language = None
# Send the closing fence
self.send(text_data=response_message(line + '\n', code_container_id))
# Create new response container for any following text
current_container_id = f"#response-message-{str(uuid.uuid4())}"
current_container_html = response_container(current_container_id)
self.send(text_data=current_container_html)
else:
# Regular content
container_id = code_container_id if in_code_block else current_container_id
self.send(text_data=response_message(line + '\n', container_id))
# Handle any remaining content in the buffer
if buffer:
container_id = code_container_id if in_code_block else current_container_id
self.send(text_data=response_message(buffer, container_id))
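
# A minimal sketch of the buffering idea used in receive() above (not called anywhere
# in the app): chunks from the model are accumulated, complete lines are handled as
# soon as they appear, and the trailing partial line is carried into the next chunk.
def _line_buffer_demo(chunks):
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        *complete, buffer = buffer.split("\n")  # last element is the partial line
        for line in complete:
            yield line
    if buffer:  # flush whatever is left once the stream ends
        yield buffer
# list(_line_buffer_demo(["Hel", "lo\nwor", "ld\n"])) == ["Hello", "world"]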
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ╔═╗╔═╗╔═╗
# ╠═╣╠═╝╠═╝
# ╩ ╩╩ ╩
app = Django(
# EXTRA_APPS=[
# "channels",
# ],
#
# Nanodjango doesn't yet support prepending "priority" apps to INSTALLED_APPS,
# and `daphne` must be the first app in INSTALLED_APPS.
INSTALLED_APPS=[
"daphne",
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"whitenoise.runserver_nostatic",
"django.contrib.staticfiles",
"channels",
],
CHANNEL_LAYERS={
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
},
},
ASGI_APPLICATION="__main__.htmx_websocket_interface",
)
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┬ ┬┬─┐┬ ┌─┐
# │ │├┬┘│ └─┐
# └─┘┴└─┴─┘└─┘
app.route("/")(index)
websocket_urlpatterns = [
path("ws/echo/", EchoConsumer.as_asgi()),
]
htmx_websocket_interface = ProtocolTypeRouter(
{
"http": app.asgi,
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
}
)
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┬ ┬ ┌┬┐
# │ │ │││
# ┴─┘┴─┘┴ ┴
_model = None
def get_model():
global _model
if _model is None:
active_model = llm.get_model()
_model = active_model.conversation()
return _model
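
# llm.get_model() with no argument returns whatever `llm models default` points at;
# to pin a specific model instead, a sketch (the model name is just an example):
#
#     active_model = llm.get_model("gpt-4o-mini")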
# ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^
#
if __name__ == "__main__":
app.run()

# ── fullmoon/ui/components.py: htpy components used by the conversation browser below ──
import json
from datetime import datetime
from htpy import div, table, tr, td, a, thead, th, tbody
from markdown import markdown
from markupsafe import Markup
def formatted_response_message(message_text: str, id: str):
return div(
f"#response-message-{id}", data_theme="dark", class_="message response-message"
)[Markup(markdown(message_text, extensions=["fenced_code"]))]
def pretty_printed_response_message(message_text: str, model_name: str):
return div[
Markup(
markdown(
f"```json\n{json.dumps(json.loads(message_text), indent=2)}\n```",
extensions=["fenced_code"],
)
)
]
def formatted_response_message_swap_wrapper(message_text, id):
return div(id, hx_swap_oob="outerHTML")[
formatted_response_message(message_text, id)
]
def response_container(id, classname="response-message"):
return div("#message-list", hx_swap_oob="beforeend")[
div(id, class_=["message", classname], data_theme="dark")[
div(class_="markdown-content")[""], # Hidden element for raw markdown
div(class_="rendered-content")[""], # Visible element for rendered HTML
]
]
def user_message(message_text, label=None):
return div(class_=["message", "user-message"])[
[label, message_text] if label else message_text
]
def user_message_swap_wrapper(message_text):
return div("#message-list", hx_swap_oob="beforeend")[user_message(message_text)]
def system_message(message_text: str, label=None):
return div(class_=["message", "system-message"])[
[label, message_text] if label else message_text
]
def message_label(label: str, label_class: str = "message-label-response"):
return div(class_=["message-label", label_class])[label]
def message_label_swap_wrapper(label):
return div("#message-list", hx_swap_oob="beforeend")[message_label(label)]
def prompt_attachment(attachment):
return div(class_=["message", "prompt-attachment"])[attachment]
def conversations_list(conversations):
return table[
thead[
tr[
th["Conversation"],
th["Most Recent"],
th["Models"],
th["Schemas Used"],
th["System Prompt"],
th["Input Tokens"],
th["Output Tokens"],
]
],
tbody[
[
tr[
td[a(href=f"/conversation/{row['id']}")[row["name"]]],
td[
(
datetime.strptime(
row["most_recent"].replace("+00:00", ""),
"%Y-%m-%dT%H:%M:%S.%f",
).strftime("%b %-d, %Y")
if row["most_recent"]
else ""
)
],
td[row["models"]],
td[row["schemas_used"]],
td[row["has_system_prompt"]],
td[row["input_tokens"]],
td[row["output_tokens"]],
]
for row in conversations
]
],
]
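
# Example of the row shape conversations_list() expects (an illustrative sketch; the
# keys mirror what the conversation browser's index view builds from the llm logs db):
#
#     conversations_list([{
#         "id": "abc123", "name": "refactor help",
#         "most_recent": "2025-03-08T21:14:03.123456+00:00",
#         "models": {"gpt-4o-mini"}, "schemas_used": "", "has_system_prompt": "✓",
#         "input_tokens": 1234, "output_tokens": 567,
#     }])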

# ── conversation browser: a second single-file nanodjango app for reading the llm logs database ──
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "nanodjango",
# "htpy",
# "sqlite-utils",
# "markdown",
# "markupsafe",
# ]
# ///
import json
from django.http import HttpResponse
from htpy import (
body,
div,
h1,
head,
script,
html,
meta,
link,
title,
main,
article,
ul,
li,
a,
img,
style,
)
from nanodjango import Django
from markdown import markdown
from markupsafe import Markup
from sqlite_utils import Database
from sqlite_utils.db import NotFoundError
from fullmoon.ui.components import (
formatted_response_message,
system_message,
user_message,
pretty_printed_response_message,
message_label,
prompt_attachment,
conversations_list,
)
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐
# │ ├┤ │││├─┘│ ├─┤ │ ├┤
# ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘
def html_template(content, page_title="llm conversations"):
return html[
head[
meta(charset="utf-8"),
meta(name="viewport", content="width=device-width, initial-scale=1"),
title[page_title],
link(
rel="stylesheet",
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css",
),
link(
rel="stylesheet",
href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/default.min.css",
),
script(
src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"
),
script(
src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/languages/python.min.js"
),
style[
Markup(
"""
.message { padding: .5rem; }
.system-message {
font-weight: bold;
background-color: #333;
color: #fff;
border: 1px solid red;
border-radius: 0.5rem;
margin-bottom: 0.5rem;
}
.user-message {
border: 1px solid #999;
border-radius: 0.5rem;
margin-bottom: 0.5rem;
}
.response-message {
font-weight: bold;
background-color: #333;
border: 1px solid green;
border-radius: 0.5rem;
margin-bottom: 1rem;
}
.message-label {
margin-left: 0.5rem;
}
.message-label-system {
font-weight: bold;
font-size: 0.7rem;
color: red;
}
.message-label-user, .message-label-attachment {
font-weight: bold;
font-size: 0.7rem;
color: blue;
}
.message-label-model {
font-weight: bold;
font-size: 0.7rem;
color: green;
}
.prompt-attachment {
border: 1px solid #999;
border-radius: 0.5rem;
margin-bottom: 0.5rem;
}
.markdown-content {
display: none;
}
"""
)
],
],
body[
main(class_="container")[article[h1[page_title], div[content or "begin"]],],
script[
Markup(
"""
hljs.highlightAll();
"""
)
],
],
]
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┬ ┬┬┌─┐┬ ┬┌─┐
# └┐┌┘│├┤ │││└─┐
# └┘ ┴└─┘└┴┘└─┘
def index(request):
conversations = []
for convo in conversations_table().rows:
models_used = set()
# tokens_used = defaultdict(int)
schemas_used = set()
most_recent_response_timestamp = None
input_tokens = 0
output_tokens = 0
has_system_prompt = False
has_attachments = False
responses = responses_table().rows_where(
select="id, model, input_tokens, output_tokens, schema_id, datetime_utc, system",
where="conversation_id = ?",
where_args=[convo["id"]],
)
for response in responses:
print(response)
# Tokens aren't tracked for all models (llama3, for example)
input_tokens += response["input_tokens"] or 0
output_tokens += response["output_tokens"] or 0
if response["schema_id"]:
schemas_used.add(response["schema_id"])
if response["model"]:
models_used.add(response["model"])
if response["system"]:
has_system_prompt = "True"
# if response["attachments"]:
# has_attachments = True
if (
most_recent_response_timestamp is None
or response["datetime_utc"] > most_recent_response_timestamp
):
most_recent_response_timestamp = response["datetime_utc"]
print(schemas_used)
conversations.append(
{
"id": convo["id"],
"models": models_used,
"name": convo["name"],
"most_recent": most_recent_response_timestamp,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"schemas_used": "✓" if len(schemas_used) > 0 else "",
"has_system_prompt": "✓" if has_system_prompt else "",
}
)
return HttpResponse(html_template(conversations_list(reversed(conversations))))
def prompt_attachments(response_id):
_attachments = list(
prompt_attachments_table().rows_where(
select="attachment_id", where="response_id = ?", where_args=[response_id]
)
)
if _attachments:
att_list = []
for _attachment in _attachments:
attachment = attachments_table().get(_attachment["attachment_id"])
if attachment["type"] in ["image/jpeg", "image/png", "image/gif"]:
if attachment["url"]:
att_list.append(
[
message_label("Attachment", "message-label-attachment"),
prompt_attachment(
a(href=attachment["url"], target="_blank")[
img(src=attachment["url"])
]
),
]
)
if attachment["path"]:
att_list.append(
[
message_label("Attachment", "message-label-attachment"),
prompt_attachment(f"Attachment: {attachment['path']}"),
]
)
else:
att_list.append(
div[f"Unsupported attachment type: {attachment['type']}"]
)
return div[att_list]
return None
def conversation_detail(request, conversation_id):
conversation = conversations_table().get(conversation_id)
def _system(row):
return (
[
message_label("System", "message-label-system"),
system_message(row["system"]),
]
if row["system"]
else None
)
def _prompt(row):
return (
[message_label("Prompt", "message-label-user"), user_message(row["prompt"])]
if row["prompt"]
else None
)
def _response(row):
_schema = None
try:
_schema = schemas_table().get(row["schema_id"])
except NotFoundError as e:
pass
if _schema:
formatted_response = pretty_printed_response_message(
row["response"], row["model"]
)
schema = [message_label("Schema", "message-label-schema"), pretty_printed_response_message(
_schema["content"], "Schema"
)]
else:
formatted_response = formatted_response_message(row["response"], row["id"])
schema = None
return [
[message_label(row["model"], "message-label-model"), formatted_response],
schema,
]
responses = [
div(id=row["id"])[
div[_system(row)] if _system(row) else None,
div[_response(row)[1]] if _response(row)[1] else None,
(
div[_prompt(row), prompt_attachments(row["id"])]
if prompt_attachments(row["id"])
else _prompt(row)
),
div[_response(row)[0]],
]
for row in responses_table().rows
if row["conversation_id"] == conversation_id
]
return HttpResponse(
html_template(responses, page_title=f"Conversation: {conversation['name']}")
)
def _get_schema_display_text(schema):
    properties = schema.get("properties", {})
    if properties and next(iter(properties)) == "items":
# This is a schema-multi, get the nested properties
nested_props = properties["items"].get("items", {}).get("properties", {})
return "Multi: " + ", ".join(nested_props.keys())
else:
# Regular schema
return ", ".join(properties.keys())
def schema_list(request):
schemas = [
ul[
li[
a(href=f"/schemas/{row['id']}")[
_get_schema_display_text(json.loads(row["content"]))
]
]
]
for row in schemas_table().rows
]
return HttpResponse(html_template(schemas, page_title="Schemas"))
def schema_detail(request, schema_id):
schema = schemas_table().get(schema_id)
return HttpResponse(
html_template(
div[
Markup(
markdown(
f"```json\n{json.dumps(json.loads(schema['content']), indent=2)}\n```",
extensions=["fenced_code"],
)
)
],
page_title=f"schema {schema['id']}",
)
)
#
# API
#
def list_conversations(request):
conversations = [
{
"id": row["id"],
"model": row["model"],
"name": row["name"],
}
for row in conversations_table().rows
]
return conversations
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ╔═╗╔═╗╔═╗
# ╠═╣╠═╝╠═╝
# ╩ ╩╩ ╩
app = Django()
_db_path = None
def get_llm_db():
import subprocess
global _db_path
if _db_path is None:
_db_path = subprocess.check_output(["llm", "logs", "path"]).decode().strip()
return Database(_db_path)
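
# `llm logs path` prints the location of the SQLite database the llm CLI writes its
# logs to (a logs.db under your user data directory); sqlite-utils then reads its
# tables directly in the helpers below.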
def conversations_table():
db = get_llm_db()
return db["conversations"]
def responses_table():
db = get_llm_db()
return db["responses"]
def schemas_table():
db = get_llm_db()
return db["schemas"]
def prompt_attachments_table():
db = get_llm_db()
return db["prompt_attachments"]
def attachments_table():
db = get_llm_db()
return db["attachments"]
# . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
#
# ┬ ┬┬─┐┬ ┌─┐
# │ │├┬┘│ └─┐
# └─┘┴└─┴─┘└─┘
app.route("/")(index)
app.route("/conversation/<conversation_id>/")(conversation_detail)
app.route("/schemas/")(schema_list)
app.route("/schemas/<schema_id>/")(schema_detail)
# app.api.get("/list-conversations")(list_conversations)
# ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^ ^^^^
#
if __name__ == "__main__":
app.run()
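
Like the chat app above, this one can be started with uv (conversations.py is just an example name): "uv run conversations.py", then browse "/" for the conversation list, "/conversation/<conversation_id>/" for a single transcript, and "/schemas/" for any logged schemas.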