Last active: March 13, 2024 15:52
Save joshdurbin/235fdd9e8991b3ac8f288ac34850e7a4 to your computer and use it in GitHub Desktop.
Generate random files in Google Cloud Storage (GCS)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"cloud.google.com/go/storage" | |
"context" | |
"fmt" | |
"math/rand" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
) | |
// Tuning knobs for the load generator.
const (
	// maxCharSizeFile is the exclusive upper bound passed to rand.Intn when
	// sizing each object's random payload.
	maxCharSizeFile = 32
	// charset is the alphabet the random payload bytes are drawn from.
	charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	// workerCount is the number of upload goroutines started by main.
	workerCount = 250
	// queueSize is the buffer capacity of the object-name queue.
	queueSize = 100_000
	// printGeneratorActivity logs every object name the generator queues.
	printGeneratorActivity = false
	// printWorkerActivity logs every object a worker finishes.
	printWorkerActivity = true
	// lowProbabilityFileCount is how many extra "<prefix>/<n>" names are
	// queued under a leaf prefix when the rare branch fires.
	lowProbabilityFileCount = 10_000_000
	// lowProbabilityFileCountUpperBoundForRandGenerator sets the odds of the
	// rare branch: it fires when rand.Intn of this value is < 1.
	lowProbabilityFileCountUpperBoundForRandGenerator = 1_000
	// createObjects toggles the actual GCS writes; when false, workers only
	// count and log the names they receive.
	createObjects = true
)
func main() { | |
startTime := time.Now() | |
var totalObjectsCreated int64 | |
ctx, cancel := context.WithCancel(context.Background()) | |
// propagate context cancellation | |
term := make(chan os.Signal, 1) | |
signal.Notify(term, syscall.SIGINT, syscall.SIGTERM) | |
go func() { | |
<-term | |
_, _ = fmt.Fprintf(os.Stderr, "shutting down...\n") | |
cancel() | |
}() | |
// create a storage client | |
client, err := storage.NewClient(ctx) | |
if err != nil { | |
_, _ = fmt.Fprintf(os.Stderr, "error creating storage client: %v\n", err) | |
os.Exit(1) | |
} | |
bucket := client.Bucket("bc-gcs-load-testing") | |
// create a queue and a wait group | |
queue := make(chan string, queueSize) | |
var wg sync.WaitGroup | |
wg.Add(workerCount) | |
// start workers | |
for i := 0; i < workerCount; i++ { | |
go worker(ctx, i+1, queue, &wg, bucket, &totalObjectsCreated) | |
} | |
// start the tree loop | |
treeloop(ctx, queue, 0, 4, "") | |
// close the queue and wait for the workers to finish | |
close(queue) | |
// wait for the workers to finish | |
wg.Wait() | |
// cancel the context | |
cancel() | |
fmt.Printf("created %v objects in %v\n", atomic.LoadInt64(&totalObjectsCreated), time.Since(startTime)) | |
} | |
func treeloop(ctx context.Context, queue chan<- string, depth, maxDepth int, prefix string) { | |
// base condition: Reached the maximum depth, do something with the current prefix | |
if depth == maxDepth { | |
// low probability of creating a large number of files | |
if rand.Intn(lowProbabilityFileCountUpperBoundForRandGenerator) < 1 { | |
for i := 0; i < lowProbabilityFileCount; i++ { | |
path := fmt.Sprintf("%v/%d", prefix, i) | |
select { | |
case queue <- path: | |
if printGeneratorActivity { | |
fmt.Printf("queued object %v\n", path) | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
// queue objects for the current prefix | |
select { | |
case queue <- prefix: | |
if printGeneratorActivity { | |
fmt.Printf("queued object %v\n", prefix) | |
} | |
return | |
case <-ctx.Done(): | |
return | |
} | |
} | |
for i := 'a'; i <= 'z'; i++ { | |
// recurse with the new prefix | |
if len(prefix) == 0 { | |
treeloop(ctx, queue, depth+1, maxDepth, fmt.Sprintf("%c", i)) | |
} else { | |
treeloop(ctx, queue, depth+1, maxDepth, fmt.Sprintf("%v/%c", prefix, i)) | |
} | |
} | |
} | |
func worker(ctx context.Context, id int, queue <-chan string, wg *sync.WaitGroup, bucket *storage.BucketHandle, totalObjectsCreated *int64) { | |
defer wg.Done() | |
for { | |
select { | |
case msg, ok := <-queue: | |
// if the channel is closed, return | |
if !ok { | |
_, _ = fmt.Fprintf(os.Stderr, "worker %d shutting down due to channel closure\n", id) | |
return | |
} | |
if createObjects { | |
// create a new object | |
writer := bucket.Object(msg).NewWriter(ctx) | |
// write random data to the object | |
b := make([]byte, rand.Intn(maxCharSizeFile)) | |
for i := range b { | |
b[i] = charset[rand.Intn(len(charset))] | |
} | |
// write the data to the object | |
_, err := fmt.Fprintf(writer, string(b), charset) | |
// if there was an error, print it and return | |
if err != nil { | |
_, _ = fmt.Fprintf(os.Stderr, "worker %d encountered error creating object %v: %v\n", id, msg, err) | |
return | |
} | |
// close the writer | |
err = writer.Close() | |
// if there was an error, print it and return | |
if err != nil { | |
_, _ = fmt.Fprintf(os.Stderr, "worker %d encountered error closing writer while creating object %v: %v\n", id, msg, err) | |
return | |
} | |
} | |
// increment the total objects created | |
atomic.AddInt64(totalObjectsCreated, 1) | |
// print that the object was created | |
if printWorkerActivity { | |
fmt.Printf("worker %d created object %v\n", id, msg) | |
} | |
// if the context is done, return | |
case <-ctx.Done(): | |
_, _ = fmt.Fprintf(os.Stderr, "worker %d shutting down due to context cancellation\n", id) | |
return | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment