Created
March 6, 2025 19:56
-
-
Save unbracketed/aa72e5d314302e04cc49112279b5884f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "nanodjango",
#   "channels",
#   "daphne",
#   "htpy",
#   "markdown",
#   "markupsafe",
#   "llm"
# ]
# ///
import json | |
import uuid | |
from channels.generic.websocket import WebsocketConsumer | |
from channels.routing import ProtocolTypeRouter, URLRouter | |
from channels.auth import AuthMiddlewareStack | |
from django.http import HttpResponse | |
from django.urls import path | |
from markupsafe import Markup | |
from markdown import markdown | |
from htpy import ( | |
body, | |
button, | |
div, | |
form, | |
h1, | |
head, | |
html, | |
input, | |
meta, | |
script, | |
link, | |
title, | |
main, | |
style, | |
fieldset, | |
article, | |
) | |
from nanodjango import Django | |
import llm | |
# Model handle shared by every websocket connection; it is looked up once at
# import time. "4o" is presumably the llm alias for GPT-4o, matching the page
# heading -- confirm against the installed llm plugins.
model = llm.get_model("4o")
def html_template():
    """Build the complete chat page as an htpy element tree.

    The head loads htmx, the htmx WebSocket extension, marked and Pico CSS,
    plus a small inline stylesheet and a MutationObserver script.  The body
    holds a ``ws-connect`` wrapper around the ``#message-list`` container and
    the message form whose submissions are sent over the socket.

    Returns:
        The root ``html`` element.
    """
    return html[
        head[
            meta(charset="utf-8"),
            meta(name="viewport", content="width=device-width, initial-scale=1"),
            title["llm chat"],
            # NOTE(review): the package@version part of these two URLs had been
            # destroyed by e-mail obfuscation ("[email protected]"), which left
            # the page without htmx at all.  The package names follow from the
            # hx-ext="ws" / ws-connect / ws-send attributes used below; the
            # exact version pins are a reconstruction -- confirm them.
            script(src="https://unpkg.com/htmx.org@2.0.4"),
            script(src="https://unpkg.com/htmx-ext-ws@2.0.2"),
            script(src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"),
            link(
                rel="stylesheet",
                href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css",
            ),
            style[
                Markup("""
                .message { padding: .5rem; }
                .message.user-message {
                    border: 1px solid #999;
                    border-radius: 0.5rem;
                    margin-bottom: 0.5rem;
                }
                .message.echo-message {
                    font-weight: bold;
                    border: 1px solid green;
                    border-radius: 0.5rem;
                    margin-bottom: 1rem;
                }
                """)
            ],
            script[
                Markup("""
                // Create a MutationObserver to watch for content changes
                const observer = new MutationObserver((mutations) => {
                    mutations.forEach((mutation) => {
                        if (mutation.type === 'childList' && mutation.target.classList.contains('echo-message')) {
                            // Only parse if the content isn't already HTML
                            if (!mutation.target.innerHTML.includes('<')) {
                                mutation.target.innerHTML = marked.parse(mutation.target.innerHTML);
                            }
                        }
                    });
                });
                // Start observing the message list for changes
                document.addEventListener('DOMContentLoaded', () => {
                    const messageList = document.getElementById('message-list');
                    if (messageList) {
                        observer.observe(messageList, {
                            childList: true,
                            subtree: true,
                            characterData: true
                        });
                    }
                });
                """)
            ],
        ],
        body[
            main(class_="container")[
                article[
                    h1["Chat with GPT-4o"],
                    # Everything inside this div shares one websocket connection.
                    div(hx_ext="ws", ws_connect="/ws/echo/")[
                        div("#message-list"),
                        form(ws_send=True)[
                            fieldset(role="group")[
                                input(
                                    name="message",
                                    type="text",
                                    placeholder="Type your message...",
                                    autocomplete="off",
                                ),
                                button(
                                    class_="primary outline",
                                    type="submit",
                                    # Clear the field on the next tick so the
                                    # submit still sees the typed value.
                                    onclick="setTimeout(() => this.closest('form').querySelector('input[name=message]').value = '', 0)",
                                )["↩"],
                            ]
                        ],
                    ],
                ],
            ]
        ],
    ]
def response_message(message_text, id):
    """Build the out-of-band fragment that appends one chunk of reply text.

    Args:
        message_text: Text to append; passed to htpy as a plain child.
        id: CSS selector of the element to append to.  It is interpolated
            verbatim after ``beforeend:``, so it must already carry its ``#``.

    Returns:
        A ``div`` whose ``hx-swap-oob`` value is ``beforeend:<id>``.
    """
    swap_spec = f"beforeend:{id}"
    return div("#message-list", hx_swap_oob=swap_spec)[message_text]
def formatted_response_message(message_text, id):
    """Build the fragment that replaces a reply container with rendered Markdown.

    The fragment is swapped in with ``outerHTML``, i.e. it replaces the whole
    container element.  It therefore carries the same ``message`` /
    ``echo-message`` classes that ``response_container`` gives the container;
    without them the finished reply would lose its styling.

    Args:
        message_text: Markdown source of the complete reply.
        id: htpy id shorthand (``#...``) of the container being replaced.

    Returns:
        A ``div`` with that id holding the Markdown rendered to HTML
        (``fenced_code`` extension enabled).
    """
    rendered_html = markdown(message_text, extensions=["fenced_code"])
    # NOTE(review): wrapping in Markup bypasses htpy's escaping and the
    # markdown call does nothing here to strip raw HTML, so any HTML in the
    # model output reaches the browser unescaped.  Consider sanitising.
    return div(
        id,
        class_=["message", "echo-message"],
        hx_swap_oob="outerHTML",
    )[Markup(rendered_html)]
def response_markdown_trigger(id):
    """Serialise a ``parse-markdown`` message for the given element id.

    Args:
        id: Value stored under the ``elementId`` key.

    Returns:
        A JSON string with the keys ``type`` and ``elementId``.
    """
    payload = {"type": "parse-markdown", "elementId": id}
    return json.dumps(payload)
def response_container(id):
    """Build the fragment that adds an empty reply container to the message list.

    Args:
        id: htpy id shorthand (``#...``) for the new container.

    Returns:
        An out-of-band ``beforeend`` fragment for ``#message-list`` holding one
        empty ``div`` with the ``message`` and ``echo-message`` classes.
    """
    empty_reply = div(id, class_=["message", "echo-message"])
    return div("#message-list", hx_swap_oob="beforeend")[empty_reply]
def user_message(message_text):
    """Build the fragment that appends the user's own message to the list.

    Args:
        message_text: The text the user submitted; passed to htpy as a plain
            child.

    Returns:
        An out-of-band ``beforeend`` fragment for ``#message-list`` holding one
        ``div`` with the ``message`` and ``user-message`` classes.
    """
    bubble = div(class_=["message", "user-message"])[message_text]
    return div("#message-list", hx_swap_oob="beforeend")[bubble]
# `daphne` must be the first entry in INSTALLED_APPS, and nanodjango does not
# yet support prepending "priority" apps, so EXTRA_APPS=["channels"] is not
# enough: the complete app list is spelled out here instead.
app = Django(
    INSTALLED_APPS=[
        "daphne",
        "django.contrib.admin",
        "django.contrib.auth",
        "django.contrib.contenttypes",
        "django.contrib.sessions",
        "django.contrib.messages",
        "whitenoise.runserver_nostatic",
        "django.contrib.staticfiles",
        "channels",
    ],
    CHANNEL_LAYERS={
        "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"},
    },
    # The router is defined at the bottom of this same script.
    ASGI_APPLICATION="__main__.htmx_websocket_app",
)
@app.route("/")
def index(request):
    """Serve the chat page built by ``html_template``."""
    page = html_template()
    return HttpResponse(page)
class EchoConsumer(WebsocketConsumer):
    """Websocket endpoint that relays a chat message to the model.

    Every reply is sent back as HTML fragments with ``hx-swap-oob``
    attributes: the user's message, an empty reply container, one fragment
    per streamed chunk, and finally the Markdown-rendered complete reply.
    """

    def receive(self, text_data=None, bytes_data=None):
        """Handle one incoming frame.

        Frames that cannot be understood are ignored rather than raising, so a
        single bad frame does not end the connection: binary frames, invalid
        JSON, a JSON value that is not an object, and a missing, non-string or
        blank ``message`` field.

        Args:
            text_data: JSON text of the submitted form; its ``message`` key
                holds the prompt.
            bytes_data: Binary frame payload; accepted for signature
                compatibility and ignored.
        """
        if text_data is None:
            return
        try:
            payload = json.loads(text_data)
        except json.JSONDecodeError:
            return
        if not isinstance(payload, dict):
            return
        message_text = payload.get("message", "")
        if not isinstance(message_text, str) or not message_text.strip():
            return

        # Fragments are serialised explicitly so that ``send`` always receives
        # a real string instead of an htpy element.
        self.send(text_data=str(user_message(message_text)))

        response = model.prompt(message_text)
        response_container_id = f"#echo-message-{uuid.uuid4()}"
        self.send(text_data=str(response_container(response_container_id)))

        # Forward each chunk as it arrives and keep them all for the final
        # Markdown rendering.
        chunks = []
        for chunk in response:
            chunks.append(chunk)
            self.send(text_data=str(response_message(chunk, response_container_id)))

        full_response = "".join(chunks)
        self.send(
            text_data=str(
                formatted_response_message(full_response, response_container_id)
            )
        )
# Websocket routes; the path matches the ws-connect URL used by the chat page.
websocket_urlpatterns = [path("ws/echo/", EchoConsumer.as_asgi())]
# ASGI entry point named by the ASGI_APPLICATION setting: HTTP requests go to
# the nanodjango app, websocket connections go through the auth middleware to
# the websocket URL patterns.
htmx_websocket_app = ProtocolTypeRouter(
    {
        "http": app.asgi,
        "websocket": AuthMiddlewareStack(
            URLRouter(websocket_urlpatterns),
        ),
    }
)
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment