Skip to content

Instantly share code, notes, and snippets.

@DazWilkin
Last active September 5, 2018 16:30
Show Gist options
  • Save DazWilkin/73333f0888cad18fe582395b0894e727 to your computer and use it in GitHub Desktop.
Save DazWilkin/73333f0888cad18fe582395b0894e727 to your computer and use it in GitHub Desktop.
Cloud Storage "exploder" #2
package somepackage // import "github.com/DazWilkin/go-functions/exploder"
import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/logging"
	"cloud.google.com/go/storage"
	"github.com/krolaw/zipstream"
	"google.golang.org/genproto/googleapis/api/monitoredres"
)
// logName is the Cloud Logging log that entries are written to, and
// resourceType is the monitored-resource type those entries are attributed to.
const logName, resourceType = "go-functions", "cloud_function"
// Event represents Cloud Functions' incoming data struct: the object metadata
// delivered to the function when it is triggered by Cloud Storage. Each field
// is bound to its JSON key by the struct tag. Exploder only reads Bucket and
// Name (the source bucket and the object to unzip).
//
// NOTE(review): Generation, Metageneration and Size are declared as strings,
// presumably because the event payload encodes them as JSON strings — confirm
// before changing their types.
type Event struct {
	Bucket                  string `json:"bucket"`
	Name                    string `json:"name"`
	ContentType             string `json:"contentType"`
	Crc32c                  string `json:"crc32c"`
	Etag                    string `json:"etag"`
	Generation              string `json:"generation"`
	ID                      string `json:"id"`
	Kind                    string `json:"kind"`
	Md5Hash                 string `json:"md5Hash"`
	MediaLink               string `json:"mediaLink"`
	Metageneration          string `json:"metageneration"`
	SelfLink                string `json:"selfLink"`
	Size                    string `json:"size"`
	StorageClass            string `json:"storageClass"`
	TimeCreated             string `json:"timeCreated"`
	TimeStorageClassUpdated string `json:"timeStorageClassUpdated"`
	Updated                 string `json:"updated"`
}
type (
	// project groups the identity of the Google Cloud project this function
	// runs in with the details of the function itself.
	project struct {
		id       string   // Google Cloud project ID
		function function // the deployed function and its output bucket
	}

	// function holds the deployed function's name and region (used as
	// monitored-resource labels) and sink, the destination bucket name.
	function struct {
		name, region, sink string
	}
)
// Exploder unzips zip file in a source bucket creating files (with structure)
// in destination bucket.
//
// It is triggered by a Cloud Storage event: the archive is gs://event.Bucket/
// event.Name, and the destination bucket is named by the BUCKET_DST environment
// variable. Each regular file in the archive becomes one object whose name is
// the entry's full path inside the zip, so the archive's directory layout is
// preserved in the object names. Directory entries are skipped.
//
// Failures are returned rather than terminating the process. That way the
// deferred Close calls always run, and closing the Logging client flushes any
// buffered log entries. The Functions runtime also sees the error.
func Exploder(ctx context.Context, event Event) error {
	// BUCKET_DST must be set and not empty
	sink := os.Getenv("BUCKET_DST")
	if sink == "" {
		// The Cloud Logging logger does not exist yet, so use the standard logger.
		log.Print("Environment variable `BUCKET_DST` must be set and not empty.")
		return fmt.Errorf("environment variable BUCKET_DST must be set and not empty")
	}
	// The remaining environment variables are system-provided; they are read
	// as-is and not validated here.
	p := project{
		id: os.Getenv("GCLOUD_PROJECT"),
		function: function{
			name:   os.Getenv("FUNCTION_NAME"),
			region: os.Getenv("FUNCTION_REGION"),
			sink:   sink,
		},
	}
	gcsClient, err := storage.NewClient(ctx)
	if err != nil {
		log.Print(err)
		return fmt.Errorf("creating storage client: %v", err)
	}
	defer gcsClient.Close()
	logClient, err := logging.NewClient(ctx, p.id)
	if err != nil {
		log.Print(err)
		return fmt.Errorf("creating logging client for project %q: %v", p.id, err)
	}
	// Close flushes buffered entries; it only runs because the error paths
	// below return instead of exiting.
	defer logClient.Close()
	monitoredResource := monitoredres.MonitoredResource{
		Type: resourceType,
		Labels: map[string]string{
			"function_name": p.function.name,
			"region":        p.function.region,
		},
	}
	commonResource := logging.CommonResource(&monitoredResource)
	logger := logClient.Logger(logName, commonResource).StandardLogger(logging.Debug)
	// Now we've a logger, we can log details of the triggering event
	logger.Printf("[Exploder] Received: (%s) %s", event.Bucket, event.Name)
	// Determined by the Function's trigger-resource
	srcBucket := gcsClient.Bucket(event.Bucket)
	// Named by the BUCKET_DST environment variable
	dstBucket := gcsClient.Bucket(p.function.sink)
	gcsSrcReader, err := srcBucket.Object(event.Name).NewReader(ctx)
	if err != nil {
		logger.Print(err)
		return fmt.Errorf("opening gs://%s/%s: %v", event.Bucket, event.Name, err)
	}
	defer gcsSrcReader.Close()
	zipReader := zipstream.NewReader(gcsSrcReader)
	for {
		fileHeader, err := zipReader.Next()
		if err == io.EOF {
			break // No more files
		}
		if err != nil {
			logger.Print(err)
			return fmt.Errorf("reading zip entry from gs://%s/%s: %v", event.Bucket, event.Name, err)
		}
		// Directories have no analog in GCS: the files inside them carry the
		// full path in their own names, so nothing needs to be created.
		if fileHeader.FileInfo().IsDir() {
			continue
		}
		// FileInfo().Name() is the base filename only; fileHeader.Name is the
		// full path/to/name, which is used as the GCS object name.
		filename := fileHeader.Name
		logger.Printf("[Exploder] File: %s", filename)
		if err := copyToObject(ctx, dstBucket, filename, zipReader); err != nil {
			logger.Print(err)
			return err
		}
	}
	return nil
}

// copyToObject streams r into the object called name in dst. If the copy
// fails, the upload is abandoned by cancelling the writer's context before
// Close. Per the storage.Writer contract, this stops the write without saving
// the data, so a truncated object is never committed.
func copyToObject(ctx context.Context, dst *storage.BucketHandle, name string, r io.Reader) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	w := dst.Object(name).NewWriter(ctx)
	if _, err := io.Copy(w, r); err != nil {
		cancel() // abort the upload so Close does not finalize partial data
		// Close's error is expected after cancellation; the copy error is
		// the one worth reporting.
		_ = w.Close()
		return fmt.Errorf("writing object %q: %v", name, err)
	}
	if err := w.Close(); err != nil {
		return fmt.Errorf("finalizing object %q: %v", name, err)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment