Skip to content

Instantly share code, notes, and snippets.

@picaso
Last active August 22, 2024 20:46
Show Gist options
  • Save picaso/c94c564314d24c87adeb4abf6e2d6162 to your computer and use it in GitHub Desktop.
Save picaso/c94c564314d24c87adeb4abf6e2d6162 to your computer and use it in GitHub Desktop.
package main
import (
"crypto/md5"
"encoding/hex"
"io/fs"
"log"
"os"
"path/filepath"
"sync"
"time"
)
type (
	// files is a list of file paths. Every benchmark stage is a method on
	// it and returns the same list, so stages can be chained in main.
	files []string

	// pipelineConfig holds the tuning knobs for the boundedPool stage.
	pipelineConfig struct {
		// poolSize is the number of worker goroutines spawnExecutorPool starts.
		poolSize int
	}
)
// fileByFile is the sequential baseline stage. It hashes every path in f,
// one after another, on the calling goroutine and discards the digests.
// It logs the elapsed time under the label "Line by line" and returns f
// unchanged so calls can be chained.
func (f files) fileByFile() files {
	start := time.Now()
	defer timeTrack(start, "Line by line")

	for i := range f {
		readAndEncode(f[i])
	}
	return f
}
// chunkedWithoutChannels hashes every file in f by splitting the path list
// into chunks of 10 and processing each chunk in its own goroutine. The
// goroutines are coordinated only through wg (no channels), and their
// digests are discarded. The method blocks until all chunks are done, logs
// the elapsed time, and returns f unchanged so stages can be chained.
func (f files) chunkedWithoutChannels(wg *sync.WaitGroup) files {
	defer timeTrack(time.Now(), "Chunked goroutine without channels")
	chunks := chunkBy(f, 10)
	for _, chunk := range chunks {
		wg.Add(1)
		// Pass the chunk as an argument instead of capturing the range
		// variable. Before Go 1.22 (or in modules whose go.mod declares an
		// older version), one variable is shared across iterations, so
		// goroutines could observe a later chunk and skip or repeat work.
		go func(paths []string) {
			defer wg.Done()
			for _, file := range paths {
				readAndEncode(file)
			}
		}(chunk)
	}
	wg.Wait()
	return f
}
// boundedPool hashes every file in f using the worker goroutines started by
// spawnExecutorPool (sized by config.poolSize). Paths are fed to the workers
// over the unbuffered jobs channel; digests come back on the unbuffered
// result channel, which spawnExecutorPool's logging goroutine drains.
//
// After all paths are sent, jobs is closed so the workers' range loops end.
// The method then waits on wg for the workers and closes result so the
// logging goroutine can exit. Finally it logs the elapsed time and returns
// f unchanged for chaining.
func (f files) boundedPool(wg *sync.WaitGroup, config *pipelineConfig) files {
	// Neither channel below is buffered, so label the stage by what it
	// actually measures (the old label said "buffered channels").
	defer timeTrack(time.Now(), "Bounded worker pool")
	jobs := make(chan string)
	result := make(chan string)
	spawnExecutorPool(wg, jobs, result, config)
	for _, path := range f {
		jobs <- path
	}
	close(jobs)
	// Close result only after every worker has finished: a send on a
	// closed channel panics.
	wg.Wait()
	close(result)
	return f
}
// spawnExecutorPool starts config.poolSize worker goroutines (at least one).
// Each worker:
//   - receives file paths from jobs until jobs is closed;
//   - hashes each file with readAndEncode and sends the hex digest on result;
//   - is registered on wg and calls wg.Done once jobs is drained, so the
//     caller can wg.Wait and then close result.
//
// It also starts a single goroutine that logs every digest received on
// result and exits when result is closed. That goroutine is not registered
// on wg, so waiting on wg does not wait for the logging to finish.
func spawnExecutorPool(wg *sync.WaitGroup, jobs chan string, result chan string, config *pipelineConfig) {
	workers := config.poolSize
	if workers < 1 {
		// With zero workers nothing ever receives from jobs, and a caller
		// sending on an unbuffered jobs channel would block forever.
		workers = 1
	}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for fPath := range jobs {
				result <- readAndEncode(fPath)
			}
		}()
	}
	// Exactly one consumer for result. Previously a logger was started on
	// every loop iteration, giving poolSize redundant readers.
	go func() {
		for res := range result {
			log.Println(res)
		}
	}()
}
// main collects every non-directory entry under resources/, then runs three
// hashing stages over the same list: the bounded worker pool, the chunked
// goroutines, and finally the sequential baseline. Each stage logs its own
// elapsed time. Sharing one WaitGroup across the stages is safe because
// each stage waits on it before returning.
func main() {
	var wg sync.WaitGroup
	config := &pipelineConfig{10}
	dir := "resources/"
	var paths files
	err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
		// When err is non-nil, d may be nil (e.g. dir does not exist), so
		// check err before touching d. Returning it aborts the walk and
		// surfaces the error below instead of panicking on a nil entry.
		if err != nil {
			return err
		}
		if !d.IsDir() {
			paths = append(paths, path)
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	paths.boundedPool(&wg, config).chunkedWithoutChannels(&wg).fileByFile()
}
// readAndEncode reads the whole file at the given path and returns the
// lowercase hexadecimal MD5 digest of its contents. A read error is fatal:
// it is logged and the process exits via log.Fatal.
func readAndEncode(file string) string {
	data, readErr := os.ReadFile(file)
	if readErr != nil {
		log.Fatal(readErr)
	}
	hasher := md5.New()
	hasher.Write(data) // hash.Hash.Write never returns an error
	return hex.EncodeToString(hasher.Sum(nil))
}
// timeTrack logs "<name> took <duration>", where the duration is the time
// elapsed since start. The stages call it via defer so that time.Now() is
// captured on entry and the log line is written on return:
//
//	defer timeTrack(time.Now(), "stage")
func timeTrack(start time.Time, name string) {
	log.Printf("%s took %s", name, time.Since(start))
}
// chunkBy splits items into consecutive sub-slices of chunkSize elements;
// only the final chunk may be shorter. An empty input yields no chunks.
//
// The chunks share items' backing array, but each chunk's capacity is
// clamped to its length. An append to one chunk therefore reallocates
// instead of overwriting the first elements of the next chunk.
//
// chunkSize must be positive. A non-positive size is a programming error
// and panics; previously 0 looped forever and negatives hit an index panic.
func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
	if chunkSize < 1 {
		panic("chunkBy: chunkSize must be positive")
	}
	if len(items) == 0 {
		return nil
	}
	chunks = make([][]T, 0, len(items)/chunkSize+1)
	for chunkSize < len(items) {
		// Full slice expression: cap == len, so the chunk cannot grow into
		// its neighbour.
		chunks = append(chunks, items[:chunkSize:chunkSize])
		items = items[chunkSize:]
	}
	// The remaining tail already ends at the end of the backing array.
	return append(chunks, items)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment