Created
January 13, 2017 17:31
-
-
Save PetrGlad/f9c50e211e92e97d1110600ad44c8eb2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns client.nakadi.lab
  "Scratch namespace: reads a Nakadi event endpoint as a raw stream through
  aleph/manifold and tokenizes JSON incrementally with the actson parser."
  ;; The clause keyword must be :require (not the bare symbol `require`);
  ;; the ns spec in Clojure 1.9+ rejects the symbol form.
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]
            [aleph.http :as http]
            [clojure.core.async :as a]
            [byte-streams :as bs]
            [clojure.java.shell :as shell])
  (:import (de.undercouch.actson JsonParser JsonEvent JsonFeeder)
           (java.nio.charset StandardCharsets)))
(defn get-stream
  "Sends a GET request for the an_event events endpoint with :raw-stream?
  enabled and a Bearer token obtained from get-oauth-token.
  Returns whatever http/request returns.
  NOTE(review): aleph is expected to return a deferred response here - confirm."
  []
  (let [bearer  (str "Bearer " (get-oauth-token))
        request {:url         "https://nakadi.somewhere/event-types/an_event/events"
                 :method      :get
                 ;; :content-type :json and :as :json are intentionally left
                 ;; disabled, as in the original experiment.
                 :raw-stream? true
                 :headers     {"Authorization" bearer}}]
    (http/request request)))
(defn do-call
  "Requests the events endpoint as a raw stream, maps every body chunk to a
  string, and pipes the resulting manifold stream into a core.async channel
  with s/connect. A go-loop then prints the values taken from that channel:
  it stops after four values, or earlier when a nil is taken, and closes the
  manifold stream afterwards. Blocks (deref) until the response is available.
  Returns the channel created by the go-loop."
  []
  (let [chunk->string (fn [chunk] (bs/to-string (bs/to-byte-array chunk)))
        response      (http/get "https://nakadi.somewhere/event-types/an_event/events"
                                {:raw-stream? true
                                 :headers     {"Authorization" (str "Bearer " (get-oauth-token))}})
        strings       @(d/chain response
                                :body
                                (fn [body] (s/map chunk->string body)))
        out           (a/chan)]
    (s/connect strings out)
    (a/go-loop [chunk     (a/<! out)
                remaining 4]
      ;; Printed before the check, so a terminating nil is printed as well.
      (println chunk)
      (if (and chunk (> remaining 1))
        (recur (a/<! out) (dec remaining))
        (s/close! strings)))))
(defn event-token
  "Translates an actson JsonEvent into a Clojure token.
  Structural events become keywords (:object-start, :object-end,
  :array-start, :array-end); a field name becomes [:key name]; value events
  become the value itself, read from parser where needed (nil for
  VALUE_NULL). Throws IllegalArgumentException for any other event."
  [parser json-event]
  (let [event-is? (fn [candidate] (= candidate json-event))]
    (cond
      (event-is? JsonEvent/START_OBJECT) :object-start
      (event-is? JsonEvent/END_OBJECT)   :object-end
      (event-is? JsonEvent/START_ARRAY)  :array-start
      (event-is? JsonEvent/END_ARRAY)    :array-end
      (event-is? JsonEvent/FIELD_NAME)   [:key (.getCurrentString parser)]
      (event-is? JsonEvent/VALUE_STRING) (.getCurrentString parser)
      ;; NOTE(review): integers outside the 32-bit range presumably fail in
      ;; getCurrentInt - confirm against the actson API.
      (event-is? JsonEvent/VALUE_INT)    (.getCurrentInt parser)
      (event-is? JsonEvent/VALUE_DOUBLE) (.getCurrentDouble parser)
      (event-is? JsonEvent/VALUE_TRUE)   true
      (event-is? JsonEvent/VALUE_FALSE)  false
      (event-is? JsonEvent/VALUE_NULL)   nil
      :else (throw (IllegalArgumentException.
                     (str "No matching clause: " json-event))))))
(defn actson-tokens-stream
  "Feeds string-pieces (a sequence of strings that together form one JSON
  document) into an actson JsonParser and emits one token per parser event,
  converted with event-token.

  Arguments:
    string-pieces - sequence of strings, encoded as UTF-8 before feeding.
    put-token     - optional one-argument function called with each token;
                    when omitted or nil, tokens are printed with the
                    \"Got token\" prefix.

  Returns nil when the parser reports EOF, [:error ::syntax] when it reports
  ERROR, and [:error ::large-token] when the feeder refuses to accept any
  bytes although the parser asked for more input."
  ([string-pieces]
   (actson-tokens-stream string-pieces nil))
  ([string-pieces put-token]
   (let [parser (JsonParser. StandardCharsets/UTF_8)
         ^JsonFeeder feeder (.getFeeder parser)
         emit (or put-token #(println "Got token" %))]
     ;; buf/pos track the piece currently being fed; pieces holds the strings
     ;; not yet encoded. Input is supplied only when the parser asks for it,
     ;; so no piece is skipped and partially accepted pieces are resumed.
     (loop [^bytes buf nil
            pos 0
            pieces (seq string-pieces)]
       (let [event (.nextEvent parser)]
         (condp = event
           JsonEvent/ERROR [:error ::syntax]
           JsonEvent/EOF nil
           JsonEvent/NEED_MORE_INPUT
           (cond
             ;; Unfed bytes remain in the current piece: feed what fits.
             (and buf (< pos (alength buf)))
             (let [fed (long (.feed feeder buf (int pos) (int (- (alength buf) pos))))]
               (if (pos? fed)
                 (recur buf (+ pos fed) pieces)
                 [:error ::large-token]))

             ;; Current piece exhausted: encode the next one explicitly as
             ;; UTF-8 to match the charset the parser was created with.
             pieces
             (recur (.getBytes ^String (first pieces) StandardCharsets/UTF_8)
                    0
                    (next pieces))

             ;; No input left: tell the feeder so the parser can finish.
             :else
             (do (.done feeder)
                 (recur nil 0 nil)))
           (do (emit (event-token parser event))
               (recur buf pos pieces))))))))
(comment
  ;; REPL examples. actson-tokens-stream is declared with two parameters
  ;; (string-pieces and put-token), so println is passed as put-token.
  (actson-tokens-stream ["\"\\u00e9\""] println)
  (actson-tokens-stream ["[1" ",2.0, {" "\"прювет\":12, \"enabled\": true, \"confirmed-eh\": false}, \"G\\u00e9hts\", \"é\"]"] println)
  (actson-tokens-stream ["1"] println))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment