-
-
Save refset/4bdf8d0d04c518c5cb8b07d768005260 to your computer and use it in GitHub Desktop.
Retain N Checkpoints
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns excel.api.s3 | |
(:require [cognitect.aws.client.api :as aws] | |
[integrant.core :as ig] | |
[clojure.tools.logging :as log])) | |
;; Integrant initializer for the ::client component.
;; Both the component key and its configuration are ignored; the client is
;; built by `create-s3-client`, which takes no arguments.
;; NOTE(review): `create-s3-client` is not defined in the visible part of this
;; file - confirm it is declared before this form.
(defmethod ig/init-key ::client
  [_component-key _config]
  (create-s3-client))
;; The code below may end up running as a periodic background process that
;; limits the number of S3 objects.
;; Using a retention policy is possibly undesirable, as the service essentially
;; needs to be able to retrieve _some_ checkpoint even if it's quite stale; a
;; better blanket policy for the long term is to keep the n most recent checkpoints.
(defn build-object-list
  "Returns a lazy sequence of the `:Contents` entries of every ListObjectsV2
  page for `bucket`, following `:NextContinuationToken` while the response is
  marked `:IsTruncated`.

  Arguments:
    s3-client - client passed straight to `aws/invoke`
    bucket    - bucket name
    prefix    - optional key prefix; omitted from the request when nil
    token     - optional continuation token; omitted from the request when nil

  The three-argument arity lists without a prefix.

  Throws `ex-info` (with the raw response as ex-data) when a page request
  fails. Because the sequence is lazy, the exception surfaces when the failing
  page is realized, not when this function is called."
  ([s3-client bucket token]
   (build-object-list s3-client bucket nil token))
  ([s3-client bucket prefix token]
   (lazy-seq
    (let [request  (cond-> {:Bucket bucket}
                     token  (assoc :ContinuationToken token)
                     prefix (assoc :Prefix prefix))
          response (aws/invoke s3-client
                               {:op      :ListObjectsV2
                                :request request})]
      ;; A failed invoke returns an anomaly map without :Contents or
      ;; :IsTruncated. Without this check a failure would be
      ;; indistinguishable from an empty (or fully listed) bucket.
      (when (or (:cognitect.anomalies/category response)
                (:Error response))
        (throw (ex-info "build-object-list failed" response)))
      (if (:IsTruncated response)
        (concat (:Contents response)
                (build-object-list s3-client
                                   bucket
                                   prefix
                                   (:NextContinuationToken response)))
        (:Contents response))))))
(defn list-all-objects
  "Lists every object in `bucket`, starting from the first page (no
  continuation token), by delegating to `build-object-list`.

  With two arguments no prefix filter is applied; with three, `prefix` is
  forwarded to `build-object-list`."
  ([s3-client bucket]
   ;; No prefix: same as asking for a nil prefix.
   (list-all-objects s3-client bucket nil))
  ([s3-client bucket prefix]
   (let [first-page-token nil]
     (build-object-list s3-client bucket prefix first-page-token))))
(defn delete-objects
  "Issues a single DeleteObjects request for `objects` in `bucket` and returns
  the raw response from `aws/invoke` (no error checking is done here).

  Arguments:
    s3-client - client passed straight to `aws/invoke`
    bucket    - bucket name
    objects   - collection of maps containing at least `:Key`; every other
                entry (e.g. the extra fields of a listing result) is dropped

  S3 accepts at most 1000 keys per DeleteObjects request; use
  `delete-all-objects` for larger collections."
  [s3-client bucket objects]
  (aws/invoke s3-client
              {:op      :DeleteObjects
               :request {:Bucket bucket
                         ;; Previously the request carried only :Bucket, so
                         ;; `objects` was ignored and nothing could be deleted.
                         :Delete {:Objects (mapv #(select-keys % [:Key]) objects)}}}))
(defn checkpoint-key
  "Extracts the checkpoint identifier embedded in an object's `:Key`.

  The identifier has the form `checkpoint-<digits>-<timestamp>`, where the
  timestamp looks like `2021-03-04T05:06:07.890-05:00`. Returns the matched
  substring, or nil when the key does not contain such an identifier or the
  map has no `:Key` at all."
  [{:keys [Key]}]
  ;; `re-find` throws a NullPointerException on a nil string, so guard first.
  (when Key
    (re-find #"checkpoint-\d+-\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}-\d{2}:\d{2}" Key)))
(defn delete-all-objects
  "Deletes every object in `objects` from `bucket`, issuing one DeleteObjects
  request per batch of at most 1000 keys (the S3 per-request limit).

  Arguments:
    s3-client - client passed straight to `aws/invoke`
    bucket    - bucket name
    objects   - collection of maps containing at least `:Key`

  Logs one info line per successfully deleted batch and returns nil.

  Throws `ex-info` (with the raw response as ex-data) as soon as a batch
  fails; batches already deleted are not rolled back and later batches are
  not attempted."
  [s3-client bucket objects]
  (doseq [batch (partition-all 1000 objects)]
    (let [response (aws/invoke
                    s3-client
                    {:op      :DeleteObjects
                     :request {:Bucket bucket
                               :Delete {:Objects (mapv #(select-keys % [:Key]) batch)}}})]
      ;; A request can fail in three ways: a transport/service anomaly, a
      ;; request-level :Error body, or a successful response that still lists
      ;; per-key failures under :Errors.
      (when (or (:cognitect.anomalies/category response)
                (:Error response)
                (seq (:Errors response)))
        (throw (ex-info "delete-all-objects failed" response)))
      (log/infof "Deleted %d objects from %s" (count batch) bucket))))
;; Default number of most recent checkpoints to keep.
(def checkpoint-retention 10)

(defn remove-stale-checkpoints
  "Deletes the objects of every checkpoint in `bucket` except the `retention`
  checkpoints that sort last by checkpoint identifier.

  Arguments:
    s3-client - client passed straight to the listing/deletion helpers
    bucket    - bucket name
    retention - number of checkpoints to keep; defaults to
                `checkpoint-retention`

  Objects are grouped by `checkpoint-key`. Objects whose key does not contain
  a checkpoint identifier are never deleted. Returns nil."
  ([s3-client bucket]
   (remove-stale-checkpoints s3-client bucket checkpoint-retention))
  ([s3-client bucket retention]
   (let [checkpoints       (->> (list-all-objects s3-client bucket)
                                (group-by checkpoint-key)
                                ;; Objects that are not part of a checkpoint
                                ;; land in the nil group. nil sorts before
                                ;; every string, so leaving it in would get
                                ;; those objects deleted as the oldest
                                ;; "checkpoint".
                                (remove (comp nil? key))
                                (map (fn [[identifier objects]]
                                       {:key identifier :objects objects}))
                                ;; NOTE(review): this is a lexicographic sort
                                ;; of the identifier string, so
                                ;; "checkpoint-10-..." sorts before
                                ;; "checkpoint-9-..." - confirm that matches
                                ;; the intended notion of "most recent".
                                (sort-by :key))
         stale-checkpoints (drop-last retention checkpoints)
         stale-objects     (mapcat :objects stale-checkpoints)]
     (when (seq stale-objects)
       (delete-all-objects s3-client bucket stale-objects)
       (log/infof "Removed %d stale checkpoints"
                  (count stale-checkpoints))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment