Created
March 9, 2020 23:43
-
-
Save xtreme-sameer-vohra/af89ff91fb91557b4a539c5bea5e0898 to your computer and use it in GitHub Desktop.
baggageclaim stress util
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"archive/tar" | |
"flag" | |
"os" | |
"path/filepath" | |
"sync" | |
"time" | |
"bytes" | |
"encoding/json" | |
"fmt" | |
"github.com/klauspost/compress/zstd" | |
"io" | |
"io/ioutil" | |
"log" | |
"net/http" | |
) | |
type VolumeRequest struct { | |
Handle string `json:"handle"` | |
Strategy *json.RawMessage `json:"strategy"` | |
Properties map[string]string `json:"properties"` | |
Privileged bool `json:"privileged,omitempty"` | |
} | |
func encStrategy(strategy map[string]string) *json.RawMessage { | |
bytes, err := json.Marshal(strategy) | |
if err != nil { | |
panic("encStrategy failed") | |
} | |
msg := json.RawMessage(bytes) | |
return &msg | |
} | |
func writeToVolume(handle string) { | |
tgzBuffer := new(bytes.Buffer) | |
zstdWriter, err := zstd.NewWriter(tgzBuffer) | |
if err != nil { | |
fmt.Println("OMGGG zstd :", err) | |
} | |
tarWriter := tar.NewWriter(zstdWriter) | |
fileInfo ,err := os.Stat(*largeFile) | |
if err != nil { | |
fmt.Println("OMGGG os stat :", err) | |
} | |
err = tarWriter.WriteHeader(&tar.Header{ | |
Name: "some-file", | |
Mode: 0600, | |
Size: int64(fileInfo.Size()), | |
}) | |
file, err := os.Open(*largeFile) | |
if err != nil { | |
fmt.Println("OMGGG opening file :", err) | |
} | |
fmt.Println(handle," :compressing") | |
n, err := io.Copy(tarWriter,file) | |
if err != nil { | |
fmt.Println("OMGGG copying :", err) | |
} | |
if n != int64(fileInfo.Size()) { | |
fmt.Printf("OMGGG didnt copy file size :%v, copied: %v\n", int64(fileInfo.Size()), n) | |
} | |
tarWriter.Close() | |
zstdWriter.Close() | |
request, _ := http.NewRequest("PUT", fmt.Sprintf("%s/volumes/%s/stream-in?path=%s",*baggageclaimAddress ,handle, "dest-path"), tgzBuffer) | |
request.Header.Set("Content-Encoding", "zstd") | |
fmt.Println(handle," :streaming to baggageclaim") | |
res, err := http.DefaultClient.Do(request) | |
if err != nil { | |
fmt.Println("OMGGG streamin data:", err) | |
} | |
defer res.Body.Close() | |
bodyBytes, err := ioutil.ReadAll(res.Body) | |
if err != nil { | |
log.Fatal(err) | |
} | |
bodyString := string(bodyBytes) | |
fmt.Printf("%v: streamed res: %v\n", handle, bodyString) | |
} | |
func createVolume(handle string){ | |
deleteVolumeIfExists(handle) | |
var body io.ReadWriter | |
properties := map[string]string{ | |
"property-name": "property-value", | |
} | |
body = &bytes.Buffer{} | |
_ = json.NewEncoder(body).Encode(VolumeRequest{ | |
Handle: handle, | |
Strategy: encStrategy(map[string]string{ | |
"type": "empty", | |
}), | |
Properties: properties, | |
}) | |
request, _ := http.NewRequest("POST", *baggageclaimAddress + "/volumes", body) | |
res, err := http.DefaultClient.Do(request) | |
if err != nil { | |
fmt.Println("OMGGG creating vols :", err) | |
} | |
if res.StatusCode != http.StatusCreated { | |
fmt.Println("OMGGG creating vols statusCode :", res.StatusCode) | |
} | |
defer res.Body.Close() | |
bodyBytes, err := ioutil.ReadAll(res.Body) | |
if err != nil { | |
log.Fatal(err) | |
} | |
bodyString := string(bodyBytes) | |
fmt.Printf("%v createdVolume resp: %v\n",handle, bodyString) | |
} | |
func readFromVol(handle string){ | |
request, _ := http.NewRequest("PUT", fmt.Sprintf(*baggageclaimAddress + "/volumes/%s/stream-out?path=%s", handle, "dest-path"), nil) | |
request.Header.Set("Accept-Encoding", string("zstd")) | |
unpackedDir := filepath.Join(os.TempDir(), "unpacked-dir") | |
err := os.MkdirAll(unpackedDir, os.ModePerm) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer os.RemoveAll(unpackedDir) | |
res, err := http.DefaultClient.Do(request) | |
if err != nil { | |
fmt.Println("OMGGG reading from vol :", err) | |
} | |
defer res.Body.Close() | |
n, err := io.Copy(ioutil.Discard, res.Body) | |
if err != nil { | |
fmt.Println("OMGGG reading from vol :", err) | |
} | |
fmt.Printf("%v: completed stream IN read: %v bytes \n", handle, n) | |
} | |
func deleteVolumeIfExists(handle string){ | |
req, err := http.NewRequest("GET", fmt.Sprintf("%s/volumes/%s",*baggageclaimAddress, handle), nil) | |
if err != nil { | |
fmt.Println("OMGGG deleting vol req :", err) | |
} | |
res, err := http.DefaultClient.Do(req) | |
if res.StatusCode == http.StatusOK { | |
req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/volumes/%s",*baggageclaimAddress, handle), nil) | |
if err != nil { | |
fmt.Println("OMGGG deleting vol req :", err) | |
} | |
res, err := http.DefaultClient.Do(req) | |
if err != nil { | |
fmt.Println("OMGGG deleting vol :", err) | |
} | |
if res.StatusCode != http.StatusNoContent { | |
fmt.Println("OMGGG deleting vol statusCode :", res.StatusCode) | |
} | |
} | |
} | |
func setupReadTest(){ | |
createVolume(readVol) | |
writeToVolume(readVol) | |
} | |
func runWriteTest(wg *sync.WaitGroup){ | |
for i:=0; i< *numVols; i++ { | |
handle := fmt.Sprintf("%v-%v", *base, i) | |
go func(){ | |
createVolume(handle) | |
writeToVolume(handle) | |
wg.Done() | |
}() | |
time.Sleep(time.Millisecond * 100) | |
} | |
} | |
func runReadTest(wg *sync.WaitGroup){ | |
for i:=0; i< *numVols; i++ { | |
go func(){ | |
readFromVol(readVol) | |
wg.Done() | |
}() | |
time.Sleep(time.Millisecond * 100) | |
} | |
} | |
var baggageclaimAddress = flag.String("b", "http://localhost:7788", "baggageclaim server address") | |
var base = flag.String("v", "foo", "volume name") | |
var largeFile = flag.String("f", "", "path to file that should be compressed and sent to baggageclaim. Generate large file of 200MB [dd if=/dev/urandom of=/tmp/moop count=409600]") | |
var numVols = flag.Int("n", 10, "number of volumes to create") | |
var readVol = "some-volume-to-read" | |
func main (){ | |
flag.Parse() | |
if *base == "" { | |
fmt.Println("Missing Volume Name. Usage: bgstress -b=some-base-name") | |
os.Exit(2) | |
} | |
if *largeFile == "" { | |
fmt.Println("Missing File Path that should be streamed to baggageclaim. Usage: bgstress -l=/some-path-to-file") | |
os.Exit(2) | |
} | |
setupReadTest() | |
wg := sync.WaitGroup{} | |
wg.Add(*numVols * 2) | |
go runWriteTest(&wg) | |
go runReadTest(&wg) | |
time.Sleep(time.Second* 1) | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment