Last active
September 5, 2018 16:30
-
-
Save DazWilkin/73333f0888cad18fe582395b0894e727 to your computer and use it in GitHub Desktop.
Cloud Storage "exploder" #2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package somepackage // import "github.com/DazWilkin/go-functions/exploder" | |
import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/logging"
	"cloud.google.com/go/storage"
	"github.com/krolaw/zipstream"
	"google.golang.org/genproto/googleapis/api/monitoredres"
)
// logName is the Cloud Logging log that Exploder's logger writes entries to.
const logName = "go-functions"

// resourceType is the monitored-resource type attached to those log entries.
const resourceType = "cloud_function"
// Event is the data a Cloud Storage trigger passes to the function. It
// describes the object that fired the trigger. Every field, including
// numeric-looking ones such as Size and Generation, is decoded as a string from
// the JSON key named in its struct tag.
type Event struct {
	// Where the triggering object lives.
	Bucket string `json:"bucket"`
	Name   string `json:"name"`

	// Object metadata.
	ContentType    string `json:"contentType"`
	Crc32c         string `json:"crc32c"`
	Etag           string `json:"etag"`
	Generation     string `json:"generation"`
	ID             string `json:"id"`
	Kind           string `json:"kind"`
	Md5Hash        string `json:"md5Hash"`
	MediaLink      string `json:"mediaLink"`
	Metageneration string `json:"metageneration"`
	SelfLink       string `json:"selfLink"`
	Size           string `json:"size"`
	StorageClass   string `json:"storageClass"`

	// Timestamps, kept in their original string form.
	TimeCreated             string `json:"timeCreated"`
	TimeStorageClassUpdated string `json:"timeStorageClassUpdated"`
	Updated                 string `json:"updated"`
}
type project struct { | |
id string | |
function function | |
} | |
// function carries the per-deployment settings Exploder relies on.
type function struct {
	name, region string // become the function_name/region labels on log entries
	sink         string // name of the bucket that receives extracted files
}
// Exploder unzips zip file in a source bucket creating files (with structure) in destination bucket | |
func Exploder(ctx context.Context, event Event) error { | |
// BUCKET_DST must be set and not empty | |
sink := os.Getenv("BUCKET_DST") | |
if len(sink) == 0 { | |
log.Fatal("Environment variable `BUCKET_DST` must be set and not empty.") | |
} | |
// The remain Environment variables are system-provided and so will have values | |
p := project{ | |
id: os.Getenv("GCLOUD_PROJECT"), | |
function: function{ | |
name: os.Getenv("FUNCTION_NAME"), | |
region: os.Getenv("FUNCTION_REGION"), | |
sink: sink, | |
}, | |
} | |
gcsClient, err := storage.NewClient(ctx) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer gcsClient.Close() | |
logClient, err := logging.NewClient(ctx, p.id) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer logClient.Close() | |
monitoredResource := monitoredres.MonitoredResource{ | |
Type: resourceType, | |
Labels: map[string]string{ | |
"function_name": p.function.name, | |
"region": p.function.region, | |
}, | |
} | |
commonResource := logging.CommonResource(&monitoredResource) | |
logger := logClient.Logger(logName, commonResource).StandardLogger(logging.Debug) | |
// Now we've a logger, we can log details of the triggering event | |
logger.Printf("[Exploder] Received: (%s) %s", event.Bucket, event.Name) | |
// Determined by the Function's trigger-resource | |
srcBucket := gcsClient.Bucket(event.Bucket) | |
// Statically defined in the code | |
dstBucket := gcsClient.Bucket(p.function.sink) | |
gcsSrcObject := srcBucket.Object(event.Name) | |
gcsSrcReader, err := gcsSrcObject.NewReader(ctx) | |
if err != nil { | |
logger.Fatal(err) | |
} | |
zipReader := zipstream.NewReader(gcsSrcReader) | |
for { | |
fileHeader, err := zipReader.Next() | |
if err != nil { | |
if err != io.EOF { | |
logger.Fatal(err) | |
} | |
break // No more files | |
} | |
fileInfo := fileHeader.FileInfo() | |
// FileInfo Name() is the base filename only | |
// Using fileHeader.Name as this represents the full path/to/name | |
// This can then be used as the GCS Object Name | |
// Ignoring directories as these have no analog in GCS | |
if !fileInfo.IsDir() { | |
filename := fileHeader.Name | |
logger.Printf("[Exploder] File: %s", filename) | |
gcsDstObject := dstBucket.Object(filename) | |
gcsDstWriter := gcsDstObject.NewWriter(ctx) | |
_, err = io.Copy(gcsDstWriter, zipReader) | |
err := gcsDstWriter.Close() | |
if err != nil { | |
logger.Fatal(err) | |
} | |
} | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment