Created
July 19, 2019 22:44
-
-
Save micha/0c0fadd65d63afbf602d72634b4a395e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns adzerk.instant-counts-updater.api | |
(:require | |
[clj-statsd :as s] | |
[cheshire.core :as json] | |
[clojure.java.io :as io] | |
[clojure.string :as string] | |
[clojure.tools.logging :as log] | |
[amazonica.aws.s3 :as s3] | |
[amazonica.aws.sqs :as sqs] | |
[amazonica.aws.dynamodbv2 :as ddb] | |
[adzerk.instant-counts-updater.tx :as tx | |
:refer [deftx abort!! retry!!]] | |
[adzerk.instant-counts-updater.config :as config] | |
[adzerk.instant-counts-updater.db :as db | |
:refer [<< with-tx QUERY INSERT-MULTI! EXECUTE!]] | |
[adzerk.instant-counts-updater.util :as util | |
:refer [csharp->yyyy-mm-dd timed with-timing guard tagvec]]) | |
(:import | |
[java.sql SQLException] | |
[clojure.lang ExceptionInfo] | |
[java.util.zip GZIPInputStream] | |
[com.amazonaws AbortedException] | |
[com.amazonaws.services.dynamodbv2.model | |
InternalServerErrorException | |
TransactionCanceledException | |
TransactionInProgressException | |
ConditionalCheckFailedException | |
IdempotentParameterMismatchException | |
ProvisionedThroughputExceededException] | |
[com.amazonaws.services.s3.model AmazonS3Exception])) | |
;; setup ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Point clj-statsd at the local statsd agent on the standard UDP port.
(s/setup "127.0.0.1" 8125)
;; Per-operation dynamic state, bound by `with-api-context` below:
(declare ^:dynamic *tags*)       ; atom of datadog tag key/values for the current op
(declare ^:dynamic *on-success*) ; vector of thunks to run after a :success result
(declare records-in-queue?)      ; forward declaration; defined in the API section
(def ingesters (atom 0))         ; count of ingest workers currently in flight
(def lock-table config/IC_FILE_LOCK_TABLE)
(def ingest-queue config/IC_INGEST_QUEUE_URL)
;; NOTE(review): read-string on a config value — presumably an integer string;
;; confirm the env var is trusted input.
(def max-backlog (read-string config/IC_MAX_RECORD_BACKLOG))
;; datadog helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defmacro with-api-context
  "Wraps one worker-loop iteration: binds fresh *tags*/*on-success* state,
  runs `body`, reports the outcome to statsd under `metric` (tagged with
  whatever body put in *tags* plus :ic.status), and invokes any callbacks
  registered via `on-success` when the status is :success.

  Status keyword: the body's own return value on normal completion; :aborted
  (logged) when the AWS client throws AbortedException while NOT paused;
  nil when aborted while paused (an expected interruption); :error (logged)
  for any other throwable. Returns the status."
  [metric paused? & body]
  `(binding [*tags* (atom {:ic.entity "none"}), *on-success* []]
     (let [s# (some->> (try ~@body
                            (catch AbortedException ex#
                              (when-not (~paused?) (log/error ex#) :aborted))
                            (catch Throwable ex# (log/error ex#) :error)))]
       (s/increment ~metric 1 1 (tagvec (assoc @*tags* :ic.status s#)))
       ;; BUG FIX: callbacks were invoked as `(f)`, which under syntax-quote
       ;; resolves to the (nonexistent) namespace var `f`, not the doseq
       ;; binding — each thunk must be called through the gensym `f#`.
       (when (= :success s#) (doseq [f# *on-success*] (f#)))
       ;; Explicitly return the status. The original used `with-let`, a symbol
       ;; not referred by this ns; a plain `let` with an explicit return value
       ;; is equivalent and removes the unresolved dependency.
       s#)))
(defn on-success
  "Registers thunk `f` to run after the enclosing `with-api-context`
  operation completes with status :success."
  [f]
  ;; *on-success* is thread-bound by with-api-context, so set! is legal here.
  (set! *on-success* (into *on-success* [f])))
;; misc ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defn post-switchover?
  "Responsibility for writing records to dynamodb is determined by the
  switchover date and the timestamp in the filename: if the filename's
  timestamp is after the agreed-upon switchover date we handle it, otherwise
  it belongs to the old sindri service. Returns nil when the filename cannot
  be parsed or has no :time."
  [filename]
  (when-let [parsed (config/parse-filename filename)]
    (when-let [t (:time parsed)]
      (pos? (.compareTo t config/IC_SWITCHOVER_TIME)))))
;; S3 helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defn s3-input-stream
  "Given an s3 get-object result, returns the content input stream, wrapped
  in a GZIPInputStream when the object's content-encoding is gzip."
  [{:keys [input-stream] {:keys [content-encoding]} :object-metadata}]
  (if (= content-encoding "gzip")
    (GZIPInputStream. input-stream)
    input-stream))
(defn s3-reader
  "Opens a buffered character reader over the S3 object at bucket/key,
  transparently gunzipping gzip-encoded content. Aborts the surrounding
  transaction with :notfound when the key does not exist; any other S3
  failure triggers a retry with :s3error. (Original docstring was the
  placeholder \"...\".)"
  [bucket key]
  (try (io/reader (s3-input-stream (s3/get-object :bucket-name bucket :key key)))
       (catch AmazonS3Exception ex
         (if (= "NoSuchKey" (.getErrorCode ex))
           (abort!! :notfound)
           (retry!! :s3error ex)))))
;; dynamo helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defn dynamo-lock-complete?
  "True when the lock row for `s3path` in `table-name` carries the sentinel
  lease -1, meaning the file was already fully processed and committed.
  Uses a consistent read; throttling triggers a retry with :ddbthrottled."
  [table-name s3path]
  (try (-> (ddb/get-item :consistent-read true
                         :table-name table-name
                         :key {:s3path s3path})
           :item
           :ic_lease
           (= -1))
       (catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))))
(defn dynamo-lock-commit!
  "Finalizes the lock on `s3path`: succeeds only while the row is still
  claimed by this transaction (ic_claimed = tx-uid AND ic_lease = tx-expire),
  then writes the sentinel lease -1 meaning \"processing complete\".
  A failed condition (lock stolen/expired) retries with :txfailed;
  throttling retries with :ddbthrottled."
  [table-name s3path tx-uid tx-expire]
  (try (ddb/update-item
         :table-name table-name
         :key {:s3path s3path}
         :condition-expression "ic_claimed = :v1 AND ic_lease = :v2"
         :update-expression "SET ic_lease = :v3"
         :expression-attribute-values {":v1" tx-uid ":v2" tx-expire ":v3" -1})
       (catch ConditionalCheckFailedException _ (retry!! :txfailed))
       (catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))))
(defn dynamo-lock-begin!
  "Attempts to claim the per-file processing lock for `s3path`.

  Aborts with :txcomplete when the file was already processed (lease = -1).
  Otherwise conditionally claims the row when no lease exists or the previous
  lease has expired (0 <= ic_lease <= now), stamping it with this thread's
  uid and an expiry `ttl` ms in the future (default 5 minutes). On success
  returns a zero-arg thunk that commits the lock (invoked by the deftx commit
  phase of with-dynamo-lock). Lost races retry with :txfailed; throttling
  retries with :ddbthrottled."
  [table-name s3path & {:keys [ttl] :or {ttl (* 1000 60 5)}}]
  (if (dynamo-lock-complete? table-name s3path)
    (abort!! :txcomplete)
    (let [now (System/currentTimeMillis)
          uid (util/thread-uuid)
          expire (+ now ttl)]
      (try (ddb/update-item
             :table-name table-name
             :key {:s3path s3path}
             :condition-expression "attribute_not_exists(ic_lease) OR ic_lease BETWEEN :v0 AND :v2"
             :update-expression "SET ic_claimed = :v1, ic_lease = :v3"
             :expression-attribute-values {":v0" 0 ":v1" uid ":v2" now ":v3" expire})
           ;; The update-item result is discarded; the try's value is this
           ;; commit thunk, closed over the claim's uid and expiry.
           #(dynamo-lock-commit! table-name s3path uid expire)
           (catch ConditionalCheckFailedException _ (retry!! :txfailed))
           (catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))))))
(defn verify-not-duplicate!
  "Records `impression-id` in the click-dedup table with a 24h expiry.
  Returns the truthy put-item result when the id was not already present (or
  its previous entry had expired). When the conditional put fails the id is a
  duplicate: a :ic.clickdeduped metric is emitted and the s/increment return
  value is propagated — presumably falsy, so callers drop the click; TODO
  confirm clj-statsd's increment return. Throttling retries with
  :ddbthrottled."
  [impression-id]
  (let [nowtime (int (/ (System/currentTimeMillis) 1000)) ; epoch seconds
        expires (+ nowtime (* 24 60 60))]                 ; 24 hours from now
    (try (ddb/put-item
           :table-name config/IC_CLICK_DEDUP
           :condition-expression "attribute_not_exists(Expires) OR Expires < :v1"
           :expression-attribute-values {":v1" nowtime}
           :item {:UniqueId impression-id :Expires expires})
         (catch ConditionalCheckFailedException _ (s/increment :ic.clickdeduped 1))
         (catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled)))))
(defn update-dynamo-counts!
  "Applies one aggregated counts job to dynamodb as a single atomic
  transact-write-items call (timed under :ic.ddbtransaction.time, idempotent
  via the job's :token as client-request-token).

  Which tables are written depends on :entitytype: every type except
  \"network\" writes an all-time totals table plus a daily totals table keyed
  by :Date; \"network\" writes only a daily table; \"adpart\" additionally
  stamps Precedence with the current wall-clock ms on its totals row.
  Throttling/throttled or conflicting transactions retry with the
  appropriate status; other cancellations retry with :ddberror."
  [{:keys [token entitytype entityid createdon impressions clicks conversions passbacks revenue] :as job}]
  (let [tx-expr "ADD Impressions :v1, Clicks :v2, Conversions :v3, Passbacks :v4, Revenue :v5"
        tx-vals {":v1" impressions ":v2" clicks ":v3" conversions ":v4" passbacks ":v5" revenue}]
    (try ((timed ddb/transact-write-items :ic.ddbtransaction.time)
          :client-request-token token
          :transact-items (case entitytype
                            "ad"
                            [{:update {:table-name config/IC_AD_TOTALS
                                       :key {:PassCreativeId entityid}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}
                             {:update {:table-name config/IC_AD_TOTALS_DAILY
                                       :key {:PassCreativeId entityid :Date createdon}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}]
                            "adpart"
                            ;; entityid is "<adid>:<partition>"; split once to
                            ;; recover the composite key parts.
                            (let [[adid part] (string/split entityid #":" 2)
                                  now-ms (System/currentTimeMillis)]
                              [{:update {:table-name config/IC_AD_PARTITION_TOTALS
                                         :key {:Ad adid :Partition part}
                                         :update-expression (str tx-expr " SET Precedence = :w1")
                                         :expression-attribute-values (merge tx-vals {":w1" now-ms})}}
                               {:update {:table-name config/IC_AD_PARTITION_TOTALS_DAILY
                                         :key {:AdPartition entityid :Date createdon}
                                         :update-expression tx-expr
                                         :expression-attribute-values tx-vals}}])
                            "flight"
                            [{:update {:table-name config/IC_FLIGHT_TOTALS
                                       :key {:PassId entityid}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}
                             {:update {:table-name config/IC_FLIGHT_TOTALS_DAILY
                                       :key {:PassId entityid :Date createdon}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}]
                            "campaign"
                            [{:update {:table-name config/IC_CAMPAIGN_TOTALS
                                       :key {:CampaignId entityid}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}
                             {:update {:table-name config/IC_CAMPAIGN_TOTALS_DAILY
                                       :key {:CampaignId entityid :Date createdon}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}]
                            "advertiser"
                            [{:update {:table-name config/IC_ADVERTISER_TOTALS
                                       :key {:AdvertiserId entityid}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}
                             {:update {:table-name config/IC_ADVERTISER_TOTALS_DAILY
                                       :key {:AdvertiserId entityid :Date createdon}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}]
                            "network"
                            ;; networks only track daily totals; no all-time table.
                            [{:update {:table-name config/IC_NETWORK_TOTALS_DAILY
                                       :key {:NetworkId entityid :Date createdon}
                                       :update-expression tx-expr
                                       :expression-attribute-values tx-vals}}]))
         (catch ProvisionedThroughputExceededException _ (retry!! :ddbthrottled))
         (catch TransactionCanceledException ex
           ;; Map the per-item cancellation reasons onto retry categories.
           (let [reasons (set (map #(.getCode %) (.getCancellationReasons ex)))]
             (cond (reasons "ProvisionedThroughputExceeded") (retry!! :ddbthrottled)
                   (reasons "ThrottlingError") (retry!! :ddbthrottled)
                   (reasons "TransactionConflict") (retry!! :ddbtxconflict)
                   :else (retry!! :ddberror ex)))))))
;; postgres helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defn backlog-exceeded?
  "True when the unprocessed-record backlog exceeds max-backlog. The
  check_backlog procedure signals SQLSTATE \"BCKLG\" when the backlog is too
  large; that exception is swallowed to nil so the outer `not` yields true.
  A normal (truthy) result yields false; any other SQL error propagates."
  []
  (not (try (QUERY ["SELECT * FROM check_backlog(?::int)" max-backlog])
            (catch SQLException ex
              (when (not= "BCKLG" (.getSQLState ex)) (throw ex))))))
(defn still-working?
  "True while worker records remain in flight. The check_working procedure
  signals SQLSTATE \"WORKN\" while work remains, which maps to true; a normal
  (truthy) result means idle and yields false; other SQL errors propagate."
  []
  (try (not (QUERY ["SELECT * FROM check_working()"]))
       (catch SQLException ex
         (or (= "WORKN" (.getSQLState ex)) (throw ex)))))
(defn start-sql-job!
  "Claims the next pending counts-update job row from postgres (get_job), or
  nil when none is available. SQL failures retry with :sqlerror."
  []
  (try
    (-> ["SELECT * FROM get_job()"] QUERY first)
    (catch SQLException ex (retry!! :sqlerror ex))))
(defn complete-sql-job!
  "Marks the claimed job (by its :id) completed in postgres (complete_job).
  SQL failures retry with :sqlerror."
  [{:keys [id]}]
  (try
    (-> ["SELECT * FROM complete_job(?::bigint)" id] QUERY first)
    (catch SQLException ex (retry!! :sqlerror ex))))
(defn insert-and-aggregate!
  "Bulk-loads `rows` into a per-thread temp table (named after the current
  thread id so concurrent ingesters don't collide) and folds them into the
  initial aggregated counts. SQL failures retry with :sqlerror."
  [rows]
  (try
    (let [tmp (str "tmp" (.getId (Thread/currentThread)))]
      (QUERY ["SELECT * FROM create_temp_table(?::text)" tmp])
      (INSERT-MULTI! tmp rows)
      (QUERY ["SELECT * FROM update_initial_aggregated_counts(?::text)" tmp]))
    (catch SQLException ex (retry!! :sqlerror ex))))
(defn valid-event?
  "True when parsed record `d` should be counted. Requires :Meta:schema to be
  \"event\". Non-click events (no :ClickCount key) pass unconditionally;
  clicks additionally need a valid user-agent and must not be duplicates.
  The dedup check is wrapped in a delay so the dynamodb round-trip happens
  only for clicks that reach that branch. Corrections get a distinct dedup
  id of the form \"<impression-id>:<correction-id>\"."
  [d]
  (let [event? (= (:Meta:schema d) "event")
        not-click? (not (contains? d :ClickCount))
        correction-id (:IsCorrection d)
        valid-ua? (:IsValidUA d false)
        id (:ImpressionId d)
        unique-id (if correction-id (str id ":" correction-id) id)
        unique-click? (delay (verify-not-duplicate! unique-id))]
    (and event? (or not-click? (and valid-ua? @unique-click?)))))
(defn make-rows
  "Parses one JSON line into per-entity count rows. Returns a vector of up to
  six row maps — adpart (only when :EcpmPartition is present, else nil in its
  slot), ad, network, flight, campaign, advertiser — or nil for records that
  fail valid-event?. Unparseable json is logged and counted (:ic.json.error)
  and yields nil. Side effect: bumps the `events` and `counts` atoms so the
  caller can report totals."
  [events counts text]
  (if-let [d (guard (json/parse-string text true))]
    (when (valid-event? d)
      (let [part (when-let [p (:EcpmPartition d)]
                   (str (:CreativePassId d) ":" p))
            base (merge {:impressions (:ImpressionCount d 0)
                         :clicks (:ClickCount d 0)
                         :conversions (:ConversionCount d 0)
                         :revenue (:Revenue d 0)
                         :passbacks 0
                         :createdon (csharp->yyyy-mm-dd (:CreatedOn d))}
                        ;; EventId 500 is a passback: cancel the impression
                        ;; and count a passback instead.
                        (when (= (:EventId d) 500) {:impressions -1 :passbacks 1}))
            make-row #(merge base {:entitytype %1 :entityid %2})]
        (swap! events inc)
        (swap! counts #(+ % (if part 6 5)))
        [(when part (make-row "adpart" part))
         (make-row "ad" (:CreativePassId d))
         (make-row "network" (:NetworkId d))
         (make-row "flight" (:PassId d))
         (make-row "campaign" (:CampaignId d))
         (make-row "advertiser" (:BrandId d))]))
    (do (s/increment :ic.json.error)
        (log/errorf "could not parse json: %s" text))))
(defn ingest-records!
  "Parses `lines` into count rows via make-rows, drops nil entries, and when
  any rows remain inserts and aggregates them in postgres. Returns
  {:events n :counts n} totals, or nil when nothing was ingested. Timed
  under :ic.ingestrecords.time."
  [lines]
  (with-timing :ic.ingestrecords.time
    (let [events (atom 0)
          counts (atom 0)
          rows (->> (seq lines)
                    (mapcat (partial make-rows events counts))
                    (filter identity)
                    (seq))]
      (when rows
        (insert-and-aggregate! rows)
        {:events @events :counts @counts}))))
;; SQS helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defn get-sqs-job!
  "Long-polls `queue-url` (20s wait) for a single message. Returns the
  message map with :body replaced by its parsed JSON (nil when unparseable —
  `guard` swallows the parse error) and :queue-url added for later deletion,
  or nil when the queue was empty. Any failure retries with :sqserror."
  [queue-url]
  (try
    (let [msg (-> (sqs/receive-message :queue-url queue-url
                                       :wait-time-seconds 20
                                       :max-number-of-messages 1)
                  :messages
                  first)]
      (when msg
        (assoc msg
               :body (guard (json/parse-string (:body msg) true))
               :queue-url queue-url)))
    (catch Throwable ex (retry!! :sqserror ex))))
(defn complete-sqs-job!
  "Deletes the processed message from its SQS queue (the job map carries
  :queue-url and the receipt handle). Any failure retries with :sqserror."
  [job]
  (try
    (sqs/delete-message job)
    (catch Throwable t (retry!! :sqserror t))))
;; try-with-resources macros ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
(defmacro with-sql-tx
  "Runs body inside a read-committed postgres transaction. Any SQLException
  that escapes the body (i.e. wasn't converted to a retry/abort) is logged
  and collapsed to the :sqlerror status keyword for with-api-context."
  [& body]
  `(try (with-tx :read-committed ~@body)
        (catch SQLException ex# (log/error ex#) :sqlerror)))
;; Transactional per-file dynamo lock: begin claims the lock row for
;; "<bucket>/<key>" and returns a commit thunk (see dynamo-lock-begin!).
;; The commit clause binds that thunk — deliberately shadowing the
;; dynamo-lock-commit! fn name — and invokes it to finalize the lock.
(deftx with-dynamo-lock
  (begin [bucket key] (dynamo-lock-begin! lock-table (str bucket "/" key)))
  (commit [dynamo-lock-commit! _] (dynamo-lock-commit!)))
;; Transactional sql job: begin claims the next job row; commit marks it
;; complete only after the body (the dynamo write) succeeded.
(deftx with-update-job
  (begin [] (start-sql-job!))
  (commit [job _] (complete-sql-job! job)))
;; Transactional SQS job: begin receives a message from the ingest queue;
;; commit deletes it only after the body (download + ingest) succeeded, so a
;; failed ingest redelivers the message.
(deftx with-ingest-job
  (begin [] (get-sqs-job! ingest-queue))
  (commit [job _] (complete-sqs-job! job)))
</deftx>
;; Tracks in-flight ingest workers: the counter is incremented on begin and
;; decremented on both commit and rollback, so it always drains back to zero.
(deftx with-ingesters
  (begin [] (swap! ingesters inc))
  (commit [_ _] (swap! ingesters dec))
  (rollback [_ _] (swap! ingesters dec)))
;; API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; True while the postgres-side job queue still has records in flight.
(defn records-in-queue? [] (with-tx :read-committed (still-working?)))
;; True while any ingest worker holds the `ingesters` counter.
(defn files-ingesting? [] (< 0 @ingesters))
;; Snapshot of the status_report view, for monitoring/health endpoints.
(defn status-report [] (with-tx :read-committed (QUERY ["SELECT * FROM status_report"])))
;; Resets server-side ingest statistics (invoked when paused and drained).
(defn reset-stats! [] (with-tx :read-committed (QUERY ["SELECT * FROM reset_stats()"])))
(defn update-dynamo!
  "One iteration of the dynamo-update worker loop: unless paused, claims the
  next sql job inside a read-committed transaction, tags the operation's
  metrics with the job's entity type, and applies its counts to dynamodb.
  The job row is marked complete only when the dynamo write succeeds
  (with-update-job commits on success); outcomes are reported under the
  :ic.update metric by with-api-context."
  [paused?]
  (with-api-context :ic.update paused?
    (when-not (paused?)
      (with-sql-tx
        (with-update-job :as {:keys [entitytype] :as job}
          (swap! *tags* assoc :ic.entity entitytype)
          (update-dynamo-counts! job))))))
(defn s3-download-and-ingest!
  "One iteration of the ingest worker loop, reported under :ic.ingest.

  When paused: resets server-side stats once the last ingester has drained.
  Otherwise, backlog permitting, takes an SQS message naming an s3 object,
  validates it (must carry bucket+key, and the filename must be past the
  switchover date), acquires the per-file dynamo lock so each file is
  ingested at most once, then streams and ingests its records into postgres,
  pushing row/event totals to statsd. The SQS message is deleted only after
  everything commits, so failures redeliver the file."
  [paused?]
  (with-api-context :ic.ingest paused?
    (if (paused?)
      (when (zero? @ingesters) (reset-stats!))
      (with-sql-tx
        (when-not (backlog-exceeded?)
          (with-ingest-job :as {{:keys [bucket key]} :body}
            ;; registered before validation; only runs if the whole op succeeds
            (on-success #(log/info :ingested bucket key))
            (with-ingesters :as _
              (when-not (and bucket key) (abort!! :malformed))
              (when-not (post-switchover? key) (abort!! :switchover))
              (with-dynamo-lock bucket key :as _
                (with-open [rdr (s3-reader bucket key)]
                  (let [counts (ingest-records! (line-seq rdr))]
                    (s/increment :ic.ingestcounts (:counts counts))
                    (s/increment :ic.ingestevents (:events counts))))))))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment