Skip to content

Instantly share code, notes, and snippets.

@ulises
Last active August 29, 2015 14:01
Show Gist options
  • Save ulises/7813714ad83c8acd67f0 to your computer and use it in GitHub Desktop.
Save ulises/7813714ad83c8acd67f0 to your computer and use it in GitHub Desktop.
(ns your.namespace
(:require [riemann.transport :as rtransport]
[riemann.transport.udp :as rudp]
[interval-metrics.core :as metrics]
[clojure.tools.logging :as log])
(:import (java.nio ByteBuffer ByteOrder)
(org.jboss.netty.buffer ChannelBufferInputStream)
(org.jboss.netty.handler.codec.oneone OneToOneDecoder)
(org.jboss.netty.handler.execution ExecutionHandler
MemoryAwareThreadPoolExecutor)))
;; your-parsing-fn must return a seq of events
(defn decode-message
[^ChannelBufferInputStream msg]
{:events (your-parsing-fn (.readLine msg))])
(defn your-frame-decoder
[]
(proxy [OneToOneDecoder] []
(decode [context channel message]
(let [msg (ChannelBufferInputStream. message)]
(decode-message msg)))))
(defn server [{:keys [host port max-channel-memory-size max-total-memory-size]
:or {host "127.0.0.1" port 12345
max-channel-memory-size 1048576
max-total-memory-size 10485760}
:as opts}]
(let [core (atom nil)
max-size (get opts :max-size 16384)
channel-group (rtransport/channel-group
(str "your-udp-server" host ":" port
"(" max-size ")"))
pipeline-factory (rtransport/channel-pipeline-factory
executor (ExecutionHandler.
(MemoryAwareThreadPoolExecutor.
(cpu-count) ;; you provide this
max-channel-memory-size
max-total-memory-size))
^:shared msg-decoder (your-frame-decoder)
handler (rudp/gen-udp-handler core
(metrics/rate+latency)
channel-group
rudp/udp-handler))]
(rudp/udp-server {:core core
:host host
:port port
:max-size max-size
:channel-group channel-group
:pipeline-factory pipeline-factory})))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment