Skip to content

Instantly share code, notes, and snippets.

@torbjornvatn
Last active August 28, 2017 09:12
Show Gist options
  • Save torbjornvatn/0734123d6e443275a978649ef6edd98d to your computer and use it in GitHub Desktop.
Save torbjornvatn/0734123d6e443275a978649ef6edd98d to your computer and use it in GitHub Desktop.
core.clj for my blog post "Trigger Dataflow pipelines with Cloud Functions written in ClojureScript"
(ns pipeline-example.core
(:require [cljs.pprint :as pp]
[clojure.string :as string]))
;; CircularJSON implementation loaded through Node's `require`; parse-event
;; uses its `stringify`/`parse` pair to round-trip the raw event.
(def circular-json (js/require "circular-json-es6"))
;; The `spawn` function exported by the node-jre package; execute-jar-file
;; calls it to start the pipeline jar as a child process.
(def spawn (.-spawn (js/require "node-jre")))
;; Node's `__dirname`, read with a raw JavaScript escape; passed as the
;; child process working directory (:cwd) in execute-jar-file.
(def __dirname (js* "__dirname"))
(defn args
  "Builds the vector of command-line arguments used to start the pipeline.

  file-name     - name of the uploaded file; passed verbatim as --inputFile
                  and, in normalised form, as the suffix of --jobName.
  import-bucket - bucket name (without the gs:// prefix) for --importBucket.

  The Dataflow runner only accepts job names that consist of lower-case
  letters, digits and hyphens, start with a letter and end with a letter or
  digit. File names routinely contain dots, underscores, slashes or
  upper-case letters, so the job-name suffix is lower-cased, every run of
  other characters is collapsed to a single hyphen, and trailing hyphens are
  dropped. File names that were already valid produce the same job name as
  before.

  Returns a vector of strings."
  [file-name import-bucket]
  (let [suffix   (-> (str file-name)
                     string/lower-case
                     (string/replace #"[^a-z0-9-]+" "-"))
        ;; Strip trailing hyphens from the whole name so that an empty or
        ;; fully-invalid suffix still yields the valid name `processing`.
        job-name (string/replace (str "processing-" suffix) #"-+$" "")]
    [(str "--jobName=" job-name)
     "--project=[PROJECT ID]"
     "--runner=DataflowPipelineRunner"
     "--stagingLocation=gs://[STAGING BUCKET]/jars"
     (str "--inputFile=" file-name)
     (str "--importBucket=gs://" import-bucket)]))
(defn parse-event
  "Converts the raw JavaScript event into a ClojureScript data structure with
  keyword keys. The event is first serialised and re-parsed with
  circular-json, and the resulting plain JS value is handed to js->clj."
  [raw-event]
  (let [serialized (.stringify circular-json raw-event)
        plain-js   (.parse circular-json serialized)]
    (js->clj plain-js :keywordize-keys true)))
(defn execute-jar-file
  "Spawns a child process using node-jre to launch the pipeline.

  file-name     - name of the file to process (forwarded to `args`).
  import-bucket - bucket the file lives in (forwarded to `args`).
  callback      - Node-style callback: invoked as (callback nil \"Success!\")
                  when the process closes with exit code 0, or as
                  (callback error) when it closes with any other code or the
                  process emits an \"error\" event.

  Node may emit both \"error\" and \"close\" for the same child process (for
  example when spawning fails), so the callback is guarded and is invoked at
  most once; later events are still logged but do not call it again.

  Returns the child process object."
  [file-name import-bucket callback]
  (let [classpath  (clj->js ["pipeline-standalone.jar"])
        main-class "pipeline-example.core"
        proc       (spawn
                    classpath
                    main-class
                    (clj->js (args file-name import-bucket))
                    (clj->js {:cwd __dirname}))
        ;; Flips to true the first time the callback is delivered.
        finished?  (atom false)
        finish!    (fn [& cb-args]
                     (when (compare-and-set! finished? false true)
                       (apply callback cb-args)))]
    ;; Handle the different events that can happen in the child process.
    (-> proc
        (.-stdout)
        (.on "data"
             (fn [data]
               (println (str "stdout: " data)))))
    (-> proc
        (.-stderr)
        (.on "data"
             (fn [data]
               (println (str "stderr: " data)))))
    (-> proc
        (.on "close"
             (fn [code]
               (if (= code 0)
                 (finish! nil "Success!")
                 (finish! (new js/Error (str "Darn, we got an exit code = " code)))))))
    (-> proc
        (.on "error"
             (fn [err]
               (println (str "error: " err))
               (finish! (new js/Error (str "Darn, we got an error: " err))))))))
(defn pipeline-example-trigger
  "Entry point for the trigger: parses the raw event, pulls the file name and
  bucket out of its :data map and launches the pipeline for that file. When
  either value is missing, the callback receives an Error instead."
  [raw-event callback]
  (let [event (parse-event raw-event)
        {file-name :name bucket-name :bucket} (:data event)]
    ;; Bail out early unless both the file name and the bucket are present.
    (if-not (and file-name bucket-name)
      (callback (js/Error. (str "I didn't get a file-name or a bucket in the event: " event)))
      (execute-jar-file file-name bucket-name callback))))
;; Route ClojureScript's println output to the JavaScript console, so the
;; stdout/stderr/error lines printed by execute-jar-file are emitted there.
(enable-console-print!)
;; The function that gets exported here is the entrypoint used by GCF
;; It is published on module.exports under the key "pipeline-example-trigger".
(set! (.-exports js/module) #js {:pipeline-example-trigger pipeline-example-trigger})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment