@natdm
Created September 13, 2016 00:28
Playing with file concurrency
package main
/*
This isn't meant to be the idiomatic way to read files and copy them - this is a learning exercise
for myself. I wanted to be able to read multiple chunks of a file and write them concurrently to a
new file.
Errors aren't really handled beyond log.Fatalln; that's just done in the interest of keeping the focus on concurrency.
Example output:
2016/09/12 16:25:18 running on 4 CPUs
2016/09/12 16:25:18 Read at position 0
2016/09/12 16:25:18 Read at position 10000
2016/09/12 16:25:18 Read at position 20000
2016/09/12 16:25:18 Read at position 30000
2016/09/12 16:25:18 Wrote at position 40000
2016/09/12 16:25:18 Wrote at position 20000
2016/09/12 16:25:18 Wrote at position 0
2016/09/12 16:25:18 Read at position 40000
2016/09/12 16:25:18 Wrote at position 10000
2016/09/12 16:25:18 Read at position 50000
2016/09/12 16:25:18 Read at position 60000
2016/09/12 16:25:18 Read at position 70000
2016/09/12 16:25:18 Wrote at position 30000
...
2016/09/12 16:25:18 56.969711ms
You can see the reads happen sequentially, but the writes happen at first chance, not in order.
*/
import (
	"flag"
	"io"
	"log"
	"os"
	"runtime"
	"sync"
	"time"
)
/*
Program steps:
1. Set GOMAXPROCS to the number of CPUs
2. Open the "in" file via os.Open (does NOT read the file)
3. Create an "out" file
4. Range over the "in" file and concurrently send byte slices to a channel
   (I believe this is a generator)
5. Each time a slice of bytes is sent, spawn a new goroutine to write the sent bytes to the new file
6. Wait until all chunks are written and exit the main func.
*/
func main() {
	flg := getFlags()
	procs := runtime.NumCPU()
	runtime.GOMAXPROCS(procs)
	log.Printf("running on %v CPUs\n", procs)
	start := time.Now()
	in, err := os.Open(flg["in"])
	if err != nil {
		log.Fatalln(err.Error())
	}
	defer in.Close()
	out, err := os.Create(flg["out"])
	if err != nil {
		log.Fatalln(err.Error())
	}
	defer out.Close()
	var wg sync.WaitGroup
	for b := range read(in) {
		wg.Add(1)
		go func(b partial) {
			defer wg.Done()
			_, err := out.WriteAt(b.data, b.off)
			if err != nil {
				log.Fatalln(err.Error())
			}
			log.Printf("Wrote at position %v\n", b.off)
		}(b)
	}
	wg.Wait()
	log.Println("Done in", time.Since(start))
}
// partial is a part of a file.
type partial struct {
	data []byte // the data bytes of a file
	off  int64  // the offset of where the data is located in the file
}
// read accepts a reader and sends chunks of that file, via partials, into a read-only
// channel of partials.
func read(f io.Reader) <-chan partial {
	out := make(chan partial, 5)
	go func() {
		defer close(out)
		var marker int64
		inc := 10000
		for {
			p := make([]byte, inc)
			// Read may fill fewer than inc bytes (especially on the last chunk),
			// so only send and count what was actually read.
			n, err := f.Read(p)
			if n > 0 {
				out <- partial{p[:n], marker}
				log.Printf("Read at position %v\n", marker)
				marker += int64(n)
			}
			if err != nil {
				if err == io.EOF {
					return
				}
				log.Fatalln(err.Error())
			}
		}
	}()
	return out
}
// getFlags is an external function just to keep the logic focused on concurrency and not flag parsing.
func getFlags() map[string]string {
	inDef := ""
	outDef := "out"
	in := flag.String("in", inDef, "file to be copied")
	out := flag.String("out", outDef, "name of file to create (it's wise to use the same extension)")
	flag.Parse()
	if *in == inDef {
		log.Fatalln("Need an original file (use the --in flag)")
	}
	if *out == outDef {
		log.Println("No out file specified. Sending to 'out' with no extension.")
	}
	m := make(map[string]string)
	m["in"] = *in
	m["out"] = *out
	return m
}
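
Assuming the gist is saved as main.go, a run might look like this (the file names are only examples):

go run main.go --in movie.mp4 --out movie_copy.mp4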
@chonlatee

Thank you for teaching Go code.

I have a problem when I follow your code with a file of ~3 GB. Sometimes it errors.

But if I don't use concurrency, there is no error.

Sorry for my English.

@natdm

natdm commented Mar 23, 2017

What is your error? Race condition?
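
The thread never says what the error actually was, so this is only a guess: with a ~3 GB input and 10 KB chunks, the loop above launches roughly 300,000 goroutines and keeps every chunk in memory until its write finishes, which can exhaust memory on some machines. Below is a minimal sketch of the same chunked copy with a semaphore channel that caps how many writes are in flight; the file names, chunk size, and limit of 64 are illustrative assumptions, not part of the original gist.

package main

// Sketch only: same chunked copy as the gist, but a buffered channel acts as a
// semaphore so at most 64 chunks are buffered and being written at once.
// "in.bin", "out.bin", the 10000-byte chunk size, and the limit of 64 are
// assumptions for illustration.

import (
	"io"
	"log"
	"os"
	"sync"
)

func main() {
	in, err := os.Open("in.bin")
	if err != nil {
		log.Fatalln(err)
	}
	defer in.Close()

	out, err := os.Create("out.bin")
	if err != nil {
		log.Fatalln(err)
	}
	defer out.Close()

	const chunk = 10000
	sem := make(chan struct{}, 64) // capacity = max chunks in flight
	var wg sync.WaitGroup
	var off int64

	for {
		buf := make([]byte, chunk)
		n, err := in.Read(buf)
		if n > 0 {
			wg.Add(1)
			sem <- struct{}{} // blocks once 64 writes are outstanding
			go func(b []byte, off int64) {
				defer wg.Done()
				defer func() { <-sem }()
				if _, werr := out.WriteAt(b, off); werr != nil {
					log.Fatalln(werr)
				}
			}(buf[:n], off)
			off += int64(n)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalln(err)
		}
	}
	wg.Wait()
}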
