Created
February 7, 2018 14:03
-
-
Save raedatoui/a941051afae411593da13ebb46b81116 to your computer and use it in GitHub Desktop.
Goroutines to read a directory of files and concurrently extract, validate, and save the URLs they contain
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/url" | |
"os" | |
"sort" | |
"strings" | |
"sync" | |
sshot "github.com/slotix/pageres-go-wrapper" | |
"mvdan.cc/xurls" | |
) | |
// difference returns the symmetric difference of slice1 and slice2:
// first the elements of slice1 not present in slice2 (in slice1 order),
// then the elements of slice2 not present in slice1 (in slice2 order).
// Duplicates in either input are preserved, matching the original
// nested-loop behavior, but membership is now tested against a set so
// the whole thing runs in O(len(slice1)+len(slice2)) instead of O(n*m).
func difference(slice1 []string, slice2 []string) []string {
	in1 := make(map[string]bool, len(slice1))
	for _, s := range slice1 {
		in1[s] = true
	}
	in2 := make(map[string]bool, len(slice2))
	for _, s := range slice2 {
		in2[s] = true
	}
	var diff []string
	for _, s := range slice1 {
		if !in2[s] {
			diff = append(diff, s)
		}
	}
	for _, s := range slice2 {
		if !in1[s] {
			diff = append(diff, s)
		}
	}
	return diff
}
// unique returns a new slice containing each distinct string from
// input exactly once, keeping the first occurrence of every value in
// its original position. (The previous comment referred to ints; this
// operates on strings.)
func unique(input []string) []string {
	seen := make(map[string]struct{}, len(input))
	result := make([]string, 0, len(input))
	for _, s := range input {
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		result = append(result, s)
	}
	return result
}
// mapItems applies f to every element of vs and returns the results
// as a fresh slice of the same length; vs itself is left untouched.
func mapItems(vs []string, f func(string) string) []string {
	mapped := make([]string, len(vs))
	for idx := range vs {
		mapped[idx] = f(vs[idx])
	}
	return mapped
}
// prettyPrint returns i with a trailing newline appended, for joining
// URLs into line-oriented output.
func prettyPrint(i string) string {
	return fmt.Sprintf("%s\n", i)
}
// listProcessor transforms one string slice into another; the several
// *Process implementations in this file share this signature so they
// can be stored together in a map and compared.
type listProcessor func([]string) []string

// urlParser normalizes a raw URL candidate, returning the cleaned URL
// or an error when the input cannot be turned into a valid URL.
type urlParser func(string) (string, error)
// passthruParse is a urlParser that accepts every input unchanged and
// never reports an error.
func passthruParse(raw string) (string, error) {
	return raw, nil
}
// validateURL checks that i is a valid request URI. When parsing fails
// (typically because the scheme is missing, e.g. a bare hostname), it
// retries with an "http://" prefix, lower-cased, and returns that
// normalized form. An error is returned only when the prefixed form is
// also unparsable.
//
// Fixes in this revision:
//   - the original matched the parse failure by its exact message text
//     ("parse %s: invalid URI for request"), which broke on newer Go
//     versions that quote the URL in the message; any non-matching
//     failure was silently swallowed and the INVALID input returned
//     with a nil error.
//   - removed a leftover debug fmt.Println of URLs ending in "/".
func validateURL(i string) (string, error) {
	if _, err := url.ParseRequestURI(i); err == nil {
		return i, nil
	}
	// Retry with an explicit scheme; many note entries are bare hosts.
	fixed := strings.ToLower("http://" + i)
	if _, err := url.ParseRequestURI(fixed); err != nil {
		return "", err
	}
	return fixed, nil
}
// wgProcess copies input into a new slice by fanning each element out
// to its own goroutine and collecting results from a channel; result
// order is therefore nondeterministic.
//
// Fix: the original appended to output in a background goroutine while
// the main goroutine returned immediately after wg.Wait(), racing the
// final appends (and reading output unsynchronized), so results could
// be lost. The channel is now closed once all senders finish and the
// caller's goroutine drains it before returning.
func wgProcess(input []string) []string {
	var wg sync.WaitGroup
	wg.Add(len(input))
	parsed := make(chan string)
	for _, i := range input {
		go func(i string) {
			defer wg.Done()
			parsed <- i
		}(i)
	}
	// Sender side closes the channel once every producer is done.
	go func() {
		wg.Wait()
		close(parsed)
	}()
	output := []string{}
	for o := range parsed {
		output = append(output, o)
	}
	return output
}
// chanProcess copies input into a new slice using one producer
// goroutine per element and a buffered channel; result order is
// nondeterministic. The receiver closes the channel after the last
// expected item arrives, which is safe here because by then every
// producer has completed its single send.
//
// Fixes:
//   - the original sent string(i), an int-to-rune conversion that
//     produced a one-character string from the loop INDEX (e.g.
//     "\x00") rather than the element; it now sends input[i].
//   - empty input made the range block forever (nothing ever closed
//     the channel); it now returns immediately.
func chanProcess(input []string) []string {
	output := []string{}
	if len(input) == 0 {
		return output
	}
	queue := make(chan string, 1)
	// Concurrently produce the data.
	for i := 0; i < len(input); i++ {
		go func(i int) {
			queue <- input[i]
		}(i)
	}
	remaining := len(input)
	for t := range queue {
		output = append(output, t)
		// All expected items received; close so the range terminates.
		if remaining--; remaining == 0 {
			close(queue)
		}
	}
	return output
}
// loopProcess returns a fresh slice holding the elements of input in
// their original order — the sequential baseline the concurrent
// processors are compared against.
func loopProcess(input []string) []string {
	return append([]string{}, input...)
}
func compareProcesses(input []string) { | |
processors := make(map[string]listProcessor) | |
results := make(map[string][]string) | |
processors["wg"] = wgProcess | |
processors["chan"] = chanProcess | |
processors["loop"] = loopProcess | |
for k, p := range processors { | |
results[k] = p(input) | |
} | |
} | |
func readFile(filename string, parser urlParser) []string { | |
b, err := ioutil.ReadFile(filename) | |
if err != nil { | |
fmt.Print(err) | |
} | |
str := string(b) // convert content to a 'string' | |
extracted := xurls.Relaxed().FindAllString(str, -1) | |
// strict := xurls.Strict().FindAllString(str, -1) | |
var wg sync.WaitGroup | |
wg.Add(len(extracted)) | |
output := []string{} | |
parsed := make(chan string) | |
for _, i := range extracted { | |
go func(i string) { | |
defer wg.Done() | |
o, err := parser(i) | |
if err == nil { | |
parsed <- o | |
} | |
}(i) | |
} | |
go func() { | |
for o := range parsed { | |
output = append(output, o) | |
} | |
}() | |
wg.Wait() | |
return output | |
} | |
func readFiles() []string { | |
files, err := ioutil.ReadDir("Notes") | |
if err != nil { | |
log.Fatal(err) | |
} | |
var wg sync.WaitGroup | |
wg.Add(len(files)) | |
parsed := make(chan []string) | |
urls := []string{} | |
for _, file := range files { | |
go func(f os.FileInfo) { | |
defer wg.Done() | |
parsed <- readFile("Notes/"+f.Name(), validateURL) | |
}(file) | |
} | |
go func() { | |
for o := range parsed { | |
urls = append(urls, o...) | |
} | |
}() | |
wg.Wait() | |
urls = unique(urls) | |
sort.Strings(urls) | |
return urls | |
} | |
func captureSite(urls []string) { | |
shotsDir := "shots" | |
os.Mkdir(shotsDir, 0777) | |
params := sshot.Parameters{ | |
Command: "pageres", | |
Sizes: "1024x768", | |
Crop: "--crop", | |
Scale: "--scale 0.9", | |
Timeout: "--timeout 30", | |
Filename: fmt.Sprintf("--filename=%s/<%%= url %%>", shotsDir), | |
UserAgent: "", | |
} | |
sshot.GetShots(urls, params) | |
sshot.DeleteZeroLengthFiles(shotsDir) | |
} | |
// check panics when e is non-nil — a terse fatal-error guard for this
// one-shot script.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
func main() { | |
// captureSite([]string{ | |
// "https://www.onceuponachef.com/recipes/summer-corn-soup-with-fresh-herbs.html", | |
// }) | |
urls := readFiles() | |
fmt.Printf("Parsed: %d\n", len(urls)) | |
var divided [][]string | |
numCPU := 30 | |
total := len(urls) | |
chunkSize := (total + numCPU - 1) / numCPU | |
for i := 0; i < total; i += chunkSize { | |
end := i + chunkSize | |
if end > total { | |
end = total | |
} | |
divided = append(divided, urls[i:end]) | |
} | |
var wg sync.WaitGroup | |
wg.Add(len(urls)) | |
for k, v := range divided { | |
go func(i int, v []string) { | |
defer wg.Done() | |
f, err := os.Create(fmt.Sprintf("./parsed/dat%d.txt", i)) | |
check(err) | |
defer f.Close() | |
n3, err := f.WriteString(strings.Join(v, "\n")) | |
fmt.Printf("%d wrote %d bytes\n", i, n3) | |
f.Sync() | |
}(k, v) | |
} | |
// go func() { | |
// for u := range checked { | |
// validUrls = append(validUrls, u) | |
// } | |
// }() | |
wg.Wait() | |
// fmt.Printf("%v", mapItems(validUrls, prettyPrint)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment