Skip to content

Instantly share code, notes, and snippets.

@PetrGlad
Last active May 12, 2017 17:00
Show Gist options
  • Save PetrGlad/717fcfd1d13cf7efaa9dea0c193700f5 to your computer and use it in GitHub Desktop.
Save PetrGlad/717fcfd1d13cf7efaa9dea0c193700f5 to your computer and use it in GitHub Desktop.
netty lab clojure
;; Namespace for the raw-Netty HTTPS client experiment.
;; NOTE: ns reference clauses must be keywords (`:require`, `:import`). The
;; bare-symbol form `(require ...)` only happened to work on Clojure <= 1.8
;; and is rejected by the ns spec from Clojure 1.9 onwards.
(ns netty-lab.core
  (:require [clojure.data.json :as json])
  (:import (io.netty.bootstrap Bootstrap)
           (io.netty.channel.socket.nio NioSocketChannel)
           (io.netty.channel.socket SocketChannel)
           (java.net URL URI)
           (io.netty.channel.nio NioEventLoopGroup)
           (io.netty.channel ChannelOption ChannelInitializer ChannelPipeline SimpleChannelInboundHandler ChannelHandlerContext ChannelHandler Channel)
           (io.netty.handler.ssl SslContextBuilder SslContext)
           (io.netty.handler.ssl.util InsecureTrustManagerFactory)
           (io.netty.handler.codec.http HttpHeaderNames HttpHeaderValues HttpVersion HttpMethod HttpRequest DefaultFullHttpRequest HttpClientCodec HttpContentDecompressor HttpContent HttpResponse HttpUtil LastHttpContent HttpObject)
           (io.netty.util CharsetUtil)
           (io.netty.handler.codec MessageToMessageDecoder LineBasedFrameDecoder)
           (io.netty.channel.embedded EmbeddedChannel)
           (io.netty.handler.codec.json JsonObjectDecoder)
           (io.netty.handler.codec.string StringDecoder)
           (java.util List)))
(defn ^ChannelHandler create-handler
  "Builds the last inbound handler of the pipeline; it dumps every message it
  receives to stdout:

  * HttpResponse - status, protocol version, a map of header name to all of
    its values (skipped when there are no headers) and a content-opening line;
  * HttpContent  - the body chunk decoded as UTF-8; on LastHttpContent it also
    prints the closing line and closes the context;
  * String       - parsed as JSON with keyword keys and printed;
  * anything else - its simple class name followed by the value itself.

  Exceptions print their stack trace and close the context."
  []
  ;; See http://netty.io/wiki/reference-counted-objects.html
  ;; Use -Dio.netty.leakDetectionLevel=PARANOID to verify buffer use counting
  (proxy [SimpleChannelInboundHandler] []
    (channelRead0 [^ChannelHandlerContext ctx ^HttpObject msg]
      (cond
        (instance? HttpResponse msg)
        (do
          (println "STATUS: " (.status msg))
          (println "VERSION: " (.protocolVersion msg))
          (let [header-map (.headers msg)]
            (when-not (.isEmpty header-map)
              (println (into {}
                             (map (fn [^String header-name]
                                    [header-name (.getAll header-map header-name)]))
                             (.names header-map)))))
          (let [opening (if (HttpUtil/isTransferEncodingChunked msg)
                          "CHUNKED CONTENT {"
                          "CONTENT {")]
            (println opening)))

        (instance? HttpContent msg)
        (let [chunk-text (.toString (.content msg) CharsetUtil/UTF_8)]
          (prn chunk-text)
          (when (instance? LastHttpContent msg)
            (println "} END OF CONTENT")
            (.close ctx)))

        (string? msg)
        (let [parsed (json/read-str msg :key-fn keyword)]
          (prn parsed))

        :else
        (prn (.getSimpleName (type msg)) msg)))

    (exceptionCaught [^ChannelHandlerContext ctx ^Throwable cause]
      (.printStackTrace cause)
      (.close ctx))))
(defn ^MessageToMessageDecoder make-payload-decoder
  "Creates a decoder that turns the HttpContent chunks of a response body into
  one String per text line.

  Chunks are fed into a private EmbeddedChannel made of a LineBasedFrameDecoder
  (splitter) followed by a UTF-8 StringDecoder; every String that falls out of
  that channel is added to the decoder's output list.

  Arguments:
    max-line-length - optional; maximum frame length handed to
                      LineBasedFrameDecoder. Defaults to 1024.

  Lifecycle:
    * the embedded parser is created lazily on the first chunk;
    * on LastHttpContent the parser is finished, drained and dropped, so the
      next response on the same connection starts with a fresh parser;
    * when this handler is removed from its pipeline, any parser still alive
      is finished and everything it holds is released."
  ([] (make-payload-decoder 1024))
  ([max-line-length]
   (let [parser-channel (atom nil)
         ;; Moves every message currently readable from the parser into `out`.
         drain! (fn [^EmbeddedChannel parser ^List out]
                  (loop [out-msg (.readInbound parser)]
                    (when out-msg
                      (.add out out-msg)
                      (recur (.readInbound parser)))))]
     (proxy [MessageToMessageDecoder] [HttpContent]
       (decode [^ChannelHandlerContext ctx
                ^HttpContent msg
                ^List out]
         (when (nil? @parser-channel)
           (reset! parser-channel
                   (EmbeddedChannel.
                     (into-array ChannelHandler
                                 ;; Splitter: swap in (JsonObjectDecoder.) to split on
                                 ;; JSON object boundaries instead of line ends.
                                 [(LineBasedFrameDecoder. (int max-line-length))
                                  (StringDecoder. CharsetUtil/UTF_8)]))))
         (let [^EmbeddedChannel parser @parser-channel]
           ;; The chunk's buffer is retained before hand-off because the embedded
           ;; pipeline consumes it independently of the outer message.
           (.writeInbound parser (to-array [(-> msg .content .retain)]))
           (drain! parser out)
           (when (instance? LastHttpContent msg)
             ;; End of this response: forget the parser first so a failure while
             ;; finishing cannot leave a half-closed channel behind, then finish
             ;; it so it can flush and give up its buffered bytes.
             (reset! parser-channel nil)
             (.finish parser)
             (drain! parser out))))
       (handlerRemoved [^ChannelHandlerContext ctx]
         ;; Connection went away mid-response: release whatever the parser holds.
         (when-let [^EmbeddedChannel parser @parser-channel]
           (reset! parser-channel nil)
           (.finishAndReleaseAll parser)))))))
(defn channel-initializer
  "Returns a ChannelInitializer that appends, in this order, the handlers
  \"ssl\" (created from `ssl-ctx` with the channel's allocator), \"http\"
  (HttpClientCodec), \"decompress\" (HttpContentDecompressor), \"payload\"
  (make-payload-decoder) and \"client\" (create-handler) to each new channel's
  pipeline."
  [^SslContext ssl-ctx]
  (proxy [ChannelInitializer] []
    (initChannel [^SocketChannel ch]
      (let [pipeline (.pipeline ch)
            tls-handler (.newHandler ssl-ctx (.alloc ch))]
        (.addLast pipeline "ssl" tls-handler)
        (.addLast pipeline "http" (HttpClientCodec.))
        (.addLast pipeline "decompress" (HttpContentDecompressor.))
        (.addLast pipeline "payload" (make-payload-decoder))
        (.addLast pipeline "client" (create-handler))
        pipeline))))
(defn create-worker-group
  "Creates a new NioEventLoopGroup using its no-argument constructor."
  []
  (new NioEventLoopGroup))
(defn shutdown-worker-group
  "Calls shutdownGracefully on `worker-group` and returns that call's result."
  [worker-group]
  (-> worker-group .shutdownGracefully))
(defn wait-closed
  "Obtains the close future of `ch` and calls sync on it, returning the result
  of that sync call."
  [^Channel ch]
  (let [closed (.closeFuture ch)]
    (.sync closed)))
(defn close-channel
  "Calls close on `ch` and returns that call's result."
  [^Channel ch]
  (-> ch .close))
(defn get-url
  "Opens a TLS connection to the host of `url`, sends one HTTP/1.1 GET request
  and returns the connected Channel. The response is handled by the pipeline
  installed through `channel-initializer`.

  Arguments:
    worker-group - event loop group the Bootstrap runs on.
    url          - absolute https URL; any other scheme fails the assertion.
    token        - optional OAuth bearer token. When omitted it is read from
                   the NETTY_LAB_TOKEN environment variable; when nil or empty
                   no Authorization header is sent.

  The request target is the URL's raw path (\"/\" when the URL has none) plus
  its raw query string, and the connection goes to the URL's port (443 when
  the URL does not name one).

  Blocks until the connection is established. You may call .sync on the
  channel's close future (see `wait-closed`) to wait until everything is
  returned.

  SECURITY: certificates are NOT verified (InsecureTrustManagerFactory) - this
  is acceptable for a lab only. Credentials must never be committed to source;
  a token that was ever hard-coded here should be treated as leaked and revoked."
  ([worker-group url]
   (get-url worker-group url (System/getenv "NETTY_LAB_TOKEN")))
  ([worker-group url token]
   ;; Based on https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java
   (let [uri (URI. url)
         scheme (.getScheme uri)
         host (.getHost uri)
         explicit-port (.getPort uri) ; -1 when the URL carries no port
         port (if (neg? explicit-port) 443 explicit-port)
         raw-path (.getRawPath uri)
         raw-query (.getRawQuery uri)
         ;; An empty request target is not valid HTTP, and the query string is
         ;; part of the target.
         target (str (if (empty? raw-path) "/" raw-path)
                     (when raw-query (str "?" raw-query)))
         host-header (if (neg? explicit-port)
                       host
                       (str host ":" explicit-port))]
     (assert (= "https" scheme))
     (let [ssl-ctx (-> (SslContextBuilder/forClient)
                       (.trustManager InsecureTrustManagerFactory/INSTANCE)
                       .build)
           b (doto (Bootstrap.)
               (.group worker-group)
               (.channel NioSocketChannel)
               (.option ChannelOption/SO_KEEPALIVE false)
               (.handler (channel-initializer ssl-ctx)))
           ch (-> b
                  (.connect host port)
                  .sync
                  .channel)
           ^HttpRequest request (DefaultFullHttpRequest.
                                  HttpVersion/HTTP_1_1
                                  HttpMethod/GET
                                  target)
           headers (.headers request)]
       (doto headers
         (.set HttpHeaderNames/HOST host-header)
         (.set HttpHeaderNames/CONNECTION HttpHeaderValues/CLOSE)
         (.set HttpHeaderNames/ACCEPT_ENCODING HttpHeaderValues/GZIP))
       (when (seq token)
         (.set headers HttpHeaderNames/AUTHORIZATION (str "Bearer " token)))
       (.writeAndFlush ch request)
       ch))))
;; Leiningen project definition for the netty-lab experiments.
(defproject netty-lab "0.1.0-SNAPSHOT"
;; NOTE(review): description and url still carry "FIXME" placeholders -
;; presumably left over from the project template; fill in or remove.
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
;; netty-all and data.json are used by netty-lab.core (io.netty.* imports and
;; the clojure.data.json require); reactor-netty is used by netty-lab.rs
;; (reactor.ipc.netty.* imports).
;; NOTE(review): reactor-netty presumably brings its own Netty artifacts next
;; to netty-all - confirm the versions agree (e.g. with `lein deps :tree`).
:dependencies [[org.clojure/clojure "1.8.0"]
[io.netty/netty-all "4.1.9.Final"]
[io.projectreactor.ipc/reactor-netty "0.6.3.RELEASE"]
[org.clojure/data.json "0.2.6"]])
(ns netty-lab.rs
(:import
(java.time Duration)
(reactor.ipc.netty.http.client HttpClient)
(reactor.ipc.netty.resources PoolResources)
(io.netty.buffer ByteBuf)
(java.util.function Function BiFunction)
(reactor.core.publisher Mono)))
;(defonce pool (PoolResources/fixed "test", 1))
;; opts -> opts.connect("localhost",
;x.address()
;.getPort())
;.poolResources(pool)
(defn ^Function as-function
  "Wraps the one-argument Clojure function `f` in a java.util.function.Function
  whose apply passes its argument to `f` and returns the result."
  [f]
  (reify Function
    (apply [_ arg]
      (f arg))))
(defmacro function
  "Expands to a reified java.util.function.Function whose apply binds its
  single argument to the symbol `x` and evaluates `body`, returning the value
  of the last body form. Usage: (function [x] (inc x))."
  [[x] & body]
  `(reify java.util.function.Function
     (~'apply [this# ~x]
       ~@body)))
(defn ^BiFunction as-2function
  "Wraps the two-argument Clojure function `f` in a
  java.util.function.BiFunction whose apply passes both arguments to `f`, in
  order, and returns the result."
  [f]
  (reify BiFunction
    (apply [_ left right]
      (f left right))))
(defn reactor-netty-get
  "Experiment: issues a GET to https://zalando.de/ through reactor-netty's
  HttpClient (following redirects), logs the reactive signals, maps the
  response to the result of its `receive` call, prints that value's simple
  class name and the value itself, and blocks for up to 100 seconds.

  Returns the value produced by `receive`.
  NOTE(review): the received content is never subscribed to here, so the body
  itself is presumably not read - confirm whether that is intended."
  []
  (-> (HttpClient/create)
      (.get "https://zalando.de/"
            (function [r] (-> r .followRedirect .sendHeaders)))
      .log
      ;(.then (as-function #(Mono/just (-> % .status .code))))
      (.map (function [x] (.receive x)))
      ;; A Reactor mapper must not return null: `prn` yields nil, so hand the
      ;; element itself downstream after printing it.
      (.map (function [x]
              (prn (.getSimpleName (type x)) x)
              x))
      #_(.flatMapMany (reify Function
                        (apply [_this r]
                          (-> r .receive .asString (.limitRate 1)))))
      #_(.reduce (reify BiFunction
                   (apply [_this a b]
                     (str a b))))
      (.block (Duration/ofSeconds 100))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment