Last active
February 22, 2023 22:32
-
-
Save salrashid123/3bc475128e2700cc09a878f68d715e95 to your computer and use it in GitHub Desktop.
Using the Batch API for Google Cloud Storage: https://blog.salrashid.dev/articles/2022/blob_zapper/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"encoding/json" | |
"errors" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"mime" | |
"mime/multipart" | |
"net/http" | |
"net/textproto" | |
"os" | |
"reflect" | |
"strings" | |
"sync" | |
"time" | |
storage "cloud.google.com/go/storage" | |
"github.com/google/uuid" | |
"golang.org/x/net/context" | |
"golang.org/x/oauth2" | |
"golang.org/x/oauth2/google" | |
"golang.org/x/time/rate" | |
"google.golang.org/api/googleapi" | |
"google.golang.org/api/option" | |
"google.golang.org/api/transport" | |
) | |
// Tuning knobs for the batch run.
const (
	// increment is the number of operations placed in a single batch request.
	// A batch should carry at most 100 items:
	// https://cloud.google.com/storage/docs/batch#overview
	increment = 100

	// totalFiles is how many object names (1..totalFiles) are generated.
	totalFiles = 100000

	// maxWritesPerSecond is the write/delete rate to stay under;
	// https://cloud.google.com/storage/quotas states writes/deletes should
	// stay under 1K/second.
	maxWritesPerSecond float64 = 1000

	// maxRequestsPerSecond is the rate at which whole batches are released by
	// the "golang.org/x/time/rate" limiter. Every batch carries `increment`
	// operations, so the rate is derived from the quota instead of being
	// hard-coded: 1000 / 100 = 10 batches per second, i.e. 10*100 = 1K
	// writes per second into the bucket.
	maxRequestsPerSecond float64 = maxWritesPerSecond / increment

	// burst is the burst size handed to the rate limiter.
	burst int = 2
)
// bucketName is the -bucketName command-line flag: the name of the bucket
// the program operates on. It is only meaningful after flag.Parse has run.
var bucketName = flag.String("bucketName", "PROJECT_ID-batch-regional-us", "BucketName")
// wrapped is the http.RoundTripper installed on the client handed to the
// storage library. Its RoundTrip method records each request into writer
// rather than sending it.
//
// The field order matters: values are built positionally as
// wrapped{transport, writer}.
type wrapped struct {
	// base is the transport that was replaced; RoundTrip does not call it.
	base http.RoundTripper
	// writer receives one multipart part per recorded request.
	writer *multipart.Writer
}
// The request bookkeeping types below follow
// https://github.com/jfcote87/google-api-go-client/blob/master/batch/batch.go

// requestData holds the raw pieces of one HTTP call: verb, target, headers
// and payload.
type requestData struct {
	method string
	uri    string
	header http.Header
	body   []byte
}

// requestStatus is a small integer status attached to a Request.
type requestStatus int8

// Request describes one entry of a batch.
type Request struct {
	// data and status are not read anywhere in the visible code.
	data   *requestData
	status requestStatus
	// resultPtr, when non-nil, is the destination NewResponse decodes the
	// JSON body into.
	resultPtr interface{}
	// tag is an opaque caller value copied onto the matching Response.
	tag interface{}
}

// String renders the request's tag using fmt's default formatting.
func (r *Request) String() string {
	return fmt.Sprint(r.tag)
}
type Response struct { | |
Result interface{} | |
Tag interface{} | |
Err error | |
googleapi.ServerResponse | |
} | |
func (w wrapped) RoundTrip(r *http.Request) (*http.Response, error) { | |
metadataHeader := textproto.MIMEHeader{} | |
metadataHeader.Set("Content-Type", "application/http") | |
id := uuid.New() | |
metadataHeader.Set("Content-ID", id.String()) | |
p, err := w.writer.CreatePart(metadataHeader) | |
if err != nil { | |
return nil, err | |
} | |
part := fmt.Sprintf("%s %s HTTP/1.1\n", r.Method, r.URL.RawPath) | |
for h, values := range r.Header { | |
for _, v := range values { | |
part = part + fmt.Sprintf("%s: %s\n", h, v) | |
} | |
} | |
if r.Method == http.MethodPut || r.Method == http.MethodPost || r.Method == http.MethodPatch { | |
body, err := ioutil.ReadAll(r.Body) | |
if err != nil { | |
fmt.Printf("Error reading body: %v", err) | |
return nil, err | |
} | |
part = part + fmt.Sprintf("Content-Length: %d\n\n", len(body)) | |
part = part + string(body) | |
} | |
_, err = p.Write([]byte(part)) | |
if err != nil { | |
return nil, err | |
} | |
if r.Method == http.MethodDelete { | |
return &http.Response{ | |
StatusCode: http.StatusNoContent, | |
Status: "204 No Content", | |
}, nil | |
} | |
resp := `{}` | |
return &http.Response{ | |
StatusCode: http.StatusOK, | |
Status: "200 OK", | |
Body: ioutil.NopCloser(strings.NewReader(resp)), | |
}, nil | |
} | |
func main() { | |
flag.Parse() | |
ctx := context.Background() | |
wait := new(sync.WaitGroup) | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
limiter := rate.NewLimiter(rate.Limit(maxRequestsPerSecond), burst) | |
var files []string | |
batchCounter := 0 | |
for i := 1; i <= totalFiles; i++ { | |
files = append(files, fmt.Sprintf("%d", i)) | |
if i%increment == 0 { | |
// Await tokens, marking this error as fatal; we should not continue if the | |
// rate limiter fails. | |
if err := limiter.Wait(ctx); err != nil { | |
fmt.Printf("Error in rate limiter: %v", err) | |
return | |
} | |
// Check context for errors | |
if ctx.Err() != nil { | |
panic(ctx.Err()) | |
} | |
wait.Add(1) | |
batchCounter++ | |
go func(set []string, counter int) { | |
defer wait.Done() | |
fmt.Printf("Deleting filesset batch %v\n", counter) | |
if len(set) > 100 { | |
fmt.Printf("Blob fileset cannot be larger than 100; got [%d]", len(set)) | |
return | |
} | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
// acquire delete operations | |
body := &bytes.Buffer{} | |
writer := multipart.NewWriter(body) | |
hc, err := google.DefaultClient(ctx) | |
if err != nil { | |
fmt.Printf("could not create default client: %v", err) | |
return | |
} | |
hc.Transport = wrapped{hc.Transport, writer} | |
storageClient, err := storage.NewClient(ctx, | |
option.WithHTTPClient(hc), | |
option.WithTokenSource(oauth2.StaticTokenSource(&oauth2.Token{ | |
AccessToken: "foo", | |
Expiry: time.Now().Add(24 * time.Hour), | |
}))) | |
if err != nil { | |
fmt.Printf("error creating storage.NewClient: %v\n", err) | |
return | |
} | |
defer storageClient.Close() | |
bkt := storageClient.Bucket(*bucketName) | |
for _, filename := range set { | |
obj := bkt.Object(filename) | |
err := obj.Delete(ctx) | |
if err != nil { | |
fmt.Printf("error fake delete %v\n", err) | |
return | |
} | |
// objectAttrsToUpdate := storage.ObjectAttrsToUpdate{ | |
// Metadata: map[string]string{ | |
// "k1": "v1", | |
// }, | |
// } | |
// if _, err := obj.Update(ctx, objectAttrsToUpdate); err != nil { | |
// fmt.Printf("error updating local handler: %v\n", err) | |
// return | |
// } | |
} | |
// issue batch | |
c, _, err := transport.NewHTTPClient(ctx) | |
if err != nil { | |
fmt.Printf("could not create batch client %v", err) | |
return | |
} | |
writer.Close() | |
resp, err := c.Post("https://storage.googleapis.com/batch/storage/v1", "multipart/mixed; boundary="+writer.Boundary(), body) | |
if err != nil { | |
fmt.Printf("could not post storage batch request %v", err) | |
return | |
} | |
fmt.Printf("batch [%d], Response code: %d\n", counter, resp.StatusCode) | |
if resp.StatusCode != http.StatusOK { | |
fmt.Printf("error deleting files in batch: {%d}, with fileset [%v]", counter, set) | |
resp.Write(os.Stdout) | |
return | |
// or cancel all | |
//cancel() | |
} | |
// attempt to extract all responses | |
// adapted from https://github.com/jfcote87/google-api-go-client/blob/master/batch/batch.go | |
cType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) | |
if err != nil { | |
fmt.Printf("could not parse response content media type %v", err) | |
return | |
} | |
if !strings.HasPrefix(cType, "multipart/") { | |
fmt.Printf("BatchApi: Invalid Content Type returned %s", cType) | |
return | |
} | |
rq := make([]*Request, increment) | |
for i := 0; i < increment; i++ { | |
rq[i] = &Request{} | |
} | |
results, err := processBody(ctx, resp.Body, params["boundary"], rq) | |
if err != nil { | |
fmt.Printf("BatchApi: could not parse bodyin batch {%d}, with fileset [%v] with error %v", counter, set, err) | |
return | |
} | |
for _, v := range results { | |
e, ok := v.Err.(*googleapi.Error) | |
if ok { | |
fmt.Printf("Error status code %v\n", e.Message) | |
} else { | |
// i don't know how to get the response body, if any item responds with | |
// the following only account for the DELETE request. TODO: figure out why PATCH doesn't work | |
// and modify the criteria for acceptence for the response code | |
// if v.ServerResponse.HTTPStatusCode != http.StatusNoContent { | |
// // somehow get the filename back... | |
// fmt.Printf("Unable to delete file in batch {%d}, with fileset [%v]", counter, set) | |
// } | |
} | |
} | |
return | |
} | |
} | |
}(files, batchCounter) | |
files = []string{} | |
} | |
} | |
wait.Wait() | |
} | |
func processBody(ctx context.Context, rc io.Reader, boundary string, requests []*Request) (results []Response, err error) { | |
results = make([]Response, 0, len(requests)) | |
mr := multipart.NewReader(rc, boundary) | |
// Http response from batch Do is supposed to be in the same order as results. | |
for idx, req := range requests { | |
// Check ct | |
select { | |
case <-ctx.Done(): | |
return nil, ctx.Err() | |
default: | |
} | |
pr, err := mr.NextPart() | |
if err != nil { | |
return nil, fmt.Errorf("Unable to parse multipart response(%d): %v", idx, err) | |
} | |
results = append(results, parseMimePart(pr, req)) | |
} | |
return | |
} | |
// parseMimePart returns a Response from a mimepart. See documentation at | |
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch#response-to-a-batch-request | |
func parseMimePart(pr *multipart.Part, rq *Request) Response { | |
defer pr.Close() | |
var err error | |
var res *http.Response | |
if pr.Header.Get("Content-Type") != "application/http" { | |
err = fmt.Errorf("Batch Api: Invalid Content Type: %s", pr.Header.Get("Content-Type")) | |
} else if res, err = http.ReadResponse(bufio.NewReader(pr), nil); err == nil { | |
defer res.Body.Close() | |
err = googleapi.CheckResponse(res) | |
} | |
if err != nil { | |
return ErrorResponse(rq, err) | |
} | |
result := NewResponse(res.Body, rq) | |
result.ServerResponse = | |
googleapi.ServerResponse{ | |
HTTPStatusCode: res.StatusCode, | |
Header: res.Header, | |
} | |
return result | |
} | |
// NewResponse creates a response from a stream | |
func NewResponse(r io.Reader, rq *Request) Response { | |
var ix interface{} | |
var err error | |
if rq.resultPtr != nil { | |
// Set result to a nil pointer to prevent casting errors on return | |
ix = reflect.ValueOf(rq.resultPtr).Elem().Interface() | |
if r != nil { | |
err = json.NewDecoder(r).Decode(rq.resultPtr) | |
} else { | |
err = errors.New("No JSON data returned") | |
} | |
//JSON Decode successful | |
ix = reflect.ValueOf(rq.resultPtr).Elem().Interface() | |
} | |
return Response{ | |
Result: ix, | |
Err: err, | |
Tag: rq.tag, | |
} | |
} | |
// ErrorResponse creates a response with the passed error | |
func ErrorResponse(rq *Request, err error) Response { | |
var ix interface{} | |
if rq.resultPtr != nil { | |
// Set result to a nil pointer to prevent casting errors on return | |
ix = reflect.ValueOf(rq.resultPtr).Elem().Interface() | |
} | |
return Response{Result: ix, Err: err, Tag: rq.tag} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment