Created
July 13, 2012 01:13
-
-
Save dkincaid/3102138 to your computer and use it in GitHub Desktop.
Cascalog sales transaction summary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns transaction.queries | |
(:use [cascalog.api]) | |
(:require [cascalog.ops :as c] | |
[cascalog.tap :as tap] | |
[cascalog.workflow :as w]) | |
(:import [com.google.common.hash Hashing] | |
[org.joda.time.format DateTimeFormat] | |
[cascading.scheme.hadoop TextDelimited]) | |
(:gen-class)) | |
(defn transaction-test
  "Returns an in-memory vector of sample sales-transaction tuples.

  Every tuple holds 11 string fields, in this order:
    ?practiceid ?invoiceid ?clientid ?patientid ?txn_date ?quantity ?price
    ?itemtype ?lineitem ?sublineitem ?transactionnumber"
  []
  [["16228" "286688" "6768" "6768-13" "2012-06-02 13:48:06.634000" "1.00" "0.00" "S" "8" "0" "865052"]
   ["16228" "286688" "6768" "6768-13" "2012-06-02 13:48:06.634000" "1.00" "0.00" "S" "9" "0" "865052"]
   ["16228" "286688" "6768" "6768-13" "2012-06-02 13:51:04.796000" "1.00" "0.00" "S" "10" "0" "865052"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "53.50" "R" "2" "0" "865056"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "16.75" "S" "3" "0" "865056"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "17.00" "S" "4" "0" "865056"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" ".02" "20.00" "I" "5" "0" "865056"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "0.00" "S" "6" "0" "865056"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "0.00" "S" "7" "0" "865056"]
   ["16228" "286689" "21221" "21221-1" "2012-06-02 14:12:30.781000" "1.00" "0.00" "S" "8" "0" "865056"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "19.50" "S" "2" "0" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "42.50" "S" "3" "0" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "7.81" "I" "4" "0" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "0.00" "G" "5" "0" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:34:40.395000" "1.00" "36.00" "S" "5" "1" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "1.50" "S" "5" "2" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "34.00" "S" "6" "0" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "17.60" "S" "7" "0" "769460"]
   ["12595" "285956" "28101" "28101-5" "2012-06-02 13:35:37.662000" "1.00" "20.50" "I" "8" "0" "769460"]
   ["12595" "285957" "21003" "21003-5" "2012-07-02 13:29:16.264000" "1.00" "39.00" "S" "7" "0" "769458"]
   ["12595" "285957" "21003" "21003-5" "2012-07-02 13:29:20.608000" "1.00" "22.00" "S" "8" "0" "769458"]
   ["12595" "285957" "21003" "21003-5" "2012-07-02 13:29:31.952000" "1.00" "24.00" "I" "9" "0" "769458"]
   ["12595" "285957" "21003" "21003-5" "2012-07-02 13:30:23.454000" "2.00" "41.50" "I" "10" "0" "769458"]
   ["12595" "285957" "21003" "21003-5" "2012-07-02 13:31:14.830000" "28.00" "25.86" "I" "11" "0" "769458"]
   ["12595" "285957" "21003" "21003-6" "2012-07-02 13:33:09.442000" "1.00" "27.20" "I" "10" "0" "769458"]
   ["12595" "285958" "28177" "28177-1" "2012-07-02 14:09:03.427000" "1.00" "39.00" "S" "8" "0" "769463"]
   ;; The year below is not numeric, so this row does not match the pattern
   ;; checked by valid-date?.
   ["12595" "285958" "28177" "28177-1" "****-07-02 14:09:29.318000" "1.00" "34.00" "S" "9" "0" "769463"]
   ["12595" "285958" "28177" "28177-1" "2012-07-02 14:10:12.928000" "1.00" "24.00" "I" "10" "0" "769463"]
   ["12595" "285958" "28177" "28177-1" "2012-07-02 14:10:20.521000" "1.00" "19.00" "I" "11" "0" "769463"]
   ["12595" "285958" "28177" "28177-1" "2012-07-02 14:10:26.287000" "1.00" "0.00" "S" "12" "0" "769463"]
   ["12595" "285958" "28177" "28177-1" "2012-07-02 14:10:42.444000" "1.00" "0.00" "S" "13" "0" "769463"]
   ["12595" "285958" "28177" "28177-1" "2012-09-02 14:10:59.163000" "14.00" "16.06" "I" "14" "0" "769463"]])
(def project-id
  "Constant project identifier. The monthly summary query emits it as
  ?projectid and includes it in the fields hashed into ?result-id."
  100)
(def hash-function
  "The Guava HashFunction returned by Hashing/murmur3_128; used by add-hash to
  derive result ids."
  (Hashing/murmur3_128))
(defn hash-string
  "Calculates the hash value of `string` using `hashfunc` and returns it as a
  long.

  hashfunc - a com.google.common.hash.HashFunction.
  string   - the text to hash.

  The string is encoded as UTF-8 explicitly, so the same text produces the
  same hash on every JVM regardless of its default charset. For ASCII-only
  input the bytes match what the platform default encoding would normally
  produce."
  [hashfunc string]
  (-> hashfunc
      (.hashBytes (.getBytes ^String string "UTF-8"))
      (.asLong)))
(defn valid-date?
  "Returns true when `date-string` has exactly the shape
  yyyy-MM-dd HH:mm:ss.SSSSSS (digits only), and false otherwise.

  The whole string must match: leading or trailing characters make it invalid.
  nil and non-string inputs return false instead of throwing.

  Only the format is checked, not the calendar value, so a string such as
  \"2012-13-45 00:00:00.000000\" still passes."
  [date-string]
  (let [re #"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d"]
    (boolean
      (and (string? date-string)
           ;; re-matches anchors the pattern at both ends; re-find would
           ;; accept any string that merely contains a date.
           (re-matches re date-string)))))
(def date-formatter
  "Joda-Time formatter for the transaction timestamp pattern
  yyyy-MM-dd HH:mm:ss.SSSSSS."
  (DateTimeFormat/forPattern "yyyy-MM-dd HH:mm:ss.SSSSSS"))
(defn parse-local-date-time
  "Returns a Joda-Time LocalDateTime parsed from the string `date` using
  date-formatter."
  [date]
  (.parseLocalDateTime date-formatter date))
(defn extract-date-part
  "Extracts a part of a date as a string.

  local-date-time - a Joda-Time LocalDateTime.
  format          - the pattern string passed to its toString, e.g. \"dd\"."
  [local-date-time format]
  (.toString local-date-time format))
(defn day-from-local-date-time
  "Returns the day of the month from the given Joda-Time LocalDateTime as a
  string formatted with the \"dd\" pattern."
  [date-time]
  (extract-date-part date-time "dd"))
(defn month-from-local-date-time
  "Returns the month number from the given Joda-Time LocalDateTime (the value
  of its getMonthOfYear)."
  [date-time]
  (.getMonthOfYear date-time))
(defn year-from-local-date-time
  "Returns the year from the given Joda-Time LocalDateTime (the value of its
  getYear)."
  [date-time]
  (.getYear date-time))
(defn month-name-from-local-date-time
  "Returns the month name from the given Joda-Time LocalDateTime, formatted
  with the \"MMMM\" pattern."
  [date-time]
  (extract-date-part date-time "MMMM"))
(defn parse-month-name
  "Returns the month name from the given date string."
  [date-string]
  (-> date-string
      parse-local-date-time
      month-name-from-local-date-time))
(defn parse-day
  "Returns the day of the month from the given date string."
  [date-string]
  (-> date-string
      parse-local-date-time
      day-from-local-date-time))
(defn parse-month
  "Returns the month number from the given date string."
  [date-string]
  (-> date-string
      parse-local-date-time
      month-from-local-date-time))
(defn parse-year
  "Returns the year from the given date string."
  [date-string]
  (-> date-string
      parse-local-date-time
      year-from-local-date-time))
(defn parse-year-month-monthname-and-day
  "Parses `date-string` and returns a four-element vector, in this order:
  [year month-number day-of-month month-name].

  Note that the order differs from the function name: the month name comes
  last."
  [date-string]
  ;; Parse once and reuse the result rather than re-parsing the same string
  ;; for each of the four components.
  (let [date-time (parse-local-date-time date-string)]
    [(year-from-local-date-time date-time)
     (month-from-local-date-time date-time)
     (day-from-local-date-time date-time)
     (month-name-from-local-date-time date-time)]))
(defmapop add-hash
  "Joins the fields together separating them by : and calculates the hash of the joined string."
  [& fields]
  ;; Built with clojure.core only: this namespace never requires
  ;; clojure.string, so calling clojure.string/join here would rely on some
  ;; other namespace having loaded it first. The joined text is the same.
  (hash-string hash-function (apply str (interpose ":" fields))))
(defn count-item-types
  "Given a seq of item types (I, S or G) returns [inventory-count service-count]:
  the count of inventory items (I) and service items (S), with the count of
  group items (G) split equally between the two. When the group count is odd,
  the extra item is added to the service count. Any other item type is
  ignored."
  [items]
  (let [tally           (frequencies items)
        group-count     (get tally "G" 0)
        inventory-share (quot group-count 2)
        ;; The service side receives whatever the inventory side did not,
        ;; i.e. the remainder of an odd group count.
        service-share   (- group-count inventory-share)]
    [(+ (get tally "I" 0) inventory-share)
     (+ (get tally "S" 0) service-share)]))
(defn total-item-types
  "Given a seq of item types and their prices (e.g. [\"I\" 2.50]) returns
  [inventory-total service-total]: the total sales of inventory items (I) and
  service items (S), with the total of group items (G) split equally between
  inventory and service.

  The group total is rounded to whole cents and then split. When the number of
  cents is odd, the extra cent goes to the service total, so the two shares
  always add up to the rounded group total. Any other item type is ignored.

  Both totals are always returned as doubles."
  [items]
  (let [sum-of (fn [item-type]
                 ;; 0.0 seed keeps the result a double even with no matches.
                 (reduce + 0.0 (map second
                                    (filter #(= item-type (first %)) items))))
        ;; Work in integer cents: dividing two integers would yield a Clojure
        ;; Ratio (e.g. 39/10), and truncating a floating-point product with
        ;; int can drop a cent (0.29 * 100 is 28.999999999999996).
        group-cents           (Math/round (double (* 100.0 (sum-of "G"))))
        inventory-share-cents (quot group-cents 2)
        service-share-cents   (- group-cents inventory-share-cents)]
    [(+ (sum-of "I") (/ inventory-share-cents 100.0))
     (+ (sum-of "S") (/ service-share-cents 100.0))]))
(defn map-str
  "Returns the string forms of `string1` and `string2` joined by a colon."
  [string1 string2]
  (str string1 \: string2))
(defbufferop summary-buffer
  "Summarizes the transaction tuples of one group into a single output tuple.
  Input fields per tuple: invoiceid, patientid, clientid, price, item-type, day
  Output fields: distinct invoice count, distinct patient count, distinct client count,
  inventory count, service count, inventory sales total, service sales total,
  patient visits (distinct patient/day pairs)"
  [transactions]
  (let [;; Transpose the tuples into one vector per input field.
        append-row     (fn [columns row] (map conj columns row))
        [invoice-col patient-col client-col price-col type-col day-col]
        (reduce append-row [[] [] [] [] [] []] transactions)
        distinct-count (comp count distinct)
        visit-keys     (map map-str patient-col day-col)
        ;; Prices arrive as strings; pair each parsed price with its item type.
        typed-prices   (map (fn [item-type price]
                              (vector item-type (Double/parseDouble price)))
                            type-col
                            price-col)]
    ;; A buffer returns a seq of tuples; this one emits exactly one, with the
    ;; two-element count and total vectors flattened into it.
    [(flatten [(distinct-count invoice-col)
               (distinct-count patient-col)
               (distinct-count client-col)
               (count-item-types type-col)
               (total-item-types typed-prices)
               (distinct-count visit-keys)])]))
(defn monthly-summary-buffer
  "Builds the Cascalog query that summarizes sales transactions read from
  `transaction-tap`, a source of 11-field tuples.

  Tuples whose transaction date fails valid-date? are dropped. The remaining
  tuples are summarized by summary-buffer, and each result carries a
  ?result-id hashed from the project id, practice id, year and month."
  [transaction-tap]
  (<- [?result-id ?projectid ?practiceid
       ?inventory-total ?service-total
       ?invoice-count ?patient-visits ?client-count ?patient-count
       ?month-name ?month ?year
       ?inventory-counts ?service-counts]
      ;; Bind only the positions the query uses out of the 11 input fields.
      (transaction-tap :#> 11 {0 ?practiceid
                               1 ?invoiceid
                               2 ?clientid
                               3 ?patientid
                               4 ?txn-date
                               6 ?price
                               7 ?item-type})
      (valid-date? ?txn-date)
      (parse-year-month-monthname-and-day ?txn-date :> ?year ?month ?day ?month-name)
      (add-hash project-id ?practiceid ?year ?month :> ?result-id)
      (identity project-id :> ?projectid)
      (summary-buffer ?invoiceid ?patientid ?clientid ?price ?item-type ?day
                      :> ?invoice-count ?patient-count ?client-count
                         ?inventory-counts ?service-counts
                         ?inventory-total ?service-total
                         ?patient-visits)))
(defn hfs-text-delimited
  "Builds an hfs tap at `path` that uses a TextDelimited scheme over `fields`,
  with sink mode :replace. `delimiter` is optional and defaults to the pipe
  character (|)."
  ([path fields]
   (hfs-text-delimited path fields "|"))
  ([path fields delimiter]
   (let [scheme (TextDelimited. fields delimiter)]
     (tap/hfs-tap scheme path :sinkmode :replace))))
(def output-field-names
  "Output field names for each report, keyed by report keyword and listed in
  output column order."
  {:monthly-summary ["?result-id"
                     "?projectid"
                     "?practiceid"
                     "?inventory-total"
                     "?service-total"
                     "?invoice-count"
                     "?patient-visits"
                     "?client-count"
                     "?patient-count"
                     "?month-name"
                     "?month"
                     "?year"
                     "?inventory-counts"
                     "?service-counts"]})
(def monthly-summary-output-fields
  "The monthly summary output field names, converted with w/fields."
  (w/fields (:monthly-summary output-field-names)))
(defn monthly-summary-text-sink
  "Returns a delimited-text tap at `path` for the monthly summary output
  fields, using the default delimiter of hfs-text-delimited."
  [path]
  (hfs-text-delimited path monthly-summary-output-fields))
(defn -main
  "Entry point: runs the monthly summary query over the built-in sample
  transactions and writes the result as delimited text.

  output-path - optional first command-line argument naming the output
                location; defaults to /tmp/monthlysummary.

  Any further arguments are ignored. Accepting arguments means that launching
  the generated class with command-line arguments no longer fails with an
  arity error."
  [& [output-path]]
  (?- (monthly-summary-text-sink (or output-path "/tmp/monthlysummary"))
      (monthly-summary-buffer (transaction-test))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You'll need the following dependencies in your project.clj:
[joda-time/joda-time "2.1"]
[cascalog "1.9.0"]
[com.google.guava/guava "11.0.2"]
[cascading/cascading-core "2.0.2"]
[cascading/cascading-hadoop "2.0.2"]