Skip to content

Instantly share code, notes, and snippets.

@dkincaid
Created July 13, 2012 01:13
Show Gist options
  • Save dkincaid/3102138 to your computer and use it in GitHub Desktop.
Save dkincaid/3102138 to your computer and use it in GitHub Desktop.
Cascalog sales transaction summary
(ns transaction.queries
(:use [cascalog.api])
(:require [cascalog.ops :as c]
[cascalog.tap :as tap]
[cascalog.workflow :as w])
(:import [com.google.common.hash Hashing]
[org.joda.time.format DateTimeFormat]
[cascading.scheme.hadoop TextDelimited])
(:gen-class))
(defn transaction-test
  "Returns a hard-coded vector of sample transaction line items; -main feeds
  it to monthly-summary-buffer as the query input. Each row is a vector of
  11 strings in the column order given by the comment below."
  [] [
; ?practiceid ?invoiceid ?clientid ?patientid ?txn_date ?quantity ?price ?itemtype ?lineitem ?sublineitem ?transactionnumber
; NOTE(review): the row whose txn_date starts with "****" does not match the
; valid-date? pattern, so presumably it is there to exercise that filter — confirm.
[ "16228" "286688" "6768" "6768-13" "2012-06-02 13:48:06.634000" "1.00" "0.00" "S" "8" "0" "865052" ]
[ "16228" "286688" "6768" "6768-13" "2012-06-02 13:48:06.634000" "1.00" "0.00" "S" "9" "0" "865052" ]
[ "16228" "286688" "6768" "6768-13" "2012-06-02 13:51:04.796000" "1.00" "0.00" "S" "10" "0" "865052" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "53.50" "R" "2" "0" "865056" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "16.75" "S" "3" "0" "865056" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "17.00" "S" "4" "0" "865056" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" ".02" "20.00" "I" "5" "0" "865056" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "0.00" "S" "6" "0" "865056" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:04:39.830000" "1.00" "0.00" "S" "7" "0" "865056" ]
[ "16228" "286689" "21221" "21221-1" "2012-06-02 14:12:30.781000" "1.00" "0.00" "S" "8" "0" "865056" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "19.50" "S" "2" "0" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "42.50" "S" "3" "0" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "7.81" "I" "4" "0" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "0.00" "G" "5" "0" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:34:40.395000" "1.00" "36.00" "S" "5" "1" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "1.50" "S" "5" "2" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "34.00" "S" "6" "0" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:32:35.424000" "1.00" "17.60" "S" "7" "0" "769460" ]
[ "12595" "285956" "28101" "28101-5" "2012-06-02 13:35:37.662000" "1.00" "20.50" "I" "8" "0" "769460" ]
[ "12595" "285957" "21003" "21003-5" "2012-07-02 13:29:16.264000" "1.00" "39.00" "S" "7" "0" "769458" ]
[ "12595" "285957" "21003" "21003-5" "2012-07-02 13:29:20.608000" "1.00" "22.00" "S" "8" "0" "769458" ]
[ "12595" "285957" "21003" "21003-5" "2012-07-02 13:29:31.952000" "1.00" "24.00" "I" "9" "0" "769458" ]
[ "12595" "285957" "21003" "21003-5" "2012-07-02 13:30:23.454000" "2.00" "41.50" "I" "10" "0" "769458" ]
[ "12595" "285957" "21003" "21003-5" "2012-07-02 13:31:14.830000" "28.00" "25.86" "I" "11" "0" "769458" ]
[ "12595" "285957" "21003" "21003-6" "2012-07-02 13:33:09.442000" "1.00" "27.20" "I" "10" "0" "769458" ]
[ "12595" "285958" "28177" "28177-1" "2012-07-02 14:09:03.427000" "1.00" "39.00" "S" "8" "0" "769463" ]
[ "12595" "285958" "28177" "28177-1" "****-07-02 14:09:29.318000" "1.00" "34.00" "S" "9" "0" "769463" ]
[ "12595" "285958" "28177" "28177-1" "2012-07-02 14:10:12.928000" "1.00" "24.00" "I" "10" "0" "769463" ]
[ "12595" "285958" "28177" "28177-1" "2012-07-02 14:10:20.521000" "1.00" "19.00" "I" "11" "0" "769463" ]
[ "12595" "285958" "28177" "28177-1" "2012-07-02 14:10:26.287000" "1.00" "0.00" "S" "12" "0" "769463" ]
[ "12595" "285958" "28177" "28177-1" "2012-07-02 14:10:42.444000" "1.00" "0.00" "S" "13" "0" "769463" ]
[ "12595" "285958" "28177" "28177-1" "2012-09-02 14:10:59.163000" "14.00" "16.06" "I" "14" "0" "769463" ]
])
;; Constant project identifier. monthly-summary-buffer emits it as
;; ?projectid and mixes it into every ?result-id hash.
(def project-id 100)

;; Shared Guava HashFunction (murmur3_128) used by add-hash.
(def hash-function (. Hashing murmur3_128))
(defn hash-string
  "Hashes `string` with the given com.google.common.hash.HashFunction and
  returns the resulting HashCode as a long (via HashCode.asLong).

  The string is encoded as UTF-8 before hashing. This makes the hash of a
  given string the same on every JVM, whatever the platform default charset."
  [hashfunc ^String string]
  (.asLong (.hashBytes hashfunc (.getBytes string "UTF-8"))))
(defn valid-date?
  "Returns true when `date-string` is a string whose entire contents have
  the shape \"dddd-dd-dd dd:dd:dd.dddddd\" (e.g. \"2012-06-02 13:48:06.634000\").
  Returns false otherwise, including for nil or non-string input.

  The whole string must match. A substring match would accept values with
  extra leading or trailing characters, which date-formatter cannot parse.
  Only the digit layout is checked: out-of-range values such as month 13
  still pass."
  [date-string]
  (let [re #"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d"]
    (and (string? date-string)
         (boolean (re-matches re date-string)))))
;; Joda-Time formatter for the transaction timestamp layout, e.g.
;; "2012-06-02 13:48:06.634000" (six fractional-second digits).
(def date-formatter
  (let [pattern "yyyy-MM-dd HH:mm:ss.SSSSSS"]
    (DateTimeFormat/forPattern pattern)))
(defn parse-local-date-time
  "Parses the string `date` with date-formatter and returns a Joda-Time
  LocalDateTime. monthly-summary-buffer screens its input with valid-date?
  before values reach this parser."
  [date]
  (.parseLocalDateTime date-formatter date))
(defn extract-date-part
  "Formats `local-date-time` with the Joda-Time pattern `format` (e.g. \"dd\"
  or \"MMMM\") and returns the resulting string."
  [local-date-time format]
  (.toString local-date-time format))
(defn day-from-local-date-time
  "Returns the day of the month of a Joda-Time LocalDateTime, formatted with
  the pattern \"dd\" (a two-digit string such as \"02\"). Note that this
  returns a string, whereas the month and year helpers return numbers."
  [date-time]
  (extract-date-part date-time "dd"))
(defn month-from-local-date-time
  "Returns the month-of-year number (1-12) of a Joda-Time LocalDateTime."
  [date-time]
  (.getMonthOfYear date-time))
(defn year-from-local-date-time
  "Returns the year of a Joda-Time LocalDateTime, as returned by its getYear."
  [date-time]
  (.getYear date-time))
(defn month-name-from-local-date-time
  "Returns the full month name (Joda-Time pattern \"MMMM\", e.g. \"June\") of
  the given Joda-Time LocalDateTime.
  NOTE(review): the name is presumably rendered in the JVM default locale,
  so output may vary by machine — confirm whether a fixed locale is wanted."
  [date-time]
  (extract-date-part date-time "MMMM"))
(defn parse-month-name
  "Returns the full month name of the timestamp string `date-string`
  (see month-name-from-local-date-time)."
  [date-string]
  (month-name-from-local-date-time (parse-local-date-time date-string)))
(defn parse-day
  "Returns the day of the month of the timestamp string `date-string` as a
  two-digit string (see day-from-local-date-time)."
  [date-string]
  (day-from-local-date-time (parse-local-date-time date-string)))
(defn parse-month
  "Returns the month number (1-12) of the timestamp string `date-string`."
  [date-string]
  (month-from-local-date-time (parse-local-date-time date-string)))
(defn parse-year
  "Returns the year of the timestamp string `date-string`."
  [date-string]
  (year-from-local-date-time (parse-local-date-time date-string)))
(defn parse-year-month-monthname-and-day
  "Parses the timestamp string `date-string` once and returns the vector
  [year month-number day-of-month month-name].

  For example, \"2012-06-02 13:48:06.634000\" gives [2012 6 \"02\" \"June\"]
  in an English locale. The year and month are numbers; the day and month
  name are strings. Element order matches the
  `:> ?year ?month ?day ?month-name` binding in monthly-summary-buffer."
  [date-string]
  (let [date-time (parse-local-date-time date-string)]
    [(year-from-local-date-time date-time)
     (month-from-local-date-time date-time)
     (day-from-local-date-time date-time)
     (month-name-from-local-date-time date-time)]))
(defmapop add-hash
  "Joins the fields together, separated by \":\", and returns the hash of the
  joined string as computed by hash-string with hash-function. Each field is
  converted with str, so nil contributes an empty string."
  [& fields]
  ;; (apply str (interpose ...)) is the clojure.core equivalent of
  ;; clojure.string/join. It avoids depending on clojure.string, which this
  ;; namespace never requires.
  (hash-string hash-function (apply str (interpose ":" fields))))
(defn count-item-types
  "Given a seq of item types (\"I\", \"S\" or \"G\"), returns
  [inventory-count service-count].

  These are the number of inventory items (\"I\") and service items (\"S\"),
  with the group items (\"G\") split between them. When the group count is
  odd, inventory gets the smaller half and service gets the extra one.
  Any other item type is ignored."
  [items]
  (let [freqs (frequencies items)
        group-count (get freqs "G" 0)
        to-inventory (quot group-count 2)]
    [(+ (get freqs "I" 0) to-inventory)
     (+ (get freqs "S" 0) (- group-count to-inventory))]))
(defn total-item-types
  "Given a seq of [item-type price] pairs (e.g. [\"I\" 2.50]), returns
  [inventory-total service-total] as doubles.

  These are the summed prices of inventory items (\"I\") and service items
  (\"S\"), with the total of group items (\"G\") split between them to the
  cent. As in count-item-types, inventory gets the smaller half and service
  gets the remainder, so the two shares always add back up to the group
  total. Any other item type is ignored."
  [items]
  (let [sums (reduce (fn [acc [item-type price]]
                       (update-in acc [item-type] (fnil + 0.0) price))
                     {}
                     items)
        ;; Split the group total in whole cents. Truncating a scaled double
        ;; directly loses cents (0.29 * 100 = 28.999...), and integer division
        ;; leaked clojure.lang.Ratio values (e.g. 7/25) into the output.
        group-cents (Math/round (double (* 100.0 (get sums "G" 0.0))))
        inventory-cents (quot group-cents 2)
        service-cents (- group-cents inventory-cents)]
    [(+ (get sums "I" 0.0) (/ inventory-cents 100.0))
     (+ (get sums "S" 0.0) (/ service-cents 100.0))]))
(defn map-str
  "Returns `string1` and `string2` joined by a single colon, e.g. \"a:b\".
  Arguments are converted with str, so nil becomes an empty string."
  [string1 string2]
  (let [separator \:]
    (str string1 separator string2)))
(defbufferop summary-buffer
  "Summarize transaction data.
  Inputs: invoiceid, patientid, clientid, price, item-type, day
  Outputs: invoice counts, patient counts, client counts, inventory counts, service counts, inventory sales total, service sales total, patient visits

  Emits a single output tuple per group. The counts are numbers of distinct
  ids. Patient visits is the number of distinct patient:day pairs. Prices
  arrive as strings and are parsed as doubles."
  [transactions]
  ;; Transpose the input tuples into one vector per field. Each reduce step
  ;; is made eager with vec. Reducing with a bare (partial map conj) stacks
  ;; one unrealized lazy map per tuple, and realizing that chain overflows
  ;; the stack on large groups.
  (let [[invoiceids patientids clientids prices item-types days]
        (reduce (fn [columns tuple] (vec (map conj columns tuple)))
                [[] [] [] [] [] []]
                transactions)
        ;; A patient visit is a distinct (patient, day) combination.
        patient-days (map map-str patientids days)]
    [(flatten [(count (distinct invoiceids))
               (count (distinct patientids))
               (count (distinct clientids))
               (count-item-types item-types)
               (total-item-types (map vector item-types (map #(Double/parseDouble %) prices)))
               (count (distinct patient-days))])]))
(defn monthly-summary-buffer
  "Builds a Cascalog query that summarizes `transaction-tap` per practice,
  year and month.

  Rows whose txn_date fails valid-date? are dropped. The output variables
  are listed in the query's output vector and appear in the same order as
  output-field-names :monthly-summary."
  [transaction-tap]
(<- [?result-id ?projectid ?practiceid ?inventory-total ?service-total ?invoice-count ?patient-visits ?client-count ?patient-count ?month-name ?month ?year ?inventory-counts ?service-counts]
;; Read 11-field tuples, binding only the columns this summary uses.
;; NOTE(review): column 5 (quantity) is never bound, so sales totals are plain
;; sums of the price column — confirm that price is already a line total.
(transaction-tap :#> 11 {0 ?practiceid 1 ?invoiceid 2 ?clientid 3 ?patientid 4 ?txn-date 6 ?price 7 ?item-type})
;; Filter out malformed timestamps before they reach the Joda parser.
(valid-date? ?txn-date)
(parse-year-month-monthname-and-day ?txn-date :> ?year ?month ?day ?month-name)
;; Stable row id derived from project, practice, year and month.
(add-hash project-id ?practiceid ?year ?month :> ?result-id)
(identity project-id :> ?projectid)
;; Aggregate all line items of each practice/year/month group.
(summary-buffer ?invoiceid ?patientid ?clientid ?price ?item-type ?day :> ?invoice-count ?patient-count ?client-count ?inventory-counts ?service-counts ?inventory-total ?service-total ?patient-visits)))
(defn hfs-text-delimited
  "Returns an hfs tap at `path` whose scheme is a Cascading TextDelimited
  over `fields`, split on `delimiter` (\"|\" when omitted). When used as a
  sink, any existing data at `path` is replaced."
  ([path fields]
     (let [default-delimiter "|"]
       (hfs-text-delimited path fields default-delimiter)))
  ([path fields delimiter]
     (let [scheme (new TextDelimited fields delimiter)]
       (tap/hfs-tap scheme path :sinkmode :replace))))
(def output-field-names
  ;; Sink column names for each output, in the same order as the variables
  ;; emitted by the corresponding query.
  {:monthly-summary (vector "?result-id" "?projectid" "?practiceid"
                            "?inventory-total" "?service-total"
                            "?invoice-count" "?patient-visits"
                            "?client-count" "?patient-count"
                            "?month-name" "?month" "?year"
                            "?inventory-counts" "?service-counts")})
;; Cascading Fields built from the monthly summary column names.
(def monthly-summary-output-fields
  (w/fields (:monthly-summary output-field-names)))

(defn monthly-summary-text-sink
  "Returns a pipe-delimited hfs tap at `path` with the monthly summary
  columns. Existing output at `path` is replaced."
  [path]
  (hfs-text-delimited path monthly-summary-output-fields))
(defn -main
  "Runs the monthly summary query over the sample rows from transaction-test
  and writes the result to `output-path`. The path is taken from the first
  command-line argument and defaults to \"/tmp/monthlysummary\"."
  [& [output-path]]
  (?- (monthly-summary-text-sink (or output-path "/tmp/monthlysummary"))
      (monthly-summary-buffer (transaction-test))))
@dkincaid
Copy link
Author

You'll need the following dependencies in your project.clj:
[joda-time/joda-time "2.1"]
[cascalog "1.9.0"]
[com.google.guava/guava "11.0.2"]
[cascading/cascading-core "2.0.2"]
[cascading/cascading-hadoop "2.0.2"]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment