Created
March 31, 2022 17:21
-
-
Save crosstyan/8a87ebdb0c23e549b1c75e9e4013ffa5 to your computer and use it in GitHub Desktop.
an example of clojure udp server using aleph
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defproject udp-server "0.1.0-SNAPSHOT" | |
:description "FIXME: write description" | |
:url "http://example.com/FIXME" | |
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" | |
:url "https://www.eclipse.org/legal/epl-2.0/"} | |
:dependencies [[org.clojure/clojure "1.10.3"] | |
[aleph "0.4.6"] | |
[org.clojure/core.match "1.0.0"]] | |
:repl-options {:init-ns udp-server.core}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns udp-server.core | |
(:require | |
[manifold.deferred :as d] | |
[manifold.stream :as s] | |
[aleph.udp :as udp] | |
[clojure.string :as str] | |
[byte-streams :as bs] | |
[clojure.core.match :refer [match]] | |
)) | |
(def server-port 12345) | |
;; manifold.stream | |
;; ->> input as last | |
;{:sender #object[java.net.InetSocketAddress], | |
; :message <byte-array>} | |
(defn start-server | |
[port] | |
;; server is a manifold.deferred | |
;; need to use (deref) to convert it to manifold.stream | |
(let [server (udp/socket {:port port})] | |
server)) | |
(defn convert-msg [msg] | |
(let [addr (-> msg (:sender) (bean) (:address) (bean) (:hostAddress)) | |
port (-> msg (:sender) (bean) (:port)) | |
vector-msg (-> msg (:message) (vec)) | |
string-msg (bs/to-string (:message msg))] | |
{:host addr | |
:port port | |
:message (:message msg) | |
:vector vector-msg | |
:string string-msg})) | |
(defn echo-back! [server msg] | |
; keys are [host port message socket-address] | |
(let [converted (convert-msg msg)] | |
(s/put! server {:host (:host converted) | |
:port (:port converted) | |
:message (:message converted)}))) | |
(defn send-back! | |
"@param `server` a `aleph.udp/socket` or any `manifold.stream` | |
@param `recv-msg` raw msg received by `manifold.stream` | |
@param `msg` string or array-bytes" | |
[server recv-msg msg] | |
(let [converted (convert-msg recv-msg)] | |
(s/put! server {:host (:host converted) | |
:port (:port converted) | |
:message msg}))) | |
(defn str-hex [x] (format "0x%02x" x)) | |
;; 0 to 255. 256 is exclusive | |
(defn rand-hex-arr [n] (take n (repeatedly #(rand-int 256)))) | |
(defn gen-msg | |
"`recv-msg` raw msg received by `manifold.stream`. return byte-array" | |
[recv-msg] | |
;; msg is a vector of bytes | |
(let [msg (vec (:message (convert-msg recv-msg))) | |
head (first msg) | |
heq #(= head (unchecked-byte %1))] | |
(cond | |
;; See https://clojuredocs.org/clojure.core/unchecked-byte | |
;; (byte 0x80) is illegal, because 0x80 = 128 | |
;; but byte in clojure is [-128, 128) | |
;; (byte -128) = 0x80 wired! | |
(heq 0x70) (byte-array (vec (concat [0x70] (rand-hex-arr 16)))) | |
(heq 0x78) (byte-array [0x78 0x00]) | |
(heq 0x80) (byte-array (vec (concat [0x80 0x00] (rand-hex-arr 4)))) | |
:else (byte-array [0x70 0x01])))) | |
;; make a stream pipeline | |
(defn msg-recv | |
[server] | |
(s/map convert-msg @server)) | |
;; make an echo server | |
(defn start-echo [server] | |
(s/consume (fn [m] (echo-back! @server m)) @server)) | |
(defn start-handle-msg [server] | |
(s/consume (fn [m] | |
(send-back! @server m (gen-msg m))) @server)) | |
;(start-echo (start-server server-port)) | |
;(start-handle-msg (start-server server-port)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment