Skip to content

Instantly share code, notes, and snippets.

@salrashid123
Last active February 22, 2023 22:32
Show Gist options
  • Save salrashid123/3bc475128e2700cc09a878f68d715e95 to your computer and use it in GitHub Desktop.
Save salrashid123/3bc475128e2700cc09a878f68d715e95 to your computer and use it in GitHub Desktop.
Using the batch API for Google Cloud Storage: https://blog.salrashid.dev/articles/2022/blob_zapper/
package main
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"os"
"reflect"
"strings"
"sync"
"time"
storage "cloud.google.com/go/storage"
"github.com/google/uuid"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"golang.org/x/time/rate"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
"google.golang.org/api/transport"
)
// increment is the number of object names placed in one batch request.
// A Cloud Storage batch may carry at most 100 calls:
// https://cloud.google.com/storage/docs/batch#overview
const increment = 100

// totalFiles is how many objects, named "1" through "totalFiles", are deleted.
const totalFiles = 100000

// Pacing for golang.org/x/time/rate.
//
// https://cloud.google.com/storage/quotas says writes/deletes should stay
// under 1,000 per second. Batches are launched at most 10 times per second
// and each batch holds 100 deletes, so the expected rate is
// 10 * 100 = 1,000 deletes per second.
const (
	maxRequestsPerSecond float64 = 10 // limiter rate, in batches per second
	burst                int     = 2  // limiter burst size
)

// bucketName is the target bucket, set with -bucketName.
var bucketName = flag.String("bucketName", "PROJECT_ID-batch-regional-us", "BucketName")
// wrapped is an http.RoundTripper that captures requests instead of sending
// them: its RoundTrip method writes each request into writer as one part of
// a multipart batch body and returns a synthetic response. base is the
// transport it replaced; RoundTrip does not call it.
type wrapped struct {
base http.RoundTripper
writer *multipart.Writer
}
// requestData describes a serialized HTTP call. Adapted from
// https://github.com/jfcote87/google-api-go-client/blob/master/batch/batch.go
// None of its fields are read anywhere in this file.
type requestData struct {
	method string
	uri    string
	header http.Header
	body   []byte
}

// requestStatus is carried over from the adapted batch package; this file
// defines no values for it.
type requestStatus int8

// Request is per-call bookkeeping that processBody pairs with one part of a
// batch response.
//
// When resultPtr is non-nil, NewResponse decodes the part's JSON body into
// it. tag is copied unchanged into the matching Response.
type Request struct {
	data      *requestData
	status    requestStatus
	resultPtr interface{}
	tag       interface{}
}

// String formats the request's tag with the default %v formatting
// ("<nil>" when no tag is set).
func (req *Request) String() string {
	return fmt.Sprint(req.tag)
}
// Response is the outcome of one call in a batch response, built by
// NewResponse or ErrorResponse (via parseMimePart).
//
// Result is the value read through the originating Request's resultPtr
// (nil when resultPtr is nil). Tag is copied from the Request.
// Err is non-nil in three cases:
//   - the part could not be parsed,
//   - the part carried an error status (as judged by googleapi.CheckResponse),
//   - its JSON body could not be decoded.
//
// The embedded googleapi.ServerResponse holds the part's HTTP status code
// and headers; parseMimePart fills it only for parts that parsed successfully.
type Response struct {
Result interface{}
Tag interface{}
Err error
googleapi.ServerResponse
}
func (w wrapped) RoundTrip(r *http.Request) (*http.Response, error) {
metadataHeader := textproto.MIMEHeader{}
metadataHeader.Set("Content-Type", "application/http")
id := uuid.New()
metadataHeader.Set("Content-ID", id.String())
p, err := w.writer.CreatePart(metadataHeader)
if err != nil {
return nil, err
}
part := fmt.Sprintf("%s %s HTTP/1.1\n", r.Method, r.URL.RawPath)
for h, values := range r.Header {
for _, v := range values {
part = part + fmt.Sprintf("%s: %s\n", h, v)
}
}
if r.Method == http.MethodPut || r.Method == http.MethodPost || r.Method == http.MethodPatch {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("Error reading body: %v", err)
return nil, err
}
part = part + fmt.Sprintf("Content-Length: %d\n\n", len(body))
part = part + string(body)
}
_, err = p.Write([]byte(part))
if err != nil {
return nil, err
}
if r.Method == http.MethodDelete {
return &http.Response{
StatusCode: http.StatusNoContent,
Status: "204 No Content",
}, nil
}
resp := `{}`
return &http.Response{
StatusCode: http.StatusOK,
Status: "200 OK",
Body: ioutil.NopCloser(strings.NewReader(resp)),
}, nil
}
// main deletes the objects named "1" through totalFiles from *bucketName
// using the Cloud Storage JSON batch API.
//
// Object names are grouped into batches of at most increment names. The
// final batch may be smaller when totalFiles is not a multiple of increment.
// Each batch runs in its own goroutine (see deleteBatch). Launches are
// paced by a token-bucket limiter (maxRequestsPerSecond, burst). main waits
// for every launched batch before returning.
func main() {
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Resolve Application Default Credentials once instead of per batch.
	// Only its Transport is used, as the (uncalled) base of each capturing
	// transport.
	defaultClient, err := google.DefaultClient(ctx)
	if err != nil {
		fmt.Printf("could not create default client: %v\n", err)
		return
	}
	// One authenticated client sends every batch POST, so connections are
	// reused across batches.
	batchClient, _, err := transport.NewHTTPClient(ctx)
	if err != nil {
		fmt.Printf("could not create batch client %v\n", err)
		return
	}

	limiter := rate.NewLimiter(rate.Limit(maxRequestsPerSecond), burst)
	var wg sync.WaitGroup
	batchCounter := 0

	files := make([]string, 0, increment)
	for i := 1; i <= totalFiles; i++ {
		files = append(files, fmt.Sprintf("%d", i))
		// Launch when the batch is full, or when this is the last name.
		if len(files) < increment && i < totalFiles {
			continue
		}
		// If waiting for a token fails, stop launching new batches, but
		// still wait below for the ones already running.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Printf("Error in rate limiter: %v\n", err)
			break
		}
		batchCounter++
		wg.Add(1)
		go func(set []string, counter int) {
			defer wg.Done()
			deleteBatch(ctx, defaultClient.Transport, batchClient, set, counter)
		}(files, batchCounter)
		// Start a fresh slice; the goroutine owns the previous one.
		files = make([]string, 0, increment)
	}
	wg.Wait()
}

// deleteBatch deletes the objects named in set with one Cloud Storage batch
// request and prints the outcome. counter only labels log output.
//
// It works in two phases:
//  1. It drives a storage.Client whose transport is wrapped{base, writer},
//     so each Delete is recorded as a part of a multipart/mixed body
//     instead of being sent.
//  2. It POSTs that body to the batch endpoint with batchClient and reports
//     every failed part of the multipart response.
//
// Failures are printed rather than returned, so one bad batch does not stop
// the others.
func deleteBatch(ctx context.Context, base http.RoundTripper, batchClient *http.Client, set []string, counter int) {
	const batchURL = "https://storage.googleapis.com/batch/storage/v1"

	fmt.Printf("Deleting filesset batch %v\n", counter)
	if len(set) > increment {
		fmt.Printf("Blob fileset cannot be larger than 100; got [%d]\n", len(set))
		return
	}
	if ctx.Err() != nil {
		return
	}

	// Record the deletes into body. The placeholder token never reaches the
	// network because the wrapped transport does not forward requests.
	// (To batch metadata updates instead, obj.Update could be called here;
	// the original author noted PATCH did not work yet — TODO.)
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	storageClient, err := storage.NewClient(ctx,
		option.WithHTTPClient(&http.Client{Transport: wrapped{base, writer}}),
		option.WithTokenSource(oauth2.StaticTokenSource(&oauth2.Token{
			AccessToken: "foo",
			Expiry:      time.Now().Add(24 * time.Hour),
		})))
	if err != nil {
		fmt.Printf("error creating storage.NewClient: %v\n", err)
		return
	}
	defer storageClient.Close()

	bkt := storageClient.Bucket(*bucketName)
	for _, name := range set {
		if err := bkt.Object(name).Delete(ctx); err != nil {
			fmt.Printf("error fake delete %v\n", err)
			return
		}
	}
	// Close writes the terminating boundary; the body is incomplete without it.
	if err := writer.Close(); err != nil {
		fmt.Printf("could not finalize batch body %v\n", err)
		return
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, batchURL, body)
	if err != nil {
		fmt.Printf("could not create storage batch request %v\n", err)
		return
	}
	req.Header.Set("Content-Type", "multipart/mixed; boundary="+writer.Boundary())
	resp, err := batchClient.Do(req)
	if err != nil {
		fmt.Printf("could not post storage batch request %v\n", err)
		return
	}
	defer resp.Body.Close()

	fmt.Printf("batch [%d], Response code: %d\n", counter, resp.StatusCode)
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("error deleting files in batch: {%d}, with fileset [%v]\n", counter, set)
		_ = resp.Write(os.Stdout) // best-effort dump of the failed response
		return
	}

	// Split the multipart response; parsing adapted from
	// https://github.com/jfcote87/google-api-go-client/blob/master/batch/batch.go
	cType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
	if err != nil {
		fmt.Printf("could not parse response content media type %v\n", err)
		return
	}
	if !strings.HasPrefix(cType, "multipart/") {
		fmt.Printf("BatchApi: Invalid Content Type returned %s\n", cType)
		return
	}

	// One placeholder per captured delete, so exactly len(set) parts are read.
	rq := make([]*Request, len(set))
	for i := range rq {
		rq[i] = &Request{}
	}
	results, err := processBody(ctx, resp.Body, params["boundary"], rq)
	if err != nil {
		fmt.Printf("BatchApi: could not parse body in batch {%d}, with fileset [%v] with error %v\n", counter, set, err)
		return
	}
	// Report every failed part, not only *googleapi.Error ones. A successful
	// delete part carries 204 No Content and yields a nil Err.
	// NOTE(review): part indexes are not mapped back to object names; the
	// parts' Content-ID headers could presumably be used for that.
	for i, res := range results {
		if res.Err == nil {
			continue
		}
		var gerr *googleapi.Error
		if errors.As(res.Err, &gerr) {
			fmt.Printf("batch [%d] part %d: error status code %d: %s\n", counter, i, gerr.Code, gerr.Message)
		} else {
			fmt.Printf("batch [%d] part %d: %v\n", counter, i, res.Err)
		}
	}
}
// processBody reads one MIME part per entry in requests from the
// multipart/mixed batch response rc (delimited by boundary). It turns each
// part into a Response with parseMimePart, pairing the i-th part with
// requests[i].
//
// The pairing assumes the server returns parts in request order.
// NOTE(review): confirm this; the parts also carry Content-ID headers that
// could be matched instead.
//
// It fails if ctx is cancelled or if fewer parts than requests can be read.
// In that case it returns no partial results. A read failure is wrapped
// with %w, so callers can inspect it with errors.Is / errors.As
// (e.g. io.EOF when the response has too few parts).
func processBody(ctx context.Context, rc io.Reader, boundary string, requests []*Request) ([]Response, error) {
	mr := multipart.NewReader(rc, boundary)
	results := make([]Response, 0, len(requests))
	for idx, req := range requests {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		pr, err := mr.NextPart()
		if err != nil {
			return nil, fmt.Errorf("Unable to parse multipart response(%d): %w", idx, err)
		}
		results = append(results, parseMimePart(pr, req))
	}
	return results, nil
}
// parseMimePart returns a Response from a mimepart. See documentation at
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch#response-to-a-batch-request
func parseMimePart(pr *multipart.Part, rq *Request) Response {
defer pr.Close()
var err error
var res *http.Response
if pr.Header.Get("Content-Type") != "application/http" {
err = fmt.Errorf("Batch Api: Invalid Content Type: %s", pr.Header.Get("Content-Type"))
} else if res, err = http.ReadResponse(bufio.NewReader(pr), nil); err == nil {
defer res.Body.Close()
err = googleapi.CheckResponse(res)
}
if err != nil {
return ErrorResponse(rq, err)
}
result := NewResponse(res.Body, rq)
result.ServerResponse =
googleapi.ServerResponse{
HTTPStatusCode: res.StatusCode,
Header: res.Header,
}
return result
}
// NewResponse creates a response from a stream
func NewResponse(r io.Reader, rq *Request) Response {
var ix interface{}
var err error
if rq.resultPtr != nil {
// Set result to a nil pointer to prevent casting errors on return
ix = reflect.ValueOf(rq.resultPtr).Elem().Interface()
if r != nil {
err = json.NewDecoder(r).Decode(rq.resultPtr)
} else {
err = errors.New("No JSON data returned")
}
//JSON Decode successful
ix = reflect.ValueOf(rq.resultPtr).Elem().Interface()
}
return Response{
Result: ix,
Err: err,
Tag: rq.tag,
}
}
// ErrorResponse creates a response with the passed error
func ErrorResponse(rq *Request, err error) Response {
var ix interface{}
if rq.resultPtr != nil {
// Set result to a nil pointer to prevent casting errors on return
ix = reflect.ValueOf(rq.resultPtr).Elem().Interface()
}
return Response{Result: ix, Err: err, Tag: rq.tag}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment