Skip to content

Instantly share code, notes, and snippets.

@jdf-id-au
Created May 16, 2021 06:57
Show Gist options
  • Save jdf-id-au/335744a20d78b2e61703ecf9e49c7511 to your computer and use it in GitHub Desktop.
Save jdf-id-au/335744a20d78b2e61703ecf9e49c7511 to your computer and use it in GitHub Desktop.
Fragment outgoing websocket text messages in yada and aleph
(ns fragmenter
"Support fragmenting outgoing websocket text messages,
while waiting for yada to incorporate https://github.com/clj-commons/aleph/pull/502
( https://github.com/juxt/yada/issues/266 )"
;; timbre is used only for the warning logged when an intermediate fragment fails to send.
(:require [taoensso.timbre :as log])
;; Netty pipeline/handler interfaces, plus the two websocket frame classes that
;; fragments are built from.
(:import (io.netty.channel ChannelOutboundHandler ChannelFutureListener ChannelPipeline ChannelHandler)
(io.netty.handler.codec.http.websocketx TextWebSocketFrame ContinuationWebSocketFrame)))
;; Protocol for outgoing messages that may be split into websocket fragments.
(defprotocol Fragmentable
;; smash: given a message and a maximum fragment size in bytes, return a vector
;; of the message(s) to write in its place, in order. Implementations in this
;; file return a one-element vector containing the message itself when no
;; splitting is performed.
(smash [this frag-size]))
(extend-protocol Fragmentable
  TextWebSocketFrame
  ;; Splits a text frame whose payload exceeds `frag-size` bytes into one
  ;; non-final TextWebSocketFrame followed by ContinuationWebSocketFrames.
  ;;
  ;; Returns a vector of frames in send order:
  ;;   * `[this]` when the payload already fits, or when `frag-size` is not
  ;;     positive (a non-positive size can never make progress and would
  ;;     otherwise loop forever on zero-length slices);
  ;;   * otherwise the fragments, in which case the original frame is released.
  ;;
  ;; The final-fragment flag of the original frame is carried over to the last
  ;; fragment, so a frame that was itself sent as a non-final fragment does not
  ;; accidentally terminate the message.
  (smash [this frag-size]
    (let [rsv     (.rsv this)
          final?  (.isFinalFragment this)
          content (.content this)
          ;; Rewind so that slicing always starts from the beginning of the payload.
          _       (.readerIndex content 0)
          total   (.readableBytes content)]
      (if (or (not (pos? frag-size))
              (<= total frag-size))
        [this]
        (let [head  (TextWebSocketFrame. false rsv (.readRetainedSlice content frag-size))
              frags (loop [acc [head]]
                      (let [remaining (.readableBytes content)]
                        (if (<= remaining frag-size)
                          ;; Last piece: inherits the original frame's final flag.
                          (conj acc (ContinuationWebSocketFrame.
                                      final? rsv (.readRetainedSlice content remaining)))
                          (recur (conj acc (ContinuationWebSocketFrame.
                                             false rsv (.readRetainedSlice content frag-size)))))))]
          ;; Every fragment was created with readRetainedSlice, so the fragments
          ;; hold their own references; drop the reference held via the original.
          (.release this)
          frags))))
  Object
  ;; Anything that is not a TextWebSocketFrame is passed through untouched.
  (smash [this _]
    [this]))
(defn fragmenter
  "Returns a Netty outbound channel handler that passes every outgoing message
  through `smash` with `frame-size` and writes the resulting frames in order.

  The caller's promise is attached to the last frame only, so it completes when
  the final fragment has been written. Failures of earlier fragments are logged
  as warnings; they do not fail the caller's promise directly.

  All other outbound operations are forwarded to the context unchanged."
  [frame-size]
  (reify
    ChannelOutboundHandler
    (bind [_ ctx localAddress promise]
      (.bind ctx localAddress promise))
    (connect [_ ctx remoteAddress localAddress promise]
      (.connect ctx remoteAddress localAddress promise))
    (disconnect [_ ctx promise]
      (.disconnect ctx promise))
    (close [_ ctx promise]
      (.close ctx promise))
    (deregister [_ ctx promise]
      (.deregister ctx promise))
    (read [_ ctx]
      (.read ctx))
    (write [_ ctx msg promise]
      ; promise – the ChannelPromise to notify once the operation completes
      ; AbstractChannelHandlerContext.write returns the promise as a ChannelFuture
      (let [frags     (smash msg frame-size)
            last-frag (peek frags)]
        (doseq [frag frags]
          ;; Compare by identity, not `=`: frame equality is content-based, so an
          ;; earlier fragment carrying the same bytes as the last one would
          ;; otherwise also be written with the caller's promise.
          (if (identical? frag last-frag)
            (.write ctx frag promise)
            (-> (.write ctx frag)
                (.addListener
                  (reify ChannelFutureListener
                    (operationComplete [_ f]
                      (when-not (.isSuccess f)
                        (log/warn "Unable to send fragment" frag "to"
                                  (-> ctx .channel .id .asShortText)
                                  (.cause f)))))))))))
    (flush [_ ctx]
      (.flush ctx))
    ChannelHandler
    (handlerAdded [_ ctx] #_(log/info "Fragmenter added to" (-> ctx .channel .id .asShortText)))
    (handlerRemoved [_ ctx] #_(log/info "Fragmenter removed from" (-> ctx .channel .id .asShortText)))
    ;; Forward pipeline exceptions instead of leaving the method unimplemented:
    ;; a reify that omits it would raise AbstractMethodError if Netty invokes it.
    (exceptionCaught [_ ctx cause]
      (.fireExceptionCaught ctx cause))))
(defn insert-fragmenter
  "Returns a pipeline-transform function that installs a `fragmenter` handler,
  named \"fragmenter\", immediately after the handler named `after`.

  `frag-size` is the maximum fragment payload size in bytes and defaults to
  16 KiB when omitted. NOTE(review): the original FIXME says this should match
  max-chunk-size; pass it explicitly where that value is known."
  ([after]
   (insert-fragmenter after (* 16 1024)))
  ([after frag-size]
   ; FIXME should be after (meaning "before" for outgoing messages) "deflater", but not currently using compression
   (fn [^ChannelPipeline pipeline]
     (.addAfter pipeline after "fragmenter" (fragmenter frag-size)))))
;; Example namespace showing how the fragmenter is wired into yada and aleph.
(ns speak-to-yada
(:require [yada.yada :as yada]
[aleph.http :as http]
[fragmenter :refer [insert-fragmenter]]))
;; Both forms below are discarded by the reader (#_) and serve only as usage
;; sketches; the trailing "..." marks elided arguments.
;; Server side: install the fragmenter after the "http-server" pipeline handler.
#_ (yada/listener routes... {:port port...
:pipeline-transform (insert-fragmenter "http-server")})
;; Client side: install the fragmenter after the "http-client" pipeline handler.
#_ (http/websocket-client ws... {:pipeline-transform (insert-fragmenter "http-client")})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment