Skip to content

Instantly share code, notes, and snippets.

@xtreme-sameer-vohra
Created March 9, 2020 23:43
Show Gist options
  • Save xtreme-sameer-vohra/af89ff91fb91557b4a539c5bea5e0898 to your computer and use it in GitHub Desktop.
Save xtreme-sameer-vohra/af89ff91fb91557b4a539c5bea5e0898 to your computer and use it in GitHub Desktop.
baggageclaim stress util
package main
import (
"archive/tar"
"flag"
"os"
"path/filepath"
"sync"
"time"
"bytes"
"encoding/json"
"fmt"
"github.com/klauspost/compress/zstd"
"io"
"io/ioutil"
"log"
"net/http"
)
type VolumeRequest struct {
Handle string `json:"handle"`
Strategy *json.RawMessage `json:"strategy"`
Properties map[string]string `json:"properties"`
Privileged bool `json:"privileged,omitempty"`
}
func encStrategy(strategy map[string]string) *json.RawMessage {
bytes, err := json.Marshal(strategy)
if err != nil {
panic("encStrategy failed")
}
msg := json.RawMessage(bytes)
return &msg
}
func writeToVolume(handle string) {
tgzBuffer := new(bytes.Buffer)
zstdWriter, err := zstd.NewWriter(tgzBuffer)
if err != nil {
fmt.Println("OMGGG zstd :", err)
}
tarWriter := tar.NewWriter(zstdWriter)
fileInfo ,err := os.Stat(*largeFile)
if err != nil {
fmt.Println("OMGGG os stat :", err)
}
err = tarWriter.WriteHeader(&tar.Header{
Name: "some-file",
Mode: 0600,
Size: int64(fileInfo.Size()),
})
file, err := os.Open(*largeFile)
if err != nil {
fmt.Println("OMGGG opening file :", err)
}
fmt.Println(handle," :compressing")
n, err := io.Copy(tarWriter,file)
if err != nil {
fmt.Println("OMGGG copying :", err)
}
if n != int64(fileInfo.Size()) {
fmt.Printf("OMGGG didnt copy file size :%v, copied: %v\n", int64(fileInfo.Size()), n)
}
tarWriter.Close()
zstdWriter.Close()
request, _ := http.NewRequest("PUT", fmt.Sprintf("%s/volumes/%s/stream-in?path=%s",*baggageclaimAddress ,handle, "dest-path"), tgzBuffer)
request.Header.Set("Content-Encoding", "zstd")
fmt.Println(handle," :streaming to baggageclaim")
res, err := http.DefaultClient.Do(request)
if err != nil {
fmt.Println("OMGGG streamin data:", err)
}
defer res.Body.Close()
bodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatal(err)
}
bodyString := string(bodyBytes)
fmt.Printf("%v: streamed res: %v\n", handle, bodyString)
}
func createVolume(handle string){
deleteVolumeIfExists(handle)
var body io.ReadWriter
properties := map[string]string{
"property-name": "property-value",
}
body = &bytes.Buffer{}
_ = json.NewEncoder(body).Encode(VolumeRequest{
Handle: handle,
Strategy: encStrategy(map[string]string{
"type": "empty",
}),
Properties: properties,
})
request, _ := http.NewRequest("POST", *baggageclaimAddress + "/volumes", body)
res, err := http.DefaultClient.Do(request)
if err != nil {
fmt.Println("OMGGG creating vols :", err)
}
if res.StatusCode != http.StatusCreated {
fmt.Println("OMGGG creating vols statusCode :", res.StatusCode)
}
defer res.Body.Close()
bodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatal(err)
}
bodyString := string(bodyBytes)
fmt.Printf("%v createdVolume resp: %v\n",handle, bodyString)
}
func readFromVol(handle string){
request, _ := http.NewRequest("PUT", fmt.Sprintf(*baggageclaimAddress + "/volumes/%s/stream-out?path=%s", handle, "dest-path"), nil)
request.Header.Set("Accept-Encoding", string("zstd"))
unpackedDir := filepath.Join(os.TempDir(), "unpacked-dir")
err := os.MkdirAll(unpackedDir, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer os.RemoveAll(unpackedDir)
res, err := http.DefaultClient.Do(request)
if err != nil {
fmt.Println("OMGGG reading from vol :", err)
}
defer res.Body.Close()
n, err := io.Copy(ioutil.Discard, res.Body)
if err != nil {
fmt.Println("OMGGG reading from vol :", err)
}
fmt.Printf("%v: completed stream IN read: %v bytes \n", handle, n)
}
func deleteVolumeIfExists(handle string){
req, err := http.NewRequest("GET", fmt.Sprintf("%s/volumes/%s",*baggageclaimAddress, handle), nil)
if err != nil {
fmt.Println("OMGGG deleting vol req :", err)
}
res, err := http.DefaultClient.Do(req)
if res.StatusCode == http.StatusOK {
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/volumes/%s",*baggageclaimAddress, handle), nil)
if err != nil {
fmt.Println("OMGGG deleting vol req :", err)
}
res, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("OMGGG deleting vol :", err)
}
if res.StatusCode != http.StatusNoContent {
fmt.Println("OMGGG deleting vol statusCode :", res.StatusCode)
}
}
}
func setupReadTest(){
createVolume(readVol)
writeToVolume(readVol)
}
func runWriteTest(wg *sync.WaitGroup){
for i:=0; i< *numVols; i++ {
handle := fmt.Sprintf("%v-%v", *base, i)
go func(){
createVolume(handle)
writeToVolume(handle)
wg.Done()
}()
time.Sleep(time.Millisecond * 100)
}
}
func runReadTest(wg *sync.WaitGroup){
for i:=0; i< *numVols; i++ {
go func(){
readFromVol(readVol)
wg.Done()
}()
time.Sleep(time.Millisecond * 100)
}
}
var baggageclaimAddress = flag.String("b", "http://localhost:7788", "baggageclaim server address")
var base = flag.String("v", "foo", "volume name")
var largeFile = flag.String("f", "", "path to file that should be compressed and sent to baggageclaim. Generate large file of 200MB [dd if=/dev/urandom of=/tmp/moop count=409600]")
var numVols = flag.Int("n", 10, "number of volumes to create")
var readVol = "some-volume-to-read"
func main (){
flag.Parse()
if *base == "" {
fmt.Println("Missing Volume Name. Usage: bgstress -b=some-base-name")
os.Exit(2)
}
if *largeFile == "" {
fmt.Println("Missing File Path that should be streamed to baggageclaim. Usage: bgstress -l=/some-path-to-file")
os.Exit(2)
}
setupReadTest()
wg := sync.WaitGroup{}
wg.Add(*numVols * 2)
go runWriteTest(&wg)
go runReadTest(&wg)
time.Sleep(time.Second* 1)
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment