Skip to content

Instantly share code, notes, and snippets.

@liath
Last active June 14, 2017 22:37
Show Gist options
  • Save liath/04ad06c52ffd1d714cc26940eb71d837 to your computer and use it in GitHub Desktop.
Save liath/04ad06c52ffd1d714cc26940eb71d837 to your computer and use it in GitHub Desktop.
clojure - fetch a zipped or snappy object from AWS S3 and extract it on the fly
(ns s3-fetch-compressed
"Pulls file data from S3"
(:gen-class)
(:require [amazonica.aws.s3 :as s3]
[byte-streams :as bs]
[byte-transforms :as bt]
[clojure.string :as string])
(:import (java.util.zip ZipInputStream)))
; For reference: https://github.com/kubo/snzip/blob/master/hadoop-snappy-format.c
; Snappy file header '.SNAPPY.........'
(def snappyMagicHeader (byte-array (map byte [-126 83 78 65 80 80 89 0 0 0 0 1 0 0 0 1])))
(defn unsnappyStream
"Extracts snappy files and uses byte-streams because it's EZ street"
[in]
(let [bytes (bs/to-byte-buffer in)
header (byte-array 16)]
(.get bytes header 0 16)
(if (not= 0 (bs/compare-bytes header snappyMagicHeader))
(println "Snappy Error - Malformed Header")
(loop [output []]
(if (pos? (.remaining bytes))
(let [chunkLen (.getInt bytes)
chunk (byte-array chunkLen)]
(.get bytes chunk 0 chunkLen)
(recur (conj output (bs/to-string (bt/decompress chunk :snappy)))))
(string/join output))))))
(defn unzipStream
"Extracts a zipped InputStream"
[in]
(with-open [stream (java.util.zip.ZipInputStream. in)]
(.getNextEntry stream)
(slurp stream)))
(defn fetch
"Download files and push them into the batch queue"
[bucket key creds options]
(let [s3object (s3/get-object creds {:bucket-name bucket :key key})
compression (:content-encoding (:object-metadata s3object))
body (apply (cond
(= compression "snappy-java") unsnappyStream
(= compression "zip") extractStream
:else slurp) (:input-stream s3object))]
(println body)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment