Last active
August 28, 2017 09:12
-
-
Save torbjornvatn/0734123d6e443275a978649ef6edd98d to your computer and use it in GitHub Desktop.
core.clj for my blog post "Trigger Dataflow pipelines with Cloud Functions written in ClojureScript"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns pipeline-example.core | |
(:require [cljs.pprint :as pp] | |
[clojure.string :as string])) | |
(def circular-json
  "The `circular-json-es6` Node module, loaded via Node's `require`."
  (js/require "circular-json-es6"))

(def spawn
  "The `spawn` property exported by the `node-jre` Node module."
  (.. (js/require "node-jre") -spawn))

(def __dirname
  "Value of Node's module-local `__dirname`, read through raw JS."
  (js* "__dirname"))
(defn args
  "Builds the vector of command-line arguments needed for starting the pipeline.

  file-name     - name of the uploaded file; passed unchanged as --inputFile
                  and, in sanitized form, used as the suffix of --jobName.
  import-bucket - bucket name (without the gs:// prefix) passed as
                  --importBucket.

  Returns a vector of strings.

  Dataflow only accepts job names consisting of the characters [-a-z0-9],
  starting with a letter and ending with a letter or digit. File names
  routinely contain dots, underscores, slashes or upper-case letters, so the
  job name is derived from a lower-cased copy in which every run of
  disallowed characters is collapsed into a single hyphen."
  [file-name import-bucket]
  (let [job-suffix (-> (str file-name)
                       string/lower-case
                       (string/replace #"[^-a-z0-9]+" "-"))
        ;; Trailing hyphens are illegal; stripping them after prefixing also
        ;; covers an empty suffix (yielding plain \"processing\").
        job-name   (string/replace (str "processing-" job-suffix) #"-+$" "")]
    [(str "--jobName=" job-name)
     "--project=[PROJECT ID]"
     "--runner=DataflowPipelineRunner"
     "--stagingLocation=gs://[STAGING BUCKET]/jars"
     (str "--inputFile=" file-name)
     (str "--importBucket=gs://" import-bucket)]))
(defn parse-event
  "Turns the raw JS event into a Clojure data structure with keyword keys.

  The event is first serialized and re-parsed with circular-json, and the
  resulting JS value is then converted with js->clj."
  [raw-event]
  (let [serialized (.stringify circular-json raw-event)
        plain-js   (.parse circular-json serialized)]
    (js->clj plain-js :keywordize-keys true)))
(defn execute-jar-file
  "Spawns a child process using node-jre to launch the pipeline.

  file-name     - name of the file to process (forwarded to `args`).
  import-bucket - bucket the file lives in (forwarded to `args`).
  callback      - Node-style completion callback. Called as
                  (callback nil \"Success!\") when the process closes with
                  exit code 0, or as (callback error) for a non-zero exit
                  code or a process-level error.

  The callback is invoked at most once. Node's child_process documentation
  notes that exit/close events may still fire after an \"error\" event, so
  without a guard a failed spawn could report completion twice.

  Output of the child on stdout/stderr is echoed with println."
  [file-name import-bucket callback]
  (let [classpath  (clj->js ["pipeline-standalone.jar"])
        main-class "pipeline-example.core"
        ;; NOTE(review): assumes node-jre's spawn returns a Node ChildProcess
        ;; (with .stdout/.stderr streams and close/error events) - confirm.
        proc       (spawn classpath
                          main-class
                          (clj->js (args file-name import-bucket))
                          (clj->js {:cwd __dirname}))
        finished?  (atom false)
        ;; Forwards its arguments to `callback`, but only the first time.
        finish!    (fn [& cb-args]
                     (when (compare-and-set! finished? false true)
                       (apply callback cb-args)))]
    ;; Handle the different events that can happen in the child process
    (.on (.-stdout proc) "data"
         (fn [data]
           (println (str "stdout: " data))))
    (.on (.-stderr proc) "data"
         (fn [data]
           (println (str "stderr: " data))))
    (.on proc "close"
         (fn [code]
           (if (= code 0)
             (finish! nil "Success!")
             (finish! (new js/Error (str "Darn, we got an exit code = " code))))))
    (.on proc "error"
         (fn [err]
           (println (str "error: " err))
           (finish! (new js/Error (str "Darn, we got an error: " err)))))))
(defn pipeline-example-trigger
  "Entry point for the trigger: parses the raw event and, when its :data
  carries both a :name and a :bucket, launches the pipeline for that file.
  Otherwise the callback is invoked with an Error describing the event."
  [raw-event callback]
  (let [event     (parse-event raw-event)
        payload   (:data event)
        file-name (:name payload)
        bucket    (:bucket payload)]
    ;; Bail out early unless both the file name and the bucket are present
    (if-not (and file-name bucket)
      (callback (new js/Error (str "I didn't get a file-name or a bucket in the event: " event)))
      (execute-jar-file file-name bucket callback))))
;; Make println write to the JavaScript console
(enable-console-print!)

;; The function that gets exported here is the entrypoint used by GCF
(set! (.-exports js/module)
      (js-obj "pipeline-example-trigger" pipeline-example-trigger))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment