Last active
September 6, 2016 19:03
-
-
Save arkadijs/082f9ea89a7ca01ccce3d61ec4eb94f9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ( | |
"archive/zip" | |
"fmt" | |
awss3 "github.com/aws/aws-sdk-go/service/s3" | |
"io" | |
) | |
var s3 *awss3.S3 | |
head, err := s3.HeadObject( | |
&awss3.HeadObjectInput{ | |
Bucket: &bucket, | |
Key: &filename, | |
}) | |
if err != nil { | |
fatalf("S3 head object of `s3://%s/%s` failed: %s", bucket, filename, err.Error()) | |
} | |
if trace { | |
printf("%s: %+v", filename, head) | |
} | |
zip := zipReader(s3, bucket, filename, *head.ContentLength) | |
// readAll fills p from reader and closes reader before returning.
//
// It returns the number of bytes stored in p. Reading stops as soon as p is
// full; the previous implementation kept issuing zero-length reads at that
// point, which may return (0, nil) forever when the stream holds more data
// than p can take.
//
// Error mapping:
//   - the stream ends after delivering at least one byte: short count, nil error;
//   - the stream ends before delivering anything: 0, io.EOF;
//   - any other read error: bytes read so far and that error;
//   - len(p) == 0: 0, nil without reading.
func readAll(p []byte, reader io.ReadCloser) (int, error) {
	// A Close failure is deliberately ignored: the data has already been
	// consumed (or the read error is what gets reported) and the caller has
	// no way to act on it.
	defer reader.Close()

	n, err := io.ReadFull(reader, p)
	if err == io.ErrUnexpectedEOF {
		// ReadFull reports "some, but not all" as ErrUnexpectedEOF; callers of
		// readAll treat a non-empty short read as success.
		err = nil
	}
	return n, err
}
type S3Zip struct { | |
S3 *awss3.S3 | |
Bucket string | |
Filename string | |
Size int64 | |
} | |
func (z *S3Zip) ReadAt(p []byte, off int64) (int, error) { | |
requested := len(p) | |
rangeHeader := fmt.Sprintf("bytes=%d-%d", off, off+int64(requested)-1) | |
if zipTrace { | |
printf("S3 GetObject `s3://%s/%s` requested=%d; range %s", z.Bucket, z.Filename, requested, rangeHeader) | |
} | |
obj, err := z.S3.GetObject( | |
&awss3.GetObjectInput{ | |
Bucket: &z.Bucket, | |
Key: &z.Filename, | |
Range: &rangeHeader, | |
}) | |
if err != nil { | |
printf("S3 GetObject for `s3://%s/%s` failed: %s", z.Bucket, z.Filename, err.Error()) | |
return 0, err | |
} | |
read, err := readAll(p, obj.Body) | |
if zipTrace { | |
printf("S3Zip ReadAt: requested: %d; read: %d; err: %v", requested, read, err) | |
} | |
return read, err | |
} | |
const s3ZipPrefetchBufferSize int64 = 4 * 1024 * 1024 | |
type S3ZipPrefetch struct { | |
Zip S3Zip | |
buf [s3ZipPrefetchBufferSize]byte | |
off int64 | |
size int | |
} | |
func (z *S3ZipPrefetch) ReadAt(p []byte, off int64) (int, error) { | |
requested := len(p) | |
if z.size > 0 { | |
if off >= z.off && off+int64(requested) <= z.off+int64(z.size) { | |
if zipTrace { | |
printf("Zip from prefetch buffer: request: { offset: %d, requested: %d }; buffer: { offset: %d, size: %d }", off, requested, z.off, z.size) | |
} | |
copied := copy(p, z.buf[off-z.off:]) | |
var err error = nil | |
if copied < requested { | |
err = io.EOF | |
} | |
return copied, err | |
} | |
} | |
read, err := z.Zip.ReadAt(z.buf[:], off) | |
if zipTrace { | |
printf("Zip from S3: request: { offset: %d, requested: %d }; read: %d; err: %v", off, requested, read, err) | |
} | |
if err == io.EOF && read > 0 { | |
err = nil | |
} | |
if read > 0 { | |
copy(p, z.buf[:read-1]) | |
z.off = off | |
} | |
z.size = read | |
if requested < read { | |
read = requested | |
} | |
return read, err | |
} | |
func zipReader(s3 *awss3.S3, bucket string, filename string, size int64) *zip.Reader { | |
s3Zip := &S3ZipPrefetch{Zip: S3Zip{S3: s3, Bucket: bucket, Filename: filename, Size: size}} | |
reader, err := zip.NewReader(s3Zip, size) | |
if err != nil { | |
fatalf("ZIP init for `s3://%s/%s` failed: %s", bucket, filename, err.Error()) | |
} | |
return reader | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment