Skip to content

Instantly share code, notes, and snippets.

@amirouche
Created March 6, 2022 18:13
Show Gist options
  • Save amirouche/45a6a7677e531ae55a9df21432dcbfe8 to your computer and use it in GitHub Desktop.
Save amirouche/45a6a7677e531ae55a9df21432dcbfe8 to your computer and use it in GitHub Desktop.
Frontend controlled by the backend
from collections.abc import Iterable
from uuid import uuid4
from hyperdev.base import HyperDevException
class FrontendException(HyperDevException):
pass
class HTMLElement: # inspired from nevow
"""Python representaiton of html nodes.
Text nodes are python strings.
You must not instantiate this class directly, instead use the
global instance of the class `HTMLElementSugar` called `h`.
"""
__slots__ = ('_tag', '_children', '_properties')
def __init__(self, tag):
self._tag = tag
self._children = list()
self._properties = dict()
def __call__(self, **properties):
# HTMLElement is a callable to be able to implement the
# intented interface such as h.div(**properties) see
# HTMLElementSugar. Replace existing properties, even if it
# is not meant to be called more than once, and try to make
# sure of that.
assert not self._properties, "Invalid use of HTMLElement"
self._properties = properties
return self
def __repr__(self):
return '<HTMLElement: %s %s>' % (self._tag, self._properties)
def extend(self, nodes):
for node in nodes:
self.append(node)
def append(self, node):
"""Append a single node or string as a child"""
# Sometime it is handy to instanciate an HTML node, and then
# add children to it, prolly in a loop, so that the html code
# can be read from top to bottom.
#
# Instead of the following:
#
# items = [h.li()[f"Item number {index}" for index in range(10)]]
# container = h.ul(id="root")[items]
#
# In the above the children are instanciated before
# use. Instead one can do the following:
#
# container = h.ul(id="root")
# for index in range(10):
# container.append(h.li()[f"Item number {index}"])
#
# The code flows in way that is similar to how the html will
# be rendered.
#
assert isinstance(node, (str, float, int, HTMLElement)), "Invalid node type"
self._children.append(node)
def __getitem__(self, something):
"""Add `something` as children of the node"""
assert isinstance(something, (str, float, int, Iterable, HTMLElement))
if isinstance(something, (str, float, int, HTMLElement)):
self.append(something)
elif isinstance(something, Iterable):
for child in something:
# HTMLElement.append will assert child is valid.
self.append(child)
else:
# Oops!
raise FrontendException("Unexpected child node: {}".format(something))
# Returning self, allows method chaining, such as:
#
# h.div(id="root")[h.h1()["hello"]]
#
# Withtout the following return the root div, would not have a
# h1 as child.
return self
class HTMLElementSugar:
"""Sugar syntax for building HTML with the help of `HTMLElement`.
E.g.
h.div(id="container", Class="minimal thing", For="something")["Héllo World!"]
container = h.div(id="container", Class="minimal thing")
container.append("Héllo World!")
Properties are optional, subscripting to add children is also
optional. So the general pattern is the following:
h.tag(**properties)
Or with children:
h.tag(**properties)[children]
In particular, `h.tag[something]` or even `h.tag` is invalid, such
as `<br/>` must be instanciated with:
h.br()
Note: Autoclosing tags are handled browser side with javascript.
"""
def __getattr__(self, tag):
return HTMLElement(tag)
h = HTMLElementSugar()
def serialize(element):
"""Convert an `HTMLElement` hierarchy to a json string.
Returns two values:
- The composition of python dict, list, int, float, str
- ie. json-like representation
- An event dictionary mapping keys to callables
"""
# Events maps unique identifiers to python callables, they are
# gathered from the whole HTMLElement tree.
events = dict()
PROPERTIES = 1
CHILDREN = 2
def recurse(element):
"""Recursively convert `element` into json-like python structure"""
if isinstance(element, (str, float, int)):
return element
elif isinstance(element, HTMLElement):
out = [
element._tag,
dict(),
None,
]
for key, value in element._properties.items():
if key.startswith('on'):
uid = uuid4().hex
# XXX: mutate non-local dictionary called `events`
assert callable(value)
events[uid] = value
out[PROPERTIES][key] = uid
else:
if key == "Class":
out[PROPERTIES]["class"] = value
else:
out[PROPERTIES][key] = value
out[CHILDREN] = [recurse(child) for child in element._children]
return out
else:
raise FrontendException("Invalid element: {}".format(element))
out = recurse(element)
return out, events
import * as preact from 'https://unpkg.com/preact?module';
console.log("echo hello world");
let root = document.getElementById('root');
// TODO: support https/wss
let ws = new WebSocket(`ws://${window.location.host}/api/websocket`);
function makeEventHandlerCallback(name, uid) {
return function(event) {
event.preventDefault();
let msg = {
type: 'dom-event',
name: name,
uid: uid,
path: location.pathname,
payload: {'target.value': event.target.value},
};
console.log('send', msg);
ws.send(JSON.stringify(msg));
return false;
}
}
// Translate json to preact vdom node
let TAG = 0;
let PROPERTIES = 1;
let CHILDREN = 2;
function translate(json) {
// create callbacks
Object.keys(json[PROPERTIES]).forEach(function(key) {
// If the key starts with on, it must be an event handler,
// replace the value with a callback that sends the event
// to the backend.
if (key.startsWith('on')) {
json[PROPERTIES][key] = makeEventHandlerCallback(key, json[PROPERTIES][key]);
}
});
let children = json[CHILDREN].map(function(child) {
if (child instanceof Array) {
// recurse
return translate(child);
} else { // it's a string or a number
return child;
}
});
return preact.h(json[TAG], json[PROPERTIES], children);
}
ws.onmessage = function(msg) {
msg = JSON.parse(msg.data);
console.log('onmessage', msg);
let app = translate(msg);
preact.render(app, root);
let input = document.querySelector("#input");
input.scrollIntoView();
input.focus();
}
ws.onopen = function (_) {
let msg = {
type: 'init',
path: location.pathname,
};
ws.send(JSON.stringify(msg));
};
ws.onclose = function (_) {
window.location.reload();
};
document.addEventListener('keyup', (event) => {
const keyName = event.key;
if (keyName === '/' && event.ctrlKey) {
let input = document.querySelector("#input");
input.scrollIntoView();
input.focus();
}
}, false);
from urllib.parse import urljoin
import json
import asyncio
import time
import sys
from pathlib import Path
from collections import namedtuple
from mimetypes import guess_type
from lxml.html import fromstring as string2html
from pampy import match, _
from loguru import logger as log
import httpx
import found
from hyperdev.base import HyperDevException
from hyperdev import frontend
from hyperdev.frontend import h
import pstore
from html2text import html2text
from string import punctuation
from collections import Counter
ROOT = Path(__file__).parent.resolve()
log.info('That is beautiful, and simple logging')
HELP = """Welcome!
- Type anything to search with duckduckgo;
- To use the builtin search engine use `search my keywords ...` that
will search for `my keywords ...` inside babelia's very own index;
- To iterate over search result use `next`;
- To view the last url, use `view`;
- To bookmark, and index the last url, use the command `bookmark`;
That is all... foxes!
"""
def bagomatic(html):
try:
text = html2text(html)
text = ''.join(' ' if x in punctuation else x for x in text)
bag = Counter(x for x in text.split() if 256 > len(x) > 3)
return bag
except Exception:
return None
def html2h(html, stack=None):
# remove useless stuff
if html.tag in ["script", "style", 'form', 'input', 'button',
'textarea', 'svg', 'nav', 'header', 'footer',
'include-fragment', 'fragment']:
return ""
# remove comments
if not isinstance(html.tag, str):
return ""
# try to sanitize links
if html.tag == 'a' and html.attrib.get('href', '').startswith('https://'):
attrib = dict(href=html.attrib.get('href'), target="_blank")
else:
attrib = dict()
if stack is None:
stack = list()
stack.append(html.tag)
try:
stack.index('pre')
except ValueError:
pre = False
else:
pre = True
# create children
children = list()
if html.text and pre:
children.append(html.text)
elif html.text and not pre and html.text.strip():
children.append(html.text.strip())
for child in html.iterchildren():
children.append(html2h(child, stack))
if html.tail and pre:
children.append(html.tail)
elif html.tail and not pre and html.tail.strip():
children.append(html.tail.strip())
if all(not x.strip() if isinstance(x, str) else (not x) for x in children):
return ""
else:
node = getattr(h, html.tag)(**attrib)
node.extend(children)
return node
def make_timestamper():
start_monotonic = time.monotonic()
start = time.time()
loop = asyncio.get_event_loop()
def timestamp():
# Wanna be faster than datetime.now().timestamp()
# approximation of current epoch time in float seconds
out = start + loop.time() - start_monotonic
return out
return timestamp
Application = namedtuple('Application', ('database', 'http', 'make_timestamp', 'index'))
async def make_application():
# setup loguru logging
# TODO: integrate with python's logging
log.remove()
log.add(sys.stderr, enqueue=True)
# setup app
make_timestamp = make_timestamper()
http = httpx.AsyncClient()
database = await found.open()
app = Application(
database,
http,
make_timestamp,
pstore.make('index', ('indexv1',)),
)
return app
async def view_index(app, scope):
index = ROOT / 'index.html'
with index.open('rb') as f:
index = f.read()
return 200, b'text/html', index
application = None
async def main(scope, receive, send):
log.debug('Scope: {}', scope)
global application
if application is None:
application = await make_application()
try:
if scope['type'] == 'http':
out = await http(application, scope, receive, send)
return out
elif scope['type'] == 'websocket':
out = await websocket(application, scope, receive, send)
return out
else:
raise HyperDevException('unknown scope type')
except Exception as exc:
log.exception(exc)
async def websocket(application, scope, receive, send):
# https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
assert scope['type'] == 'websocket'
events = dict()
previous = dict()
history = []
context = dict()
event = await receive()
assert event['type'] == 'websocket.connect'
# TODO: Accept only when path == /api/websocket
# TODO: Authentication
await send({'type': 'websocket.accept'})
while True:
event = await receive()
# TODO: support type == websocket.close
assert event['type'] == 'websocket.receive'
message = json.loads(event['text'])
log.debug(message)
if message['type'] == 'init':
async def onQuery(event):
query = event['payload']['target.value']
context['query'] = query = query or context['query']
user = h.div(Class="babelia-convo")[
h.span()["🐵 " + query],
]
history.append(user)
if query == 'next':
try:
hit = await context['hits'].__anext__()
except StopAsyncIteration:
pass
else:
context["hit"] = hit
history.append(h.div(Class="babelia-convo")[
h.a(href=hit, target="_blank")[hit]
])
elif query == 'view':
response = await application.http.get(context['hit'])
html = string2html(response.text)
body = html.xpath('//body')[0]
body.tag = 'div'
if 'github.com' in context['hit'] and body.xpath('//*[@id="readme"]'):
body = body.xpath('//*[@id="readme"]')[0]
if 'stackoverflow.com' in context['hit'] and body.xpath('//*[@id="content"]'):
body = body.xpath('//*[@id="content"]')[0]
for a in body.xpath('//a'):
href = a.attrib.get('href', '')
href = urljoin(context['hit'], href)
a.attrib['href'] = href
html = html2h(body)
convo = h.div(Class="babelia-convo")
convo.append(html)
history.append(convo)
elif query == 'bookmark':
try:
url = context['hit']
response = await application.http.get(url)
bag = bagomatic(response.text)
await found.transactional(
application.database,
pstore.index,
application.index,
url,
bag,
)
except Exception:
computer = h.p()
computer.append("🤖 ")
computer.append("There was an error :/")
history.append(h.div(Class="babelia-convo")[computer])
else:
context["hit"] = url
computer = h.p()
computer.append("🤖 ")
computer.append("bookmarked :]")
history.append(h.div(Class="babelia-convo")[computer])
elif query.startswith('search'):
keywords = query[len("search"):].strip().split()
out = await found.transactional(
application.database,
pstore.search,
application.index,
keywords,
)
async def shim():
for hit in out:
yield hit[0]
context['hits'] = hits = shim()
try:
hit = await hits.__anext__()
except StopAsyncIteration:
pass
else:
context["hit"] = hit
computer = h.p()
computer.append("🤖 ")
computer.append(h.a(href=hit, target="_blank")[hit])
history.append(h.div(Class="babelia-convo")[computer])
else:
from hyperdev.raxes import search
hits = search(query)
context['hits'] = hits
try:
hit = await hits.__anext__()
except StopAsyncIteration:
pass
else:
context["hit"] = hit
computer = h.p()
computer.append("🤖 ")
computer.append(h.a(href=hit, target="_blank")[hit])
history.append(h.div(Class="babelia-convo")[computer])
user = h.div(Class="babelia-convo")[
h.label(Class="blink", For="input")["🐵 "],
h.input(
id="input",
type="text",
onChange=onQuery,
value="",
placeholder="somewhere over the rainbow..."
),
]
html = h.div()
html.extend(history)
html.append(user)
babelia = h.div(Class="babelia-convo")[
h.input(readonly=True)
]
html.append(babelia)
html, events = frontend.serialize(html)
return html, events
babelia = h.div(Class="babelia-convo")[
h.pre()["🤖 ", HELP]
]
user = h.div(Class="babelia-convo")[
h.label(Class="blink", For="input")["🐵 "],
h.input(
id="input",
type="text",
onChange=onQuery,
placeholder="somewhere over the rainbow...",
value=""
),
]
html = h.div()
history = [babelia]
html.extend(history)
html.append(user)
babelia = h.div(Class="babelia-convo")[
h.input(readonly=True)
]
html.append(babelia)
html, events = frontend.serialize(html)
previous = events
await send({'type': 'websocket.send', 'text': json.dumps(html)})
elif message['type'] == 'dom-event':
try:
handler = events[message['uid']]
except KeyError:
await send({'type': 'websocket.send', 'text': json.dumps(html)})
else:
html, new_events = await handler(message)
events = dict(new_events)
events.update(previous)
previous = new_events
await send({'type': 'websocket.send', 'text': json.dumps(html)})
else:
log.critical('Unknown event type: {}', message['type'])
async def http(application, scope, receive, send):
assert scope['type'] == 'http'
path = scope['path']
if path.startswith('/static/'):
# XXX: Secure the /static/* route, and avoid people poking at
# files that are not in the local ./static/
# directory. Security can be as simple as that.
if '..' in path:
await send({
'type': 'http.response.start',
'status': 404,
})
await send({
'type': 'http.response.body',
'body': b"File not found",
})
else:
components = path.split('/')
filename = components[-1]
filepath = ROOT / '/'.join(components[1:])
mimetype = guess_type(filename)[0] or 'application/octet-stream'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', mimetype.encode('utf8')],
],
})
with filepath.open('rb') as f:
await send({
'type': 'http.response.body',
'body': f.read(),
})
elif path == '/favicon.ico':
await send({
'type': 'http.response.start',
'status': 200,
})
await send({
'type': 'http.response.body',
'body': b"File not found",
})
elif not path.endswith('/'):
# XXX: All paths but static path must end with a slash. That
# is a dubious choice when considering files, possibly large
# files served dynamically.
# XXX: Also at this time it is not used, since all HTTP path
# serve the ./index.html stuff which always connect via
# websockets (and there is no check on the websocket path).
path += '/'
await send({
'type': 'http.response.start',
'status': 301,
'headers': [
[b'location', path.encode('utf8')],
],
})
await send({
'type': 'http.response.body',
'body': b"Moved permanently",
})
else:
path = tuple(path.split('/')[1:-1])
# TODO: match on the HTTP method too, and fallback to 404
view = match(path,
_, lambda x: view_index,
)
log.debug(view)
# XXX: the body must be bytes, maybe it would be wise to also
# support sendfiles and / or a body that is a bytes generator
code, mimetype, body = await view(application, scope)
await send({
'type': 'http.response.start',
'status': code,
'headers': [
[b'content-type', mimetype],
],
})
await send({
'type': 'http.response.body',
'body': body,
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment