Created
September 13, 2016 00:28
-
-
Save natdm/958c989b4c738afbf8ab11760171ad09 to your computer and use it in GitHub Desktop.
Playing with file concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
/* | |
This isn't meant to be the idiomatic way to read files and copy them - this is a learning exercise | |
for myself. I wanted to be able to read multiple chunks of a file and write them concurrently to a | |
new file | |
I wouldn't normally handle every error with log.Fatalln. It's only done here in the interest of keeping the focus on concurrency.
Example output: | |
2016/09/12 16:25:18 running on 4 CPUs | |
2016/09/12 16:25:18 Read at position 0 | |
2016/09/12 16:25:18 Read at position 10000 | |
2016/09/12 16:25:18 Read at position 20000 | |
2016/09/12 16:25:18 Read at position 30000 | |
2016/09/12 16:25:18 Wrote at position 40000 | |
2016/09/12 16:25:18 Wrote at position 20000 | |
2016/09/12 16:25:18 Wrote at position 0 | |
2016/09/12 16:25:18 Read at position 40000 | |
2016/09/12 16:25:18 Wrote at position 10000 | |
2016/09/12 16:25:18 Read at position 50000 | |
2016/09/12 16:25:18 Read at position 60000 | |
2016/09/12 16:25:18 Read at position 70000 | |
2016/09/12 16:25:18 Wrote at position 30000 | |
... | |
2016/09/12 16:25:18 56.969711ms | |
You can see the read is done sequentially, but the write is done at first chance. Not in order. | |
*/ | |
import ( | |
"io" | |
"log" | |
"os" | |
"runtime" | |
"sync" | |
"time" | |
"flag" | |
) | |
/* | |
Program steps: | |
1. Set max CPU | |
2. Open the "in" file via os.Open (does NOT read the file) | |
3. Create an "out" file | |
4. Range over the "in" file and concurrently send bit slices to a channel | |
(I believe this is a generator) | |
5. Each time a slice of bits is sent, spawn a new thread to write the sent bits to the new file | |
6. Wait until all bytes are read and close the main func. | |
*/ | |
func main() { | |
flg := getFlags() | |
procs := runtime.NumCPU() | |
runtime.GOMAXPROCS(procs) | |
log.Printf("running on %v CPUs\n", procs) | |
start := time.Now() | |
in, err := os.Open(flg["in"]) | |
if err != nil { | |
log.Fatalln(err.Error()) | |
} | |
defer in.Close() | |
out, err := os.Create(flg["out"]) | |
if err != nil { | |
log.Fatalln(err.Error()) | |
} | |
defer out.Close() | |
var wg sync.WaitGroup | |
for b := range read(in) { | |
wg.Add(1) | |
go func(b partial) { | |
defer wg.Done() | |
_, err := out.WriteAt(b.data, b.off) | |
if err != nil { | |
log.Fatalln(err.Error()) | |
} | |
log.Printf("Wrote at position %v\n", b.off) | |
}(b) | |
} | |
wg.Wait() | |
log.Println("Done in", time.Now().Sub(start)) | |
} | |
// partial is one contiguous chunk of a file.
type partial struct {
	data []byte // the bytes of this chunk; every byte in the slice is valid data
	off  int64  // offset of data[0] within the source, in bytes
}

// read consumes f in chunks of up to 10000 bytes and delivers them, in order,
// as partials on the returned receive-only channel. The channel is closed once
// f is exhausted. Every chunk except possibly the last is exactly 10000 bytes
// long; the last one holds only the bytes that remained. A read error other
// than end-of-input terminates the program through log.Fatalln.
func read(f io.Reader) <-chan partial {
	out := make(chan partial, 5)
	go func() {
		defer close(out)

		const inc = 10000
		var marker int64
		for {
			// A fresh buffer per chunk: the receiver keeps using the slice
			// after it is sent, so it must not be overwritten by the next read.
			p := make([]byte, inc)

			// io.ReadFull keeps reading until the buffer is full or the input
			// ends, so a short Read from the underlying reader does not produce
			// a short chunk. n is the number of valid bytes in p.
			n, err := io.ReadFull(f, p)
			if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
				log.Fatalln(err.Error())
			}

			if n > 0 {
				// Send only the bytes actually read, and advance the offset by
				// that amount, so the copy is never padded or misplaced.
				out <- partial{p[:n], marker}
				log.Printf("Read at position %v\n", marker)
				marker += int64(n)
			}

			if err != nil {
				// io.EOF or io.ErrUnexpectedEOF: the input is exhausted.
				return
			}
		}
	}()
	return out
}
// getFlags registers and parses the "in" and "out" command-line flags and
// returns their values in a map under the keys "in" and "out". It lives in its
// own function so that main stays focused on concurrency rather than flag
// parsing. The program exits through log.Fatalln when "in" is left at its
// empty default; a notice is logged when "out" equals its default, "out".
func getFlags() map[string]string {
	const (
		defaultSrc = ""
		defaultDst = "out"
	)

	var src, dst string
	flag.StringVar(&src, "in", defaultSrc, "file to be copied")
	flag.StringVar(&dst, "out", defaultDst, "name of file to create (it's wise to use the same extension)")
	flag.Parse()

	switch {
	case src == defaultSrc:
		// Fatalln exits, so the destination check below is never reached.
		log.Fatalln("Need an original file (use the --in flag)")
	case dst == defaultDst:
		log.Println("No out file specified. Sending to 'out' with no extension.")
	}

	return map[string]string{
		"in":  src,
		"out": dst,
	}
}
What is your error? Race condition?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you for teaching Go code.
I have a problem when I follow your code with a file of about 3 GB: sometimes it fails with an error,
but if I don't use concurrency, there is no error.
Sorry for my English.