Last active
June 27, 2018 09:55
-
-
Save nfroidure/2030aa0f709a0a3995d5a36c0318dd4a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"fmt" | |
"log" | |
"math" | |
"os" | |
"path/filepath" | |
"strconv" | |
"strings" | |
"sync" | |
"time" | |
) | |
// FilesMap caches open output files, keyed by the bucket directory whose
// "data" file they write to. Every read or write of daMap goes through mux.
type FilesMap struct {
	mux   sync.RWMutex        // guards daMap
	daMap map[string]*os.File // bucket directory -> open handle on <dir>/data
}
// Message is one parsed input record as produced by readMessage.
type Message struct {
	t            time.Time // record time, converted to UTC by readMessage
	serial, data string    // first and third comma-separated fields, verbatim
	signal       float64   // fourth comma-separated field
}
func readMessage(text string) (Message, error) { | |
fields := strings.Split(text, ",") | |
timestamp, err := strconv.ParseUint(fields[1], 10, 64) | |
t := time.Unix(int64(timestamp), 0).UTC() | |
if err != nil { | |
return Message{}, err | |
} | |
signal, err := strconv.ParseFloat(fields[3], 64) | |
if err != nil { | |
return Message{}, err | |
} | |
return Message{serial: fields[0], t: t, data: fields[2], signal: signal}, nil | |
} | |
func ensureFileIsOpen(filesMap *FilesMap, path string) error { | |
(*filesMap).mux.RLock() | |
_, entryExists := (*filesMap).daMap[path] | |
(*filesMap).mux.RUnlock() | |
if entryExists { | |
return nil | |
} | |
err := os.MkdirAll(path, 0744) | |
if err != nil { | |
return err | |
} | |
file, err := os.OpenFile(path+"/data", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) | |
if err != nil { | |
return err | |
} | |
(*filesMap).mux.Lock() | |
mapIsHeavy := len((*filesMap).daMap) > 4000 | |
if mapIsHeavy { | |
closeFiles(filesMap) | |
} | |
(*filesMap).daMap[path] = file | |
(*filesMap).mux.Unlock() | |
return nil | |
} | |
func closeFiles(filesMap *FilesMap) error { | |
for k, v := range (*filesMap).daMap { | |
err := v.Close() | |
delete((*filesMap).daMap, k) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
func processFile(filesMap *FilesMap, path string) error { | |
file, err := os.Open(path) | |
if err != nil { | |
return err | |
} | |
defer file.Close() | |
scanner := bufio.NewScanner(file) | |
for scanner.Scan() { | |
line := scanner.Text() | |
if "" == line { | |
continue | |
} | |
msg, err := readMessage(line) | |
if err != nil { | |
return err | |
} | |
path := fmt.Sprintf( | |
"./%02d/%02d/%02d/%02d", | |
msg.t.Year(), | |
msg.t.Month(), | |
msg.t.Day(), | |
msg.t.Hour(), | |
) | |
err = ensureFileIsOpen(filesMap, path) | |
if err != nil { | |
return err | |
} | |
(*filesMap).mux.RLock() | |
fmt.Fprintf((*filesMap).daMap[path], "%s\n", line) | |
(*filesMap).mux.RUnlock() | |
} | |
if err := scanner.Err(); err != nil { | |
return err | |
} | |
return nil | |
} | |
func main() { | |
filesMap := FilesMap{ | |
daMap: make(map[string]*os.File), | |
} | |
paths := make([]string, 0) | |
root := "./backup" | |
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { | |
if err != nil { | |
return err | |
} | |
if root == path { | |
return nil | |
} | |
paths = append(paths, path) | |
return nil | |
}) | |
var jobs sync.WaitGroup | |
totalSize := len(paths) | |
batchSize := 400 | |
for i := int(math.Floor(float64((totalSize)/batchSize))) + 1; i > 0; i-- { | |
for k := int(math.Min(float64(batchSize*i), float64(totalSize))) - 1; k >= batchSize*(i-1); k-- { | |
jobs.Add(1) | |
go func(i int) { | |
fmt.Printf("Processing: %s # %d (%d/%d)\n", paths[k], i, k, totalSize) | |
err = processFile(&filesMap, paths[k]) | |
if err != nil { | |
fmt.Println(fmt.Errorf("error during my impressive processing : %v", err)) | |
} | |
jobs.Done() | |
}(i) | |
jobs.Wait() | |
} | |
} | |
closeFiles(&filesMap) | |
if err != nil { | |
fmt.Printf("Got error: %s\n", err) | |
log.Fatal(err) | |
panic(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
go run bin/explode.go 92.12s user 94.08s system 99% cpu 3:06.50 total
(with goroutines)
go run bin/explode.go 99.40s user 114.79s system 99% cpu 3:34.78 total
(without goroutines)