Skip to content

Instantly share code, notes, and snippets.

@yujp
Last active November 13, 2019 14:51
Show Gist options
  • Select an option

  • Save yujp/3c47e34720b1d5814fdb9a2ca950bb09 to your computer and use it in GitHub Desktop.

Select an option

Save yujp/3c47e34720b1d5814fdb9a2ca950bb09 to your computer and use it in GitHub Desktop.
package main
// copyright 2019 yu
// example of parallel uploading over HTTP
import (
"fmt"
"io"
"log"
"math"
"net/http"
"os"
"sync"
"time"
"github.com/google/uuid"
)
// Process exit codes that main passes to os.Exit.
const (
// exitSuccess is used when the upload call returns no error.
exitSuccess = 0
// exitFailure is used for a missing argument and for any failure while
// opening, inspecting or uploading the file.
exitFailure = 1
)
// endPoint is the URL that every chunk is sent to with an HTTP PUT request.
// NOTE(review): "https://..." looks like a placeholder; presumably it has to
// be replaced with a real URL before the program is run — confirm.
const endPoint = "https://..."
// main uploads the file named by the first command-line argument and
// terminates the process with exitSuccess or exitFailure.
func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "Usage %s <file>\n", os.Args[0])
		os.Exit(exitFailure)
	}
	// The work lives in run so that its deferred Close executes before
	// os.Exit, which would otherwise skip deferred calls.
	os.Exit(run(os.Args[1]))
}

// run opens filePath, uploads its contents in parallel chunks and returns the
// exit code for the process. Diagnostics are written to standard error.
func run(filePath string) int {
	fin, err := os.Open(filePath)
	if err != nil {
		// The error returned by os.Open already names the path.
		fmt.Fprintf(os.Stderr, "failed to open the file: %v\n", err)
		return exitFailure
	}
	defer fin.Close()

	finfo, err := fin.Stat()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to get the fileinfo: %v\n", err)
		return exitFailure
	}

	// Sequential alternative: upload(fin, finfo.Size(), int64(5*1024*1024)).
	chunkSize := int64(1024 * 1024)
	maxProcs := 10
	if err := uploadInParallel(fin, finfo.Size(), chunkSize, maxProcs); err != nil {
		fmt.Fprintf(os.Stderr, "failed to upload the file: %v\n", err)
		return exitFailure
	}
	return exitSuccess
}
// genUUID returns the string form of a freshly generated random UUID, or an
// empty string and the error reported by uuid.NewRandom when generation fails.
func genUUID() (string, error) {
	id, genErr := uuid.NewRandom()
	if genErr == nil {
		return id.String(), nil
	}
	return "", genErr
}
// upload sends the first length bytes of r to endPoint one chunk at a time,
// each chunk being at most chunkSize bytes and carried by its own PUT request.
//
// All requests share one generated Session-Id header; each has a Content-Range
// header with the inclusive byte range of its chunk and a body holding exactly
// those bytes.
//
// It returns an error when chunkSize is not positive, when the session ID
// cannot be generated, or as soon as a chunk fails (request error or an HTTP
// status of 400 and above). A non-positive length uploads nothing and returns
// nil.
func upload(r io.ReaderAt, length, chunkSize int64) error {
	if chunkSize <= 0 {
		return fmt.Errorf("chunk size must be positive, got %d", chunkSize)
	}
	sessionID, err := genUUID()
	if err != nil {
		return err
	}
	// Divide as floats so that a trailing partial chunk is counted too.
	nbChunks := int64(math.Ceil(float64(length) / float64(chunkSize)))
	for i := int64(0); i < nbChunks; i++ {
		start := i * chunkSize
		end := start + chunkSize // exclusive
		if end > length {
			end = length
		}
		size := end - start
		// Bound the section by the real chunk size so the last body cannot
		// run past length and disagree with the Content-Range header.
		sr := io.NewSectionReader(r, start, size)
		req, err := http.NewRequest(http.MethodPut, endPoint, sr)
		if err != nil {
			return err
		}
		// http.NewRequest does not derive a length from a SectionReader;
		// set it so the request carries Content-Length.
		req.ContentLength = size
		req.Header.Set("Session-Id", sessionID)
		req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end-1, length))
		req.Header.Set("Content-Type", "text/plain; charset=UTF-8")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		resp.Body.Close()
		if resp.StatusCode >= http.StatusBadRequest {
			return fmt.Errorf("chunk %d: server answered %s", i, resp.Status)
		}
	}
	return nil
}
// uploadInParallel sends the first length bytes of reader to endPoint in
// chunks of at most chunkSize bytes, keeping up to maxProcs PUT requests in
// flight at once.
//
// All requests share one generated Session-Id header; each has a Content-Range
// header with the inclusive byte range of its chunk and a body holding exactly
// those bytes.
//
// It returns an error when chunkSize or maxProcs is not positive, when the
// session ID cannot be generated, or when a chunk fails (request error or an
// HTTP status of 400 and above). After the first failure no further chunks are
// started; chunks already in flight are waited for, and the first recorded
// error is returned. A non-positive length uploads nothing and returns nil.
func uploadInParallel(reader io.ReaderAt, length, chunkSize int64, maxProcs int) error {
	if chunkSize <= 0 {
		return fmt.Errorf("chunk size must be positive, got %d", chunkSize)
	}
	if maxProcs <= 0 {
		// A zero-capacity semaphore would block the first acquire forever.
		return fmt.Errorf("max procs must be positive, got %d", maxProcs)
	}
	sessionID, err := genUUID()
	if err != nil {
		return err
	}
	// Divide as floats: integer division would truncate before Ceil and the
	// trailing partial chunk would never be uploaded.
	nbChunks := int64(math.Ceil(float64(length) / float64(chunkSize)))

	// putChunk uploads chunk number index and returns the HTTP status code.
	putChunk := func(index int64) (int, error) {
		start := index * chunkSize
		end := start + chunkSize // exclusive
		if end > length {
			end = length
		}
		size := end - start
		// NewSectionReader takes (offset, size), not (offset, end offset).
		body := io.NewSectionReader(reader, start, size)
		req, err := http.NewRequest(http.MethodPut, endPoint, body)
		if err != nil {
			return 0, err
		}
		// http.NewRequest does not derive a length from a SectionReader;
		// set it so the request carries Content-Length.
		req.ContentLength = size
		req.Header.Set("Session-Id", sessionID)
		req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end-1, length))
		req.Header.Set("Content-Type", "text/plain; charset=UTF-8")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return 0, err
		}
		defer resp.Body.Close()
		if resp.StatusCode >= http.StatusBadRequest {
			return resp.StatusCode, fmt.Errorf("chunk %d: server answered %s", index, resp.Status)
		}
		return resp.StatusCode, nil
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards firstErr
		firstErr error
	)
	recordErr := func(e error) {
		mu.Lock()
		if firstErr == nil {
			firstErr = e
		}
		mu.Unlock()
	}
	hasFailed := func() bool {
		mu.Lock()
		defer mu.Unlock()
		return firstErr != nil
	}

	sem := make(chan struct{}, maxProcs) // limits the number of requests in flight
	began := time.Now()
	for i := int64(0); i < nbChunks && !hasFailed(); i++ {
		sem <- struct{}{} // acquire a slot
		wg.Add(1)
		go func(procIndex int64) {
			defer func() {
				<-sem // release the slot
				wg.Done()
			}()
			log.Printf("Info: proc(%d) start\n", procIndex)
			status, err := putChunk(procIndex)
			if err != nil {
				recordErr(err)
				return
			}
			log.Printf("info: Result(%d): %d\n", procIndex, status)
		}(i)
	}
	// Wait for every started chunk, so no goroutine outlives this call.
	wg.Wait()
	if firstErr != nil {
		return firstErr
	}
	log.Printf("info: uploaded %d chunk(s) in %v\n", nbChunks, time.Since(began))
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment