Created
February 26, 2017 23:05
-
-
Save atroche/f87fdd4c27048d1f9815da088e03040d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns sunshine.less-bloat | |
(:require [datasplash.api :as ds])) | |
;; Command-line options for the pipeline; only used when parsing arguments.
(ds/defoptions LessBloatOptions
  ;; Option names are camelCase because kebab-case is not supported here.
  ;; dataDir can be overridden to point at local data for quick test runs.
  {:dataDir {:description "Path where files are stored"
             :type        String
             :default     "gs://wikireading-atroche/data"}})
(defn json-files->clj
  "Applies a JSON read step to `pipeline` and returns the result.

  Reads every file matching `<data-dir>/short-json/<dataset-name>*`,
  e.g. /data/short-json/training.json-00028-of-00157. String keys in the
  JSON are parsed into keywords (\"property\" -> :property)."
  [pipeline data-dir dataset-name]
  (let [dataset (name dataset-name)
        glob    (str data-dir "/short-json/" dataset "*")
        opts    {;; label shown in the Dataflow monitoring interface
                 :name   (str "ReadJSONof" dataset)
                 ;; keywordize the JSON keys
                 :key-fn true}]
    (ds/read-json-file glob opts pipeline)))
(defn group-by-document
  "Groups `instances` by their document.

  Each instance map with :document, :property and :value is first reshaped
  into a [document {:property .. :value ..}] pair, because group-by-key
  operates on key/value tuples; the pairs are then grouped by key."
  [instances]
  (let [keyed (ds/map-kv (fn [{doc :document, prop :property, v :value}]
                           [doc {:property prop
                                 :value    v}])
                         {:name "KeyByDocument"}
                         instances)]
    (ds/group-by-key {:name "GroupByDocument"} keyed)))
(defn less-bloat-pipeline
  "Builds (but does not run) the pipeline from the command-line `args`.

  For each of the :training, :validation and :testing subsets, reads that
  subset's JSON files from the configured dataDir, groups the instances by
  document and writes the result to `<dataDir>/<subset>.json` using a
  single shard.

  Returns the pipeline object so the caller can run it."
  [args]
  (let [p (ds/make-pipeline 'LessBloatOptions args)
        {:keys [dataDir]} (ds/get-pipeline-configuration p)]
    ;; One independent read -> group -> write chain per subset: we want a
    ;; separate output file for each, not an aggregate over the whole dataset.
    (doseq [dataset-name [:training :validation :testing]
            :let [dataset (name dataset-name)]]
      (->> (json-files->clj p dataDir dataset-name)
           group-by-document
           (ds/write-json-file (str dataDir "/" dataset ".json")
                               {:num-shards 1
                                ;; Per-subset step name, mirroring the read
                                ;; step, so the three writes are told apart
                                ;; in the monitoring interface.
                                :name (str "WriteJSONof" dataset)})))
    ;; the pipeline object must be returned so it can be run
    p))
(defn -main
  "Entry point: compiles the sunshine.less-bloat namespace, then builds the
  pipeline from the command-line `args` and runs it."
  [& args]
  ;; compilation happens before the pipeline is built, as in the original order
  (compile 'sunshine.less-bloat)
  (let [pipeline (less-bloat-pipeline args)]
    (ds/run-pipeline pipeline)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment