Created
May 16, 2021 06:57
-
-
Save jdf-id-au/335744a20d78b2e61703ecf9e49c7511 to your computer and use it in GitHub Desktop.
Fragment outgoing websocket text messages in yada and aleph
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns fragmenter | |
"Support fragmenting outgoing websocket text messages, | |
while waiting for yada to incorporate https://github.com/clj-commons/aleph/pull/502 | |
( https://github.com/juxt/yada/issues/266 )" | |
(:require [taoensso.timbre :as log]) | |
(:import (io.netty.channel ChannelOutboundHandler ChannelFutureListener ChannelPipeline ChannelHandler) | |
(io.netty.handler.codec.http.websocketx TextWebSocketFrame ContinuationWebSocketFrame))) | |
(defprotocol Fragmentable
  "Messages that can be broken up into smaller pieces before being written."
  (smash [this frag-size]
    "Returns a vector of the messages to write in place of `this`.
    `frag-size` is the size limit applied to each piece; implementations
    that do not fragment return a single-element vector holding `this`."))
(extend-protocol Fragmentable
  TextWebSocketFrame
  ;; Splits a text frame into one non-final TextWebSocketFrame followed by
  ;; ContinuationWebSocketFrames, each carrying at most `frag-size` bytes of
  ;; the original payload. Returns `[this]` untouched when the payload already
  ;; fits, or when `frag-size` is not positive.
  (smash [this frag-size]
    (let [rsv (.rsv this)
          ;; The closing fragment inherits the original frame's FIN flag, so a
          ;; frame that was itself a non-final fragment does not terminate the
          ;; message early.
          final? (.isFinalFragment this)
          content (.content this)
          ;; Rewind so the whole payload is considered, as the slices below
          ;; are taken relative to the reader index.
          _ (.readerIndex content 0)
          total (.readableBytes content)]
      ;; A non-positive frag-size can never make progress (zero-length slices
      ;; forever), so treat it as "do not fragment".
      (if (or (not (pos? frag-size)) (<= total frag-size))
        [this]
        ;; NOTE(review): the RSV bits are copied onto every fragment; confirm
        ;; that is what any installed websocket extension expects.
        (let [head (TextWebSocketFrame. false rsv (.readRetainedSlice content frag-size))
              frags (loop [acc [head]]
                      (let [remaining (.readableBytes content)]
                        (if (<= remaining frag-size)
                          (conj acc (ContinuationWebSocketFrame.
                                      final? rsv (.readRetainedSlice content remaining)))
                          (recur (conj acc (ContinuationWebSocketFrame.
                                             false rsv (.readRetainedSlice content frag-size)))))))]
          ;; Every fragment holds its own retained slice, so the reference
          ;; held through the original frame can be dropped.
          (.release this)
          frags))))
  Object
  ;; Anything that is not a text frame is passed through unfragmented.
  (smash [this _]
    [this]))
(defn fragmenter
  "Returns a Netty ChannelOutboundHandler that replaces each outgoing message
  with the fragments produced by `(smash msg frame-size)`.

  Every outbound operation other than `write` is forwarded unchanged to the
  next handler. On `write`, all fragments but the last are written without the
  caller's promise (a failure is only logged); the last fragment is written
  with the caller's `promise`, so the promise is attached to exactly one
  write."
  [frame-size]
  (reify
    ChannelOutboundHandler
    (bind [_ ctx localAddress promise]
      (.bind ctx localAddress promise))
    (connect [_ ctx remoteAddress localAddress promise]
      (.connect ctx remoteAddress localAddress promise))
    (disconnect [_ ctx promise]
      (.disconnect ctx promise))
    (close [_ ctx promise]
      (.close ctx promise))
    (deregister [_ ctx promise]
      (.deregister ctx promise))
    (read [_ ctx]
      (.read ctx))
    (write [_ ctx msg promise]
      ; promise – the ChannelPromise to notify once the operation completes
      ; AbstractChannelHandlerContext.write returns the promise as a ChannelFuture
      (let [frags (smash msg frame-size)]
        ;; Pick the last fragment by position rather than by `=`: an earlier
        ;; fragment that compares equal to the last one would otherwise also
        ;; be written with the caller's promise.
        (doseq [frag (butlast frags)]
          (-> (.write ctx frag)
              (.addListener
                (reify ChannelFutureListener
                  (operationComplete [_ f]
                    (when-not (.isSuccess f)
                      (log/warn "Unable to send fragment" frag "to" (-> ctx .channel .id .asShortText) (.cause f))))))))
        (when-some [final-frag (last frags)]
          (.write ctx final-frag promise))))
    (flush [_ ctx]
      (.flush ctx))
    ChannelHandler
    (handlerAdded [_ ctx] #_(log/info "Fragmenter added to" (-> ctx .channel .id .asShortText)))
    (handlerRemoved [_ ctx] #_(log/info "Fragmenter removed from" (-> ctx .channel .id .asShortText)))
    ;; ChannelHandler also declares exceptionCaught; a reify that leaves it out
    ;; throws AbstractMethodError if the pipeline ever invokes it on this
    ;; handler, so pass the exception along to the next handler.
    (exceptionCaught [_ ctx cause]
      (.fireExceptionCaught ctx cause))))
(defn insert-fragmenter
  "Returns a pipeline-transform function that adds a `fragmenter` handler,
  registered under the name \"fragmenter\", after the handler named `after`.

  `frame-size` is the fragment size passed to `fragmenter`; it defaults to
  16 KiB when omitted."
  ; FIXME should be after (meaning "before" for outgoing messages) "deflater", but not currently using compression
  ([after]
   (insert-fragmenter after (* 16 1024))) ; FIXME default should match max-chunk-size
  ([after frame-size]
   (fn [^ChannelPipeline pipeline]
     (.addAfter pipeline after "fragmenter" (fragmenter frame-size)))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns speak-to-yada | |
(:require [yada.yada :as yada] | |
[aleph.http :as http] | |
[fragmenter :refer [insert-fragmenter]])) | |
;; Usage sketches only: both forms are discarded by the reader (#_) and never
;; evaluated. The string given to insert-fragmenter names the existing pipeline
;; handler after which the fragmenter is added.
#_(yada/listener routes...
                 {:port port...
                  :pipeline-transform (insert-fragmenter "http-server")})
#_(http/websocket-client ws...
                         {:pipeline-transform (insert-fragmenter "http-client")})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment