Skip to content

Instantly share code, notes, and snippets.

@PetrGlad
Created January 13, 2017 17:31
Show Gist options
  • Save PetrGlad/f9c50e211e92e97d1110600ad44c8eb2 to your computer and use it in GitHub Desktop.
Save PetrGlad/f9c50e211e92e97d1110600ad44c8eb2 to your computer and use it in GitHub Desktop.
(ns client.nakadi.lab
  "Scratch namespace: reads an HTTP event stream through aleph/manifold and
  tokenizes JSON incrementally with the Actson parser."
  ;; The clause must be the keyword :require. A bare `require` symbol is
  ;; rejected by the spec check on the `ns` macro in Clojure 1.9 and later.
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]
            [aleph.http :as http]
            [clojure.core.async :as a]
            [byte-streams :as bs]
            [clojure.java.shell :as shell])
  (:import (de.undercouch.actson JsonParser JsonEvent JsonFeeder)
           (java.nio.charset StandardCharsets)))
(defn get-stream
  "Sends a GET request for the `an_event` event stream and returns whatever
  `http/request` returns. The request asks for the raw body stream
  (`:raw-stream? true`) and carries a bearer token obtained from
  `get-oauth-token`, which is not defined in the visible part of this file."
  []
  (let [bearer  (str "Bearer " (get-oauth-token))
        ;; :content-type :json and :as :json were tried and left disabled.
        options {:method      :get
                 :url         "https://nakadi.somewhere/event-types/an_event/events"
                 :raw-stream? true
                 :headers     {"Authorization" bearer}}]
    (http/request options)))
(defn do-call
  "Fetches the event stream, turns every incoming body chunk into a string and
  prints the chunks from a core.async go-loop.

  The response body (a manifold stream, requested with `:raw-stream? true`) is
  mapped to strings and connected to a core.async channel with `s/connect`.
  The loop prints each value it takes and stops after four values, or earlier
  when the channel yields nil; it then closes the manifold stream.
  Blocks the calling thread until the response has arrived. Returns the
  channel of the go-loop."
  []
  (let [response      (http/get "https://nakadi.somewhere/event-types/an_event/events"
                                {:raw-stream? true
                                 :headers     {"Authorization" (str "Bearer " (get-oauth-token))}})
        chunk->string (comp bs/to-string bs/to-byte-array)
        ;; Deref blocks here until the body stream is available.
        text-stream   @(d/chain response
                                :body
                                (fn [body] (s/map chunk->string body)))
        ch            (a/chan)]
    (s/connect text-stream ch)
    (a/go-loop [chunk     (a/<! ch)
                remaining 4]
      ;; The value is printed even when it is nil (channel closed).
      (println chunk)
      (if-not (and chunk (< 1 remaining))
        (s/close! text-stream)
        (recur (a/<! ch) (dec remaining))))))
(defn event-token
  "Translates an Actson `json-event` into a Clojure token.

  Structural events become keywords (:object-start, :object-end,
  :array-start, :array-end), a field name becomes `[:key name]`, and value
  events become the value itself, read from `parser` where needed
  (string, int, double) or given directly (true, false, nil).
  Any other event throws IllegalArgumentException."
  [parser json-event]
  (let [event? (fn [candidate] (= candidate json-event))]
    (cond
      (event? JsonEvent/START_OBJECT) :object-start
      (event? JsonEvent/END_OBJECT)   :object-end
      (event? JsonEvent/START_ARRAY)  :array-start
      (event? JsonEvent/END_ARRAY)    :array-end
      (event? JsonEvent/FIELD_NAME)   [:key (.getCurrentString parser)]
      (event? JsonEvent/VALUE_STRING) (.getCurrentString parser)
      (event? JsonEvent/VALUE_INT)    (.getCurrentInt parser)
      (event? JsonEvent/VALUE_DOUBLE) (.getCurrentDouble parser)
      (event? JsonEvent/VALUE_TRUE)   true
      (event? JsonEvent/VALUE_FALSE)  false
      (event? JsonEvent/VALUE_NULL)   nil
      ;; Same failure as a condp without a default clause.
      :else (throw (IllegalArgumentException.
                     (str "No matching clause: " json-event))))))
(defn actson-tokens-stream
  "Feeds `string-pieces` (a sequence of strings that together form one JSON
  document) to an Actson parser and reports every token found.

  Each token is produced by `event-token`, printed with a `Got token` prefix
  and, when `put-token` is callable, also passed to `put-token`. The
  one-argument arity runs without a callback.

  Returns nil once the parser reports end of input, `[:error ::syntax]` when
  the parser reports an error, and `[:error ::large-token]` when the feeder
  accepts no more bytes although the parser still asks for input.

  Input is supplied only when the parser asks for it, and a piece is kept
  until the feeder has consumed all of its bytes, so pieces larger than the
  feeder's free space are neither truncated nor skipped."
  ([string-pieces]
   (actson-tokens-stream string-pieces nil))
  ([string-pieces put-token]
   (let [^JsonParser parser (JsonParser. StandardCharsets/UTF_8)
         ^JsonFeeder feeder (.getFeeder parser)
         ;; Encode with the same charset the parser decodes with instead of
         ;; relying on the platform default.
         chunks (map (fn [^String piece]
                       (.getBytes piece StandardCharsets/UTF_8))
                     string-pieces)]
     (loop [pending   (seq chunks)   ; byte arrays not yet fully fed
            offset    0              ; bytes of the first pending array already fed
            finished? false]         ; true once .done was called on the feeder
       (let [event (.nextEvent parser)]
         (condp = event
           JsonEvent/ERROR [:error ::syntax]
           JsonEvent/EOF nil
           JsonEvent/NEED_MORE_INPUT
           (if-let [^bytes chunk (first pending)]
             (let [remaining  (- (alength chunk) offset)
                   ;; .feed returns how many bytes it actually took.
                   consumed   (.feed feeder chunk (int offset) (int remaining))
                   new-offset (long (+ offset consumed))]
               (cond
                 (= new-offset (alength chunk)) (recur (next pending) 0 finished?)
                 ;; Nothing was accepted although input is wanted: give up
                 ;; rather than spin forever.
                 (zero? consumed) [:error ::large-token]
                 :else (recur pending new-offset finished?)))
             (if finished?
               ;; Defensive: input was already declared complete, yet the
               ;; parser still wants more.
               [:error ::syntax]
               (do (.done feeder)
                   (recur nil 0 true))))
           (let [token (event-token parser event)]
             (println "Got token" token)
             (when (ifn? put-token)
               (put-token token))
             (recur pending offset finished?))))))))
(comment
  ;; REPL examples for actson-tokens-stream. `identity` is passed as the
  ;; `put-token` argument; the tokens themselves are printed by the function.

  ;; A single JSON string written with a \u escape.
  (actson-tokens-stream ["\"\\u00e9\""] identity)
  ;; One array split over several pieces, with nested object, non-ASCII key,
  ;; booleans and strings.
  (actson-tokens-stream ["[1" ",2.0, {" "\"прювет\":12, \"enabled\": true, \"confirmed-eh\": false}, \"G\\u00e9hts\", \"é\"]"] identity)
  ;; A bare top-level number.
  (actson-tokens-stream ["1"] identity))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment