Skip to content

Instantly share code, notes, and snippets.

@maurorappa
Last active October 10, 2024 07:59
Show Gist options
  • Save maurorappa/a117698264c4120e8f0e37e967baf207 to your computer and use it in GitHub Desktop.
Save maurorappa/a117698264c4120e8f0e37e967baf207 to your computer and use it in GitHub Desktop.
workload simulator for VM overprovisioning testing
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"github.com/BurntSushi/toml"
"github.com/kavehmz/prime"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
)
var (
config Bot
verbose bool
id string
stats map[string]int64
masterUrl string
m sync.Mutex
)
type (
Bot struct {
ReportPeriod time.Duration
Workloads []workload
}
workload struct {
Instances int
Size int
Period time.Duration
Detail string
}
)
func init() {
rand.Seed(time.Now().UTC().UnixNano())
stats = make(map[string]int64)
}
func callMaster(server string, action string) (output string) {
res, err := http.Get("http://" + server + "/" + action)
if verbose {
//fmt.Println("http://" + server + "/" + action)
}
var remote []byte
if err != nil {
log.Println(err)
output = ""
} else {
remote, err = ioutil.ReadAll(res.Body)
res.Body.Close()
}
return string(remote)
}
func main() {
flag.BoolVar(&verbose, "v", false, "verbose")
minutes := flag.Int("t", -1, "total running time in minutes")
flag.Parse()
log.Println("Bot running ...")
masterUrl = os.Getenv("MASTER")
if masterUrl == "" {
masterUrl = "localhost:8080"
//log.Fatal("Master URL not defined")
}
id = callMaster(masterUrl, "unique")
fmt.Printf("Bot id is: %s\n", id)
if id == "" {
log.Fatal("Cannot connect to Master")
}
rawConfig := callMaster(masterUrl, "config")
_, err := toml.Decode(rawConfig, &config)
if err != nil {
log.Fatal("failed to decode config: ", err)
}
tickerStat := time.NewTicker(config.ReportPeriod)
go func() {
fmt.Println("starting thread for periodic stats reporting")
time.Sleep(2 * time.Minute)
for {
select {
case <-tickerStat.C:
m.Lock()
url := "stats/" + id + "/" + strconv.FormatInt(stats["cpu"], 10) + "+" + strconv.FormatInt(stats["mem"], 10) + "+" + strconv.FormatInt(stats["disk"], 10)
m.Unlock()
ack := callMaster(masterUrl, url)
if verbose {
fmt.Println(ack)
}
}
}
}()
startJobs()
if *minutes != -1 {
if verbose {
fmt.Printf("this client will run for %d minutes\n", *minutes )
}
time.Sleep(time.Duration(*minutes) * time.Minute)
} else {
if verbose {
fmt.Printf("this client will run forever....\n" )
}
select {}
}
fmt.Printf("Quitting....bye!\n" )
}
func startJobs() {
for _, j := range config.Workloads {
if j.Detail == "cpu" {
r := rand.Intn(15)
time.Sleep(time.Duration(r) * time.Second)
tickerCpu := time.NewTicker(j.Period)
go func(size int) {
fmt.Println("Start CPU thread")
for {
select {
case <-tickerCpu.C:
runCpuJob(uint64(size))
}
}
}(j.Size)
}
if j.Detail == "net" {
r := rand.Intn(15)
time.Sleep(time.Duration(r) * time.Millisecond)
tickerNet := time.NewTicker(j.Period)
go func(size int) {
fmt.Println("Start NET thread")
for {
select {
case <-tickerNet.C:
runNetJob(size)
}
}
}(j.Size)
}
if j.Detail == "mem" {
r := rand.Intn(15)
time.Sleep(time.Duration(r) * time.Second)
tickerMem := time.NewTicker(j.Period)
go func(size int) {
fmt.Println("Start MEM thread")
for {
select {
case <-tickerMem.C:
runMemJob(size)
}
}
}(j.Size)
}
if j.Detail == "disk" {
r := rand.Intn(15)
time.Sleep(time.Duration(r) * time.Second)
tickerDisk := time.NewTicker(j.Period)
go func(size int) {
fmt.Println("Start DISK thread")
for {
select {
case <-tickerDisk.C:
runDiskJob(size)
}
}
}(j.Size)
}
}
}
func runCpuJob(limit uint64) {
if verbose {
fmt.Println("Cpu job")
}
start := time.Now()
_ = prime.Primes(limit)
//p := prime.Primes(limit)
//fmt.Println("Number of primes:", len(p))
duration := time.Since(start)
m.Lock()
stats["cpu"] = duration.Milliseconds()
m.Unlock()
}
func runNetJob(size int) {
garbage := strings.Repeat("beeks", size)
//start := time.Now()
url := "number/" + id + "/" + garbage
_ = callMaster(masterUrl, url)
//duration := time.Since(start)
//stats["net"] = duration.Milliseconds()
}
func runMemJob(size int) {
var buf bytes.Buffer
size = size * 1024 * 1024
if verbose {
fmt.Println("Mem job")
}
start := time.Now()
buf.Grow(size)
for i := 0; i < size; i++ {
_ = buf.WriteByte(97)
}
duration := time.Since(start)
m.Lock()
stats["mem"] = duration.Milliseconds()
m.Unlock()
time.Sleep(time.Duration(10) * time.Second)
buf.Reset()
debug.FreeOSMemory()
}
func runDiskJob(size int) {
tmpFile := "bigfile.txt"
if verbose {
fmt.Println("Disk job")
}
size = size * 4096
garbage := strings.Repeat("beeks", size)
err := os.WriteFile(tmpFile, []byte(garbage), 0644)
if err != nil {
fmt.Println(err)
}
start := time.Now()
f, err := os.Open(tmpFile)
if err != nil {
fmt.Println(err)
}
scanner := bufio.NewScanner(f)
for scanner.Scan() {
scanner.Text()
}
f.Close()
time.Sleep(time.Duration(5) * time.Second)
duration := time.Since(start)
err = os.Remove(tmpFile)
if err != nil {
fmt.Println(err)
}
m.Lock()
stats["disk"] = duration.Milliseconds()
m.Unlock()
}
module simulator
go 1.19
require (
github.com/BurntSushi/toml v1.2.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/kavehmz/prime v1.0.0 // indirect
github.com/smallnest/ringbuffer v0.0.0-20210227121335-0a58434b36f2 // indirect
)
MasterUrl = "http://localhost:8080"
ReportPeriod = "30s"
[[Workloads]]
Instances = 1
Size = 100000000
Period = "30s"
Detail = "cpu"
[[Workloads]]
Instances = 1
Size = 100
Period = "1s"
Detail = "net"
[[Workloads]]
Instances = 1
Size = 1000
Period = "1m0s"
Detail = "mem"
[[Workloads]]
Instances = 1
Size = 1
Period = "1m0s"
Detail = "disk"
package main
import (
"bytes"
"flag"
"fmt"
"github.com/gorilla/mux"
"log"
"math/rand"
"net/http"
"os"
// "strconv"
"sync"
"strings"
"time"
"github.com/google/uuid"
"github.com/smallnest/ringbuffer"
"github.com/BurntSushi/toml"
)
var (
listenPort string
access = ringbuffer.New(4100) //37 chars is UID + 'id=' newline = 41 per client
config Bot
verbose bool
output string
configFile = "server.cfg"
f *os.File
clientList []string
m sync.Mutex
)
type (
Bot struct {
ReportPeriod time.Duration
Workloads []workload
}
workload struct {
Instances int
Size int
Period time.Duration
Detail string
}
)
func init() {
rand.Seed(time.Now().UTC().UnixNano())
listenPort = os.Getenv("PORT")
if listenPort == "" {
listenPort = "8080"
}
if !checkCfg() {
fmt.Println("configuration problem")
os.Exit(0)
}
}
func checkCfg() (proceed bool) {
proceed = false
_, notfound := os.Stat(configFile)
if notfound == nil {
cfg, err := toml.DecodeFile(configFile, &config)
if err != nil {
fmt.Println(configFile, " cannot be decoded")
} else {
if len(cfg.Keys()) == 0 {
fmt.Println(configFile, " is empty")
} else {
proceed = true
}
}
} else {
fmt.Println(configFile, " not found")
}
return proceed
}
func generateConfig() {
//main cfg
interval, _ := time.ParseDuration("30s")
config.ReportPeriod = interval
// tests
var tests []workload
interval, _ = time.ParseDuration("30s")
cpu := workload{1, 100000000, interval, "cpu"}
tests = append(tests, cpu)
interval, _ = time.ParseDuration("1s")
net := workload{1, 100, interval, "net"}
tests = append(tests, net)
interval, _ = time.ParseDuration("1m")
mem := workload{1, 1000, interval, "mem"}
tests = append(tests, mem)
interval, _ = time.ParseDuration("1m")
disk := workload{1, 1, interval, "disk"}
tests = append(tests, disk)
//finally
config.Workloads = tests
var tempBuffer bytes.Buffer
_ = toml.NewEncoder(&tempBuffer).Encode(config)
os.WriteFile("sample.cfg", tempBuffer.Bytes(),0644)
fmt.Println("sample.cfg created")
}
func home(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Welcome to the Tester Server")
}
func dumpConfig(w http.ResponseWriter, r *http.Request) {
var tempBuffer bytes.Buffer
errore := toml.NewEncoder(&tempBuffer).Encode(config)
if errore != nil {
log.Fatal(errore)
}
fmt.Fprintf(w, "%s", tempBuffer.String())
}
func unique(w http.ResponseWriter, r *http.Request) {
id := uuid.New()
fmt.Fprintf(w, "%s", id.String())
}
func connected(w http.ResponseWriter, r *http.Request) {
m.Lock()
clients := len(uniqueNonEmptyElementsOf(clientList))
fmt.Fprintf(w, "connected clients: %d\n\n", clients)
for _,ip := range clientList {
fmt.Fprintf(w, "%s\n", ip)
}
m.Unlock()
}
func accessLog(w http.ResponseWriter, r *http.Request) {
l := access.Length()
buf := make([]byte, l)
access.Read(buf)
fmt.Println(string(buf))
fmt.Fprintf(w, "%s", string(buf))
}
func number(w http.ResponseWriter, r *http.Request) {
//fmt.Printf("%s %s %d\n", vars["id"], ip, len([]rune(vars["garbage"])))
rnd := rand.Intn(10000)
fmt.Fprintf(w, "%d", rnd)
}
func stats(w http.ResponseWriter, r *http.Request) {
ip := r.RemoteAddr
vars := mux.Vars(r)
fmt.Fprintf(w, "%s", "ACK")
if access.IsFull() {
if verbose {
fmt.Printf("circular buffer full\n")
}
access.Reset()
}
access.Write([]byte(vars["id"] + ":" + vars["stats"] +"\n"))
if output == "csv" {
currentTime := time.Now()
stats:=strings.Split(vars["stats"],"+")
if len(stats) < 3 {
fmt.Printf("Error parsing stats from %s\n", vars["id"])
} else {
entry := fmt.Sprintf("%s,%s,%s,%s,%s,%s\n",currentTime.Format("2006-1-2 15:4:5"),vars["id"],ip,stats[0],stats[1],stats[2])
fmt.Println(entry)
_,err := f.WriteString(entry)
if err != nil {
fmt.Printf("cannot append to stats.csv file!\n")
fmt.Println(err)
}
}
}
m.Lock()
if ! contains(clientList, ip) {
clientList = append(clientList,ip)
}
m.Unlock()
}
func main() {
flag.BoolVar(&verbose, "v", false, "verbose")
flag.StringVar(&output, "o", "", "output: csv")
flag.StringVar(&configFile, "c", "", "config file")
template := flag.Bool("t", false,"generate a sample config file")
flag.Parse()
if verbose {
log.Println("listening to " + listenPort + "...")
}
if *template {
generateConfig()
os.Exit(0)
}
if output == "csv" {
f, _ = os.OpenFile("stats.csv",os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
/*if err != nil {
log.Println(err)
}
*/
defer f.Close()
_,err := f.WriteString("date,id,ip,cpu,mem,disk\n")
if err != nil {
fmt.Printf("cannot create stats.csv file!\n")
}
}
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", home)
router.HandleFunc("/accesslog", accessLog)
router.HandleFunc("/config", dumpConfig)
router.HandleFunc("/unique", unique)
router.HandleFunc("/connected", connected)
router.HandleFunc("/number/{id}/{garbage}", number)
router.HandleFunc("/stats/{id}/{stats}", stats)
log.Fatal(http.ListenAndServe(":"+listenPort, router))
}
func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
}
}
return false
}
func uniqueNonEmptyElementsOf(s []string) []string {
unique := make(map[string]bool, len(s))
us := make([]string, len(unique))
for _, elem := range s {
if len(elem) != 0 {
if !unique[elem] {
us = append(us, elem)
unique[elem] = true
}
}
}
return us
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment