package main

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"flag"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"runtime/debug"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/golang/glog"
	minio "github.com/minio/minio-go"
	"github.com/minio/minio/pkg/sys"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
	fWrite        = flag.Int("w", 10, "number of write goroutines per bucket")
	fRead         = flag.Int("r", 10, "number of read goroutines")
	fOnlyRead     = flag.Bool("only-read", false, "read-only benchmark (writers still run at the beginning to fill the object pool)")
	fSize         = flag.Int("size", 1024*64, "object size in bytes")
	fMemory       = flag.Int("mem", 64, "node memory in GiB, used to size the object pool")
	fDrives       = flag.Int("drive", 12, "number of drives in each set")
	freshRatio    = flag.Float64("fresh-ratio", 0.3, "fraction of the object pool that keeps being replaced with freshly written objects")
	fWriteTimeout = flag.Duration("write-timeout", time.Second*5, "timeout for a single PUT")
	fReadTimeout  = flag.Duration("read-timeout", time.Second*5, "timeout for a single GET")
	fEndpoint     = flag.String("endpoint", "minio-rr:9000", "MinIO endpoint")
	fLocation     = flag.String("location", "Beijing", "bucket location")
	fBuckets      = flag.String("buckets", "snoopy", "multiple buckets should be separated by commas")
)
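// Prometheus metrics exposed on /metrics (see initPrometheus): an operation
// counter labelled by type and outcome, plus read/write latency histograms.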
var (
	ctrOperationTimes = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "minio",
			Name:      "operation_times",
			Help:      "operation count by type and outcome",
		},
		[]string{"type", "success"},
	)
	hgReadDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "minio",
			Name:      "read_duration",
			Help:      "read duration in seconds",
			Buckets:   []float64{.001, .003, .005, .1, .5, 1, 2, 3, 4},
		})
	hgWriteDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "minio",
			Name:      "write_duration",
			Help:      "write duration in seconds",
			Buckets:   []float64{.001, .003, .005, .1, .5, 1, 2, 3, 4},
		})
)
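// validFlags logs the effective configuration; it currently performs no
// actual validation and always returns nil.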
func validFlags() error {
	glog.V(1).Infof("routine number to write is %d, to read is %d (only read is %v)", *fWrite, *fRead, *fOnlyRead)
	glog.V(1).Infof("write timeout is %v, read timeout is %v", *fWriteTimeout, *fReadTimeout)
	glog.V(1).Infof("node memory is %vG, drive number is %v, file size is %v, fresh ratio is %0.2f", *fMemory, *fDrives, *fSize, *freshRatio)
	glog.V(1).Infof("endpoint is %v, location is %v, buckets are %v", *fEndpoint, *fLocation, *fBuckets)
	return nil
}
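// main creates the target buckets, starts the write and read workers, and
// blocks until SIGINT/SIGTERM, at which point all workers are cancelled and
// awaited.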
func main() {
	initPrometheus()

	buckets := strings.Split(*fBuckets, ",")
	if err := initBuckets(*fEndpoint, buckets, *fLocation); err != nil {
		glog.Exit(err)
	}

	op := newObjectPool(*fMemory*1024*1024*1024, *fSize, *fDrives, *freshRatio)

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	// writers get their own context so they can be stopped on their own
	// once the pool is full in read-only mode
	wctx, wcancel := context.WithCancel(ctx)
	for j, bucket := range buckets {
		for i := 0; i < *fWrite; i++ {
			wg.Add(1)
			go writeWorker(wctx, &wg, op, i+j, bucket)
		}
	}

	if *fOnlyRead {
		op.waitUntilFull(ctx)
		wcancel()
	}

	for i := 0; i < *fRead; i++ {
		wg.Add(1)
		go readWorker(ctx, &wg, op, i)
	}

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	<-signalCh
	cancel()
	glog.Info("will exit on signal, waiting for all routines to finish")
	wg.Wait()
}
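// object records where a benchmark object was written and the md5 of its
// payload, so that readers can verify the data they get back.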
type object struct {
	bucket string
	key    string
	md5sum string
}

// objectPool is a bounded, concurrency-safe pool of written objects that
// readers sample from.
type objectPool struct {
	objects   []*object
	lock      sync.RWMutex
	freshSize int
	maxSize   int
}

func newObjectPool(memory, size, drives int, freshRatio float64) *objectPool {
	op := objectPool{
		// roughly calculated
		maxSize: memory / (2 * size / drives),
	}
	op.freshSize = int(float64(op.maxSize) * freshRatio)
	return &op
}
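// waitUntilFull blocks until the pool has reached maxSize objects (polling
// every five seconds) or the context is cancelled.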
func (o *objectPool) waitUntilFull(ctx context.Context) {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		// use a single ticker instead of calling time.Tick inside the loop,
		// which would leak a new ticker on every iteration
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				o.lock.RLock()
				length := len(o.objects)
				o.lock.RUnlock()
				glog.V(3).Infof("object pool size cur: %d, max: %d", length, o.maxSize)
				if length == o.maxSize {
					return
				}
			}
		}
	}()
	<-ch
	glog.V(1).Info("the object pool is full")
}
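// put appends an object while the pool is below maxSize; once full, it
// overwrites a random entry within the first freshSize slots so that a
// fraction of the pool always points at recently written objects.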
func (o *objectPool) put(bucket, key, md5sum string) {
	o.lock.Lock()
	defer o.lock.Unlock()
	if len(o.objects) < o.maxSize {
		o.objects = append(o.objects, &object{bucket: bucket, key: key, md5sum: md5sum})
	} else {
		idx := rand.Intn(o.freshSize)
		o.objects[idx] = &object{bucket: bucket, key: key, md5sum: md5sum}
	}
}

func (o *objectPool) get() *object {
	o.lock.RLock()
	defer o.lock.RUnlock()
	if len(o.objects) == 0 {
		return nil
	}
	idx := rand.Intn(len(o.objects))
	return o.objects[idx]
}
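// sampleFile builds an in-memory payload of the given size, writing a random
// byte roughly every size/1024 positions.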
func sampleFile(size int) *bytes.Reader {
	content := make([]byte, size)
	step := size / 1024
	if step == 0 {
		// guard against an infinite loop for sizes below 1KiB
		step = 1
	}
	for i := 0; i < size; i += step {
		content[i] = byte(rand.Intn(10))
	}
	return bytes.NewReader(content)
}
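// readWorker repeatedly picks a random object from the pool, downloads it
// with a per-request timeout, and verifies its md5, recording the outcome and
// latency in the Prometheus metrics.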
func readWorker(ctx context.Context, wg *sync.WaitGroup, op *objectPool, number int) {
	glog.V(4).Infof("read worker %d starts", number)
	defer glog.V(4).Infof("read worker %d exits", number)
	defer wg.Done()
	client, err := newClient(*fEndpoint)
	if err != nil {
		glog.Errorf("failed to create client: %v", err)
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		func() {
			object := op.get()
			if object == nil {
				time.Sleep(time.Second)
				return
			}
			h := md5.New()
			now := time.Now()
			ctx2, cancel := context.WithTimeout(context.Background(), *fReadTimeout)
			defer cancel()
			obj, err := client.GetObjectWithContext(ctx2, object.bucket, object.key, minio.GetObjectOptions{})
			if err != nil {
				ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "failed"}).Inc()
				glog.Errorf("get object %s in bucket %s failed: %v", object.key, object.bucket, err)
				return
			}
			defer obj.Close()
			if _, err := io.Copy(h, obj); err != nil {
				ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "failed"}).Inc()
				glog.Errorf("copy object %s in bucket %s failed: %v", object.key, object.bucket, err)
				return
			}
			hgReadDuration.Observe(float64(time.Since(now).Nanoseconds()) / float64(time.Second))
			if md5sum := fmt.Sprintf("%x", h.Sum(nil)); md5sum != object.md5sum {
				ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "failed"}).Inc()
				glog.Errorf("md5 of object %s in bucket %s mismatch, want %s, got %s", object.key, object.bucket, object.md5sum, md5sum)
				return
			}
			ctrOperationTimes.With(prometheus.Labels{"type": "read", "success": "success"}).Inc()
		}()
	}
}
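// writeWorker uploads randomly named objects of the configured size to the
// given bucket in a loop, recording latency and outcome, and registers each
// successful upload in the object pool for readers to verify.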
func writeWorker(ctx context.Context, wg *sync.WaitGroup, op *objectPool, number int, bucket string) {
	glog.V(4).Infof("write worker %d starts", number)
	defer glog.V(4).Infof("write worker %d exits", number)
	defer wg.Done()
	client, err := newClient(*fEndpoint)
	if err != nil {
		glog.Errorf("failed to create client: %v", err)
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		id, err := getMediaIdentifier()
		if err != nil {
			glog.Errorf("failed to get identifier: %v", err)
			continue
		}
		objectName := fmt.Sprintf("%s/%s/%s", id[0:2], id[2:4], id)
		reader := sampleFile(*fSize)
		// compute the md5 of the payload so readers can verify it later
		h := md5.New()
		if _, err := io.Copy(h, reader); err != nil {
			glog.Errorf("failed to calculate md5: %v", err)
			continue
		}
		md5sum := fmt.Sprintf("%x", h.Sum(nil))
		now := time.Now()
		// rewind and upload
		reader.Seek(0, io.SeekStart)
		ctx2, cancel := context.WithTimeout(context.Background(), *fWriteTimeout)
		size, err := client.PutObjectWithContext(ctx2, bucket, objectName, reader, reader.Size(), minio.PutObjectOptions{})
		cancel()
		if err != nil {
			ctrOperationTimes.With(prometheus.Labels{"type": "write", "success": "failed"}).Inc()
			glog.Errorf("put object %s in bucket %s failed: %v", objectName, bucket, err)
			continue
		}
		hgWriteDuration.Observe(float64(time.Since(now).Nanoseconds()) / float64(time.Second))
		// upload succeeded: make the object visible to readers
		op.put(bucket, objectName, md5sum)
		ctrOperationTimes.With(prometheus.Labels{"type": "write", "success": "success"}).Inc()
		glog.V(5).Infof("successfully uploaded file %s with size %d to %s", objectName, size, bucket)
	}
}
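// getMediaIdentifier returns a random 30-character identifier drawn from a
// Base32-style alphabet; its first four characters form the two-level key
// prefix used by writeWorker.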
func getMediaIdentifier() (string, error) {
	alphabet := "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" // based on the RFC 4648 Base32 alphabet
	imageName := ""
	bs := make([]byte, 30)
	if _, err := rand.Read(bs); err != nil {
		return "", err
	}
	for _, b := range bs {
		imageName += string(alphabet[int(b)%len(alphabet)])
	}
	return imageName, nil
}
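// initBuckets creates every requested bucket, tolerating buckets that already
// exist; credentials are read from MINIO_ACCESS_KEY and MINIO_SECRET_KEY.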
func initBuckets(endpoint string, buckets []string, location string) error {
	accessKeyID := os.Getenv("MINIO_ACCESS_KEY")
	secretAccessKey := os.Getenv("MINIO_SECRET_KEY")
	if accessKeyID == "" || secretAccessKey == "" {
		return errors.New("should export MINIO_ACCESS_KEY and MINIO_SECRET_KEY first")
	}
	minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, false)
	if err != nil {
		return err
	}
	for _, bucketName := range buckets {
		err = minioClient.MakeBucket(bucketName, location)
		if err != nil {
			// check whether we already own this bucket (which happens if you run this twice);
			// use a separate error variable so the original MakeBucket error is not shadowed
			exists, errExists := minioClient.BucketExists(bucketName)
			if errExists == nil && exists {
				glog.Infof("We already own %s", bucketName)
			} else {
				return err
			}
		} else {
			glog.Infof("Successfully created %s", bucketName)
		}
	}
	return nil
}
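// newClient builds a MinIO client for the endpoint using the credentials from
// the environment.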
func newClient(endpoint string) (*minio.Client, error) {
	accessKeyID := os.Getenv("MINIO_ACCESS_KEY")
	secretAccessKey := os.Getenv("MINIO_SECRET_KEY")
	if accessKeyID == "" || secretAccessKey == "" {
		return nil, errors.New("should export MINIO_ACCESS_KEY and MINIO_SECRET_KEY first")
	}
	return minio.New(endpoint, accessKeyID, secretAccessKey, false)
}
// initClientNBuckets is like initBuckets but also returns the client; it is
// currently unused.
func initClientNBuckets(endpoint string, buckets []string, location string) (*minio.Client, error) {
	accessKeyID := os.Getenv("MINIO_ACCESS_KEY")
	secretAccessKey := os.Getenv("MINIO_SECRET_KEY")
	if accessKeyID == "" || secretAccessKey == "" {
		return nil, errors.New("should export MINIO_ACCESS_KEY and MINIO_SECRET_KEY first")
	}
	minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, false)
	if err != nil {
		return nil, err
	}
	for _, bucketName := range buckets {
		err = minioClient.MakeBucket(bucketName, location)
		if err != nil {
			// check whether we already own this bucket (which happens if you run this twice);
			// use a separate error variable so the original MakeBucket error is not shadowed
			exists, errExists := minioClient.BucketExists(bucketName)
			if errExists == nil && exists {
				glog.Infof("We already own %s", bucketName)
			} else {
				return nil, err
			}
		} else {
			glog.Infof("Successfully created %s", bucketName)
		}
	}
	return minioClient, nil
}
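// setMaxResources raises the process limits (Go max threads, open files, and
// memory) as far as the system allows, using the helpers from minio's pkg/sys.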
func setMaxResources() (err error) {
	// Set the Go runtime max threads threshold to 90% of the kernel setting.
	// Do not return when an error is encountered since this is not a crucial task.
	sysMaxThreads, mErr := sys.GetMaxThreads()
	if mErr == nil {
		minioMaxThreads := (sysMaxThreads * 90) / 100
		// Only set max threads if it is greater than the default one.
		if minioMaxThreads > 10000 {
			debug.SetMaxThreads(minioMaxThreads)
		}
	}

	var maxLimit uint64
	// Set the open files limit to the maximum.
	if _, maxLimit, err = sys.GetMaxOpenFileLimit(); err != nil {
		return err
	}
	if err = sys.SetMaxOpenFileLimit(maxLimit, maxLimit); err != nil {
		return err
	}

	// Set the max memory limit to the current hard limit.
	if _, maxLimit, err = sys.GetMaxMemoryLimit(); err != nil {
		return err
	}
	err = sys.SetMaxMemoryLimit(maxLimit, maxLimit)
	return err
}
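// initPrometheus registers the metrics and serves them on :2112/metrics; the
// blank net/http/pprof import also exposes profiling endpoints on that port.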
func initPrometheus() {
	prometheus.MustRegister(ctrOperationTimes)
	prometheus.MustRegister(hgReadDuration)
	prometheus.MustRegister(hgWriteDuration)
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		if err := http.ListenAndServe(":2112", nil); err != nil {
			glog.Errorf("metrics server stopped: %v", err)
		}
	}()
}
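// init parses flags, logs the configuration, seeds math/rand, and raises
// resource limits before main runs.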
func init() {
	flag.Parse()
	if err := validFlags(); err != nil {
		glog.Fatal(err)
	}
	fmt.Println(*fOnlyRead)
	rand.Seed(int64(time.Now().Nanosecond()))
	if err := setMaxResources(); err != nil {
		glog.Warningf("failed to set max resources: %v", err)
	}
}