Created
November 11, 2015 21:03
-
-
Save pleasetrythisathome/3f1fb830686abb429698 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns populace.web.chsk | |
(:require #?@ | |
(:clj | |
[[aleph.http :as http] | |
[bidi.bidi :refer (path-for RouteProvider tag)] | |
[bolt.authentication :refer :all] | |
[bolt.authentication.protocols :refer (RequestAuthenticator)] | |
[byte-streams :as bs] | |
[hara.event :refer :all] | |
[ring.middleware.transit :refer [encode decode]] | |
[holon.datomic.utils :refer :all] | |
[ib5k.component.ctr :as ctr] | |
[juxt.datomic.extras :refer (DatomicConnection as-conn as-db to-ref-id to-entity-map)] | |
[manifold.stream :as m] | |
[manifold.deferred :as d] | |
[manifold.bus :as bus] | |
[plumbing.core :refer :all] | |
[schema.core :as s] | |
[taoensso.timbre :as log]] | |
:cljs | |
[[cljs.core.async :refer [chan put! <! close! timeout]] | |
[cljs.reader :as edn] | |
[datascript.core :as d] | |
[ib5k.component.ctr :as ctr] | |
[om.next :as om :refer-macros [defui]] | |
[populace.utils :as u] | |
[quile.component | |
:as component :refer [Lifecycle system-map system-using using]] | |
[schema.core :as s :include-macros true] | |
[shodan.console :as c :include-macros true]]) | |
[cognitect.transit :as t] | |
[clojure.string :as str]) | |
#?(:cljs | |
(:require-macros [cljs.core.async.macros :refer [go go-loop]]))) | |
#?(:clj | |
(do | |
;; Ring response map returned by `client-wbsk-handler` when the incoming
;; request could not be upgraded to a websocket connection.
(def non-websocket-request
  {:status 400
   ;; text/plain is the registered media type for plain text;
   ;; application/text is not a registered type.
   :headers {"content-type" "text/plain"}
   :body "Expected a websocket request."})
(def clients
  "Atom holding a map of uid -> set of websocket connections currently
  registered for that uid (maintained by `connect-client` and
  `disconnect-client`)."
  (atom {}))

(def client-send
  "Event bus for server -> client traffic. `push-client!` publishes encoded
  payloads under a uid; `client-wbsk-handler` subscribes each websocket to
  its uid's topic."
  (bus/event-bus))

(def client-receive
  "Event bus for client -> server traffic. `client-wbsk-handler` publishes
  every decoded websocket message here under the sender's uid."
  (bus/event-bus))
(defn connect-client
  "Adds websocket `ws` to the set of connections registered for `uid` in
  `clients`, creating the set when the uid has none yet.
  Returns the updated registry map."
  [uid ws]
  (swap! clients
         (fn [registry]
           ;; `or` (rather than a `get` default) so an explicit nil entry is
           ;; also replaced by an empty set
           (let [sockets (or (get registry uid) #{})]
             (assoc registry uid (conj sockets ws))))))
(defn disconnect-client
  "Removes websocket `ws` from the connections registered for `uid` in
  `clients`; when no connection remains the uid key itself is removed.
  Returns the updated registry map.

  The removal and the empty-set cleanup happen inside a single `swap!` so a
  concurrent `connect-client` for the same uid cannot slip in between them and
  have its freshly added socket discarded."
  [uid ws]
  (swap! clients
         (fn [registry]
           ;; (disj nil ws) is nil, so an unknown uid simply stays absent
           (let [remaining (disj (get registry uid) ws)]
             (if (seq remaining)
               (assoc registry uid remaining)
               (dissoc registry uid))))))
(defn create-websocket
  "Upgrades ring request `req` to a websocket connection and registers it for
  `uid` in the `clients` registry. The connection is unregistered again via
  `disconnect-client` when it closes.

  Returns a deferred yielding the websocket stream; the deferred fails when
  `http/websocket-connection` fails."
  [req uid]
  (d/let-flow [ws (http/websocket-connection req)]
    ;; Register first, then install the close callback: if the callback ran
    ;; before the registration (socket closing right away), the socket would
    ;; be added afterwards and never removed.
    (connect-client uid ws)
    (m/on-closed ws (partial disconnect-client uid ws))
    ws))
(defn client-wbsk-handler
  "Ring handler for the channel-socket endpoint.

  Authenticates `req` with the component's `oauth-client`, opens a websocket
  registered under the resulting subject identifier, then
    * forwards everything published on `client-send` for that uid to the
      socket, and
    * publishes every decoded message read from the socket on
      `client-receive` under that uid.

  Returns `non-websocket-request` when the websocket could not be created.

  NOTE(review): when authentication yields no subject identifier, uid is nil
  and the socket is still registered and subscribed under nil - confirm this
  is intended."
  [{:keys [oauth-client]} req]
  (d/let-flow [uid (-> (authenticate oauth-client req)
                       :bolt/subject-identifier)
               ;; any failure to open the websocket is reduced to nil
               conn (-> (create-websocket req uid)
                        (d/catch (constantly nil)))]
    (if conn
      (do
        ;; server -> client
        (m/connect (bus/subscribe client-send uid) conn)
        ;; client -> server, read through a 100-element buffer
        (->> (m/buffer 100 conn)
             (m/consume (fn [msg]
                          (bus/publish! client-receive uid (decode msg))))))
      non-websocket-request)))
(defn push-client!
  "Encodes `data` and publishes it on the `client-send` bus under `uid`, from
  where `client-wbsk-handler` forwards it to that uid's websockets.
  Returns whatever `bus/publish!` returns."
  [uid data]
  (let [payload (encode data)]
    (bus/publish! client-send uid payload)))
;; Server-side channel-socket component.
;;   context      - route pattern (a string) under which the websocket
;;                  endpoint is exposed
;;   oauth-client - RequestAuthenticator used by `client-wbsk-handler` to
;;                  identify the connecting user
;; As a bidi RouteProvider it contributes a single route that maps `context`
;; to `client-wbsk-handler`, partially applied to this component.
(s/defrecord ChannelSocket
    [context :- s/Str
     oauth-client :- (s/protocol RequestAuthenticator)]
  RouteProvider
  (routes [component]
    [context (partial client-wbsk-handler component)]))) ; last paren closes the enclosing (do ...)
:cljs | |
(do | |
(defn create-websocket
  "Opens a browser websocket to path `context` on the current page's origin.

  The origin's scheme is mapped to the matching websocket scheme:
  http -> ws and https -> wss (previously only http:// was rewritten, so
  pages served over https produced an https:// websocket URL).

  Returns the new socket, or nil when the window exposes neither `WebSocket`
  nor `MozWebSocket`."
  [context]
  (when-let [WebSocket (or (u/oget js/window "WebSocket")
                           (u/oget js/window "MozWebSocket"))]
    (WebSocket. (-> (.. js/window -location -origin)
                    ;; anchored, so only the leading scheme is touched:
                    ;; "http://h" -> "ws://h", "https://h" -> "wss://h"
                    (str/replace #"^http" "ws")
                    (str context)))))
(defn connect-websocket
  "Creates a websocket for `context` and wires up its event handlers.

  The second argument is a map of channels; only `:receive` is used here:
  every incoming message is read as transit-JSON and put on that channel.
  Errors, open and close events are logged to the console.

  Returns the socket, or nil when `create-websocket` could not create one."
  [context {:keys [receive]}]
  (when-let [socket (create-websocket context)]
    ;; one reader for the lifetime of the socket instead of one per message
    (let [reader (t/reader :json)]
      (set! (.-onerror socket)
            (fn [e]
              (c/error "websocket error" e)))
      (set! (.-onopen socket)
            (fn []
              (c/log "websocket opened")))
      (set! (.-onmessage socket)
            (fn [msg]
              ;; core.async channels reject nil, so a message that decodes
              ;; to nil is skipped rather than throwing inside the handler
              (when-some [payload (t/read reader (.-data msg))]
                (put! receive payload))))
      (set! (.-onclose socket)
            (fn []
              (c/log "websocket closed")))
      socket)))
;; Browser-side channel-socket component.
;;   context    - path of the websocket endpoint
;;   conn       - injected dependency; not used inside this record
;;   reconciler - component whose :reconciler value receives incoming deltas
;;                through om/merge!
;; `start` opens the websocket and starts two go-loops: one writes values
;; taken from the `send` channel to the socket as transit-JSON, the other
;; merges values taken from the `receive` channel into the reconciler. A
;; `:stop!` thunk that closes both channels and the socket is stored on the
;; component; `stop` invokes it when present.
;; NOTE(review): the `send` channel is not stored on the started component, so
;; nothing visible in this file can put to it - confirm how callers are
;; expected to send.
(s/defrecord ChannelSocket
    [context
     conn
     reconciler]
  Lifecycle
  (start [this]
    (let [send (chan)
          receive (chan)
          ;; one writer for the component's lifetime instead of one per message
          writer (t/writer :json)
          ;; nil when the browser offers no WebSocket implementation
          socket (connect-websocket context {:send send
                                             :receive receive})]
      (go-loop []
        ;; when-some: only nil (channel closed) ends the loop, a `false`
        ;; value does not
        (when-some [v (<! send)]
          ;; without a socket the value is dropped instead of throwing
          (when socket
            (.send socket (t/write writer v)))
          (recur)))
      (go-loop []
        (when-some [delta (<! receive)]
          ;; (c/log "chsk" (pr-str delta))
          (om/merge! (:reconciler reconciler) delta)
          (recur)))
      (assoc this :stop! (fn []
                           (close! send)
                           (close! receive)
                           ;; socket may be nil, see above
                           (when socket
                             (.close socket))))))
  (stop [this]
    (some-> this :stop! (apply []))
    this)))) ; last two parens close the enclosing (do ...) and #?( ... )
(def new-channel-socket
  "Constructor for ChannelSocket components: `map->ChannelSocket` passed
  through the ib5k.component.ctr wrappers with a default :context of /chsk
  and the platform's dependency keys (:oauth-client on clj, :conn and
  :reconciler on cljs). The exact semantics of the wrappers are defined by
  that library."
  (let [defaults {:context "/chsk"}
        dependencies #?(:clj [:oauth-client]
                        :cljs [:conn :reconciler])]
    (ctr/wrap-kargs
     (ctr/wrap-using
      (ctr/wrap-defaults map->ChannelSocket defaults)
      dependencies))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment