Last active
June 14, 2017 22:37
-
-
Save liath/04ad06c52ffd1d714cc26940eb71d837 to your computer and use it in GitHub Desktop.
clojure - fetch a zipped or snappy object from AWS S3 and extract it on the fly
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns s3-fetch-compressed | |
"Pulls file data from S3" | |
(:gen-class) | |
(:require [amazonica.aws.s3 :as s3] | |
[byte-streams :as bs] | |
[byte-transforms :as bt] | |
[clojure.string :as string]) | |
(:import (java.util.zip ZipInputStream))) | |
; For reference: https://github.com/kubo/snzip/blob/master/hadoop-snappy-format.c
; 16-byte stream header; rendered as text it reads '.SNAPPY.........'
; Layout: 0x82, ASCII "SNAPPY", one zero byte, then the 4-byte big-endian values 1 and 1.
; NOTE(review): the two trailing values look like version fields - confirm against the format spec.
(def snappyMagicHeader
  (byte-array
    (map unchecked-byte
         (concat [0x82]
                 (.getBytes "SNAPPY" "US-ASCII")
                 [0
                  0 0 0 1
                  0 0 0 1]))))
(defn unsnappyStream
  "Decodes a chunked snappy payload into a single string.

  `in` is anything byte-streams can turn into a ByteBuffer. The payload must
  begin with the 16 bytes of `snappyMagicHeader`, followed by zero or more
  chunks. Each chunk is a 4-byte length (read with the buffer's default
  big-endian order) followed by that many bytes of snappy-compressed data.

  Returns the decoded text. When the header is missing, too short or does not
  match, prints \"Snappy Error - Malformed Header\" and returns nil."
  [in]
  (let [buf (bs/to-byte-buffer in)
        header (byte-array 16)]
    (if (or (< (.remaining buf) 16) ; too short to even hold a header
            (do (.get buf header 0 16)
                (not= 0 (bs/compare-bytes header snappyMagicHeader))))
      (println "Snappy Error - Malformed Header")
      ;; Accumulate raw bytes and decode once at the end: decoding chunk by
      ;; chunk corrupts multi-byte characters that straddle a chunk boundary.
      (let [decoded (java.io.ByteArrayOutputStream.)]
        (loop []
          (when (pos? (.remaining buf))
            (let [chunk-len (.getInt buf)
                  chunk (byte-array chunk-len)]
              (.get buf chunk 0 chunk-len)
              (let [^bytes plain (bs/to-byte-array (bt/decompress chunk :snappy))]
                (.write decoded plain 0 (alength plain)))
              (recur))))
        (bs/to-string (.toByteArray decoded))))))
(defn unzipStream
  "Reads the first entry of a zip archive supplied as an InputStream and
  returns its contents as a string. The zip stream wrapping `in` is closed
  before returning, whether or not reading succeeded."
  [in]
  (let [zip-in (java.util.zip.ZipInputStream. in)]
    (try
      ;; Position the stream at the first entry; reads now yield its data.
      (.getNextEntry zip-in)
      (slurp zip-in)
      (finally
        (.close zip-in)))))
(defn fetch
  "Fetches `key` from `bucket` on S3, decompresses the body according to the
  object's content-encoding metadata and prints the resulting text.

  creds   - credentials handed to amazonica's get-object
  options - currently unused; kept so existing callers keep working

  A content-encoding of \"snappy-java\" is decoded with unsnappyStream, \"zip\"
  with unzipStream; anything else is slurped unchanged. Returns nil."
  [bucket key creds options]
  (let [s3object (s3/get-object creds {:bucket-name bucket :key key})
        compression (:content-encoding (:object-metadata s3object))
        extract (cond
                  (= compression "snappy-java") unsnappyStream
                  (= compression "zip") unzipStream
                  :else slurp)]
    ;; Call the extractor directly: `apply` would try to treat the stream as
    ;; an argument sequence. with-open guarantees the S3 stream is released
    ;; even when extraction throws.
    (with-open [in (:input-stream s3object)]
      (println (extract in)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment