Created
November 25, 2014 01:28
-
-
Save acthp/d60cd29e4b23a4c39326 to your computer and use it in GitHub Desktop.
Experimental Clojure wrapper for Spark SQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns try-spark.spark | |
(:refer-clojure :exclude [map filter reduce first name take min distinct max count]) | |
(:require [clojure.core :as core]) | |
(:require [clojure.reflect :as r]) | |
(:import org.apache.spark.sql.api.java.JavaSchemaRDD)) | |
;; Spark Java function interfaces that must be wrapped (see wrap-param) so
;; that RDD methods can be called with plain Clojure functions.
(def func-types
  "Set of fully-qualified symbols naming the Spark function interfaces.
  Entries are compared directly against the :parameter-types symbols
  produced by clojure.reflect."
  (let [pkg "org.apache.spark.api.java.function."
        simple-names ["Function" "DoubleFunction" "FlatMapFunction2" "FlatMapFunction"
                      "Function2" "DoubleFlatMapFunction" "VoidFunction"]]
    (set (for [simple simple-names]
           (symbol (str pkg simple))))))
;; Number of parameters taken by the `call` method of each interface in
;; func-types. Found by reflecting on the interface class, so the Spark
;; classes must be loadable when this namespace is loaded.
(def func-arities
  "Map from each func-types symbol to the parameter count of the first
  reflected member named `call`. If no `call` member is found the count
  is 0."
  (into {}
        (for [iface func-types
              :let [members     (:members (r/reflect (Class/forName (str iface))))
                    call-method (core/first (core/filter #(= 'call (:name %)) members))]]
          [iface (core/count (:parameter-types call-method))])))
;; Predicate over clojure.reflect members: decides which JavaSchemaRDD
;; members get a generated wrapper fn.
(defn should-wrap
  "Returns true if the reflected member `m` should be wrapped.

  The generated wrappers always emit an instance-method call
  `(.name rdd ...)`. Only members that such a call can actually reach
  qualify. Concretely, `m` must be a clojure.reflect.Method record (not a
  Constructor or Field) that is public and not static. Bridge and synthetic
  methods are also excluded; these are compiler-generated duplicates of a
  real overload. Names containing `$` are skipped as well; they are
  typically compiler-generated (e.g. inner-class or mangled names)."
  [m]
  (let [flags (:flags m)]
    (boolean
     (and (instance? clojure.reflect.Method m)
          (contains? flags :public)
          ;; (.foo rdd) resolves instance methods only, so static methods
          ;; would fail at call time.
          (not-any? #(contains? flags %) [:static :bridge :synthetic])
          (not (.contains (str (:name m)) "$"))))))
(defn arg-list
  "Returns a seq of `n` symbols x0, x1, ..., used as the parameter list of
  the `call` method in proxies generated by wrap-param."
  [n]
  (for [i (range n)]
    (symbol (str "x" i))))
(defn wrap-param
  "Returns the form to pass for argument `param` to a Java parameter of
  type `type` (a class-name symbol).

  If `type` is one of func-types, the result is a proxy form implementing
  that interface. Its `call` method takes as many arguments as
  func-arities records and forwards them all to `param`, which is therefore
  expected to be a Clojure fn. For any other type, `param` is returned
  unchanged.

  NOTE(review): Spark ships function objects to executors; confirm that
  these proxy instances serialize in your deployment."
  [param type]
  (if-not (contains? func-types type)
    param
    (let [call-args (arg-list (get func-arities type))]
      `(proxy [~type] [] (~'call [~@call-args] (~param ~@call-args))))))
(defn wrap-arity
  "Builds one arity clause of a generated defn from a reflected method
  record (only :name and :parameter-types are read).

  The clause takes the Java arguments first and the receiver `rdd` last,
  e.g. `([arg0 rdd] (.name rdd <arg0, via wrap-param>))`. This lets calls be
  threaded with ->>. Each argument goes through wrap-param, so
  function-typed parameters accept plain Clojure fns."
  [{:keys [name parameter-types]}]
  (let [arg-syms   (vec (for [i (range (core/count parameter-types))]
                          (symbol (str "arg" i))))
        method-sym (symbol (str "." name))
        call-args  (core/map wrap-param arg-syms parameter-types)]
    (list (conj arg-syms 'rdd)
          (list* method-sym 'rdd call-args))))
(defn valid-arity
  "Returns true when all reflected `overloads` have the same func-types
  interface, or none, at every parameter position. When that holds, a
  single wrapper clause can serve all of them.

  Expects a non-empty collection; `(apply =)` with no arguments throws."
  [overloads]
  (->> overloads
       (core/map :parameter-types)
       (core/map (fn [ptypes] (core/map func-types ptypes)))
       (apply =)))
;; An arity can be wrapped only if func-types appear in the same parameter
;; positions for all of its overloads. Otherwise, picking an overload would
;; need run-time checks of the argument types. For now we throw in that case.
(defn wrap-member
  "Returns a `(defn name ...)` form wrapping the Java method `name`, with one
  clause per distinct parameter count.

  `overloads` are the reflected method records sharing that name. Overloads
  with the same parameter count collapse into one clause built (via
  wrap-arity) from the first of them. This is only sound when valid-arity
  holds for the group.

  Throws ex-info (message starting \"Arity with ambiguous types:\") naming
  the method and parameter count when a group fails valid-arity. The data
  map has :method, :arity and :overloads."
  [name overloads]
  (let [by-arity (group-by #(core/count (:parameter-types %)) overloads)
        ;; Destructure as `arity`, not `name`, so the method name stays in
        ;; scope for the error message. mapv (not lazy map) makes an
        ;; ambiguous group throw here rather than whenever the returned
        ;; form happens to be walked.
        representatives
        (mapv (fn [[arity arity-overloads]]
                (when-not (valid-arity arity-overloads)
                  (throw (ex-info (str "Arity with ambiguous types: " name
                                       " with " arity " parameter(s): "
                                       (pr-str arity-overloads))
                                  {:method    name
                                   :arity     arity
                                   :overloads arity-overloads})))
                ;; All overloads in the group agree on func-type positions,
                ;; so the first one is representative.
                (core/first arity-overloads))
              by-arity)]
    `(defn ~name ~@(core/map wrap-arity representatives))))
(def rdd-methods
  "Seq of `(defn ...)` forms, one per JavaSchemaRDD member name that passes
  should-wrap, each built by wrap-member from that name's overloads. The
  forms are only generated here; the wrap-rdd-methods macro splices them
  in."
  (let [by-name (->> JavaSchemaRDD
                     r/reflect
                     :members
                     (core/filter should-wrap)
                     (group-by :name))]
    (for [[method-name overloads] by-name]
      (wrap-member method-name overloads))))
(defmacro wrap-rdd-methods
  "Expands to a top-level `(do ...)` containing every form in rdd-methods,
  which defines the wrapper fns in the namespace where it is expanded."
  []
  (cons 'do rdd-methods))

;; Generate the wrapper fns when this namespace is loaded.
(wrap-rdd-methods)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment