Last active
July 24, 2019 19:24
-
-
Save torbjornvatn/89804fe22277ac79f5ca7ab22ebf7b71 to your computer and use it in GitHub Desktop.
Streaming word extract
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns streaming-word-extract | |
(:require | |
[pubsub-utils] ;; This is our local pubsub-utils namespace | |
[datasplash | |
[api :as ds] | |
[bq :as bq] | |
[pubsub :as ps]] | |
[clojure.string :as string]) | |
(:import | |
(com.google.cloud.dataflow.sdk.options DataflowPipelineOptions))) | |
(def project-id
  "Name of the Google Cloud project this pipeline is started in.
  The value below is a placeholder to be replaced with a real project id."
  "[CLOUD PROJECT ID HERE]")
(def pipeline-options
  "Options map used to create the Dataflow pipeline.

  Unlike the Java example, no custom StreamingWordExtractOptions interface is
  defined here, although Datasplash supports that as well. The staging
  location below is a placeholder to be replaced with a real GCS bucket."
  (let [staging-location "gs://[STAGING LOCATION HERE]/jars"]
    {:runner          "DataflowPipelineRunner"
     :job-name        "streaming-word-extract"
     :project         project-id
     :streaming       true
     :stagingLocation staging-location}))
;; --------------------------------------------------------------------------------
;; Pure functions used by the transformation steps further down.
;; They can easily be unit tested.
;; --------------------------------------------------------------------------------
(defn extract-words
  "Tokenizes a line of text into individual words and removes the empty ones.

  `line` is split on `separator`, a java.util.regex.Pattern that defaults to
  runs of whitespace (#\"\\s+\"). With the default, punctuation stays attached
  to words (\"LEAR.\" stays \"LEAR.\"); pass e.g. #\"[^a-zA-Z']+\" to split on
  non-letters instead. A nil `line` is treated as the empty string.

  Returns a vector (possibly empty) of non-empty strings."
  ([line]
   (extract-words line #"\s+"))
  ([line separator]
   ;; A leading separator makes split yield an empty first token; drop it
   ;; together with any other empty token.
   (into [] (filter not-empty) (string/split (or line "") separator))))
(defn apply-transforms-to-pipeline
  "Configures the streaming word-extract steps on `pipeline`.

  The pipeline is not run here, only configured. The steps:
    1. read lines from the Pub/Sub topic `topic-name`,
    2. split each line into words with `extract-words` (mapcat flattens them),
    3. upper-case each word with clojure.string/upper-case,
    4. wrap each word in a row map {:uppercase_word word},
    5. write the rows to `bigquery-table` using the schema given inline in the
       write options (:create-disposition :if-needed, :write-disposition :append).
  Every step is named so it is easy to find in the Dataflow UI. Returns the
  value of the BigQuery write step."
  [pipeline topic-name bigquery-table]
  (let [lines (ps/read-from-pubsub topic-name
                                   {:name "read-from-pubsub"}
                                   pipeline)
        words (ds/mapcat extract-words
                         {:name "extract-words"}
                         lines)
        upper (ds/map string/upper-case
                      {:name "uppercase"}
                      words)
        rows  (ds/map (fn [w] {:uppercase_word w})
                      {:name "create-row"}
                      upper)]
    (bq/write-bq-table bigquery-table
                       {:schema [{:name "uppercase_word" :type "STRING" :mode "REQUIRED"}]
                        :name "write-to-bigquery"
                        :create-disposition :if-needed
                        :write-disposition :append}
                       rows)))
(defn run-example
  "Sets up and starts the streaming word-extract example.

  Takes a single map with the keys:
    :topic-name     - Pub/Sub topic name, resolved within `project-id`
    :input-file     - file used to populate the topic (e.g. a gs:// path)
    :bigquery-table - destination BigQuery table spec

  In order, this function:
    1. builds the pipeline from `pipeline-options`,
    2. resolves the full topic path,
    3. calls `pubsub-utils/setup-topic`, which sets up the topic and
       subscription if they do not exist yet,
    4. configures the transforms on the pipeline and starts it,
    5. populates the topic from `input-file` via `pubsub-utils/populate-topic`.
  Returns whatever `pubsub-utils/populate-topic` returns."
  [{:keys [topic-name input-file bigquery-table]}]
  (let [p          (ds/make-pipeline DataflowPipelineOptions [] pipeline-options)
        topic-path (pubsub-utils/full-topic-path project-id topic-name)]
    (pubsub-utils/setup-topic project-id topic-name)
    ;; Transforms are attached to the pipeline object itself, which is then run.
    (doto p
      (apply-transforms-to-pipeline topic-path bigquery-table)
      (ds/run-pipeline))
    (pubsub-utils/populate-topic project-id topic-name input-file)))
(comment
  ;; REPL usage: run the example on the public King Lear sample text and write
  ;; the words to the streaming_word_extract.words table in `project-id`.
  (let [table (str project-id ":streaming_word_extract.words")]
    (run-example {:topic-name     "streaming-word-extract"
                  :input-file     "gs://dataflow-samples/shakespeare/kinglear.txt"
                  :bigquery-table table})))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment