Skip to content

Instantly share code, notes, and snippets.

@miku
Last active May 1, 2019 15:42
Show Gist options
  • Save miku/ecfa6e78b0c314a5d541 to your computer and use it in GitHub Desktop.
test.nt
test.ldj
// +build OMIT
package main
import (
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
)
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	errc := make(chan error, 1)
	go func() { // HL
		// Close the paths channel after Walk returns.
		defer close(paths) // HL
		// No select needed for this send, since errc is buffered.
		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
			if err != nil {
				return err
			}
			// Skip directories, symlinks, devices, etc.
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path: // HL
			case <-done: // HL
				// The consumer went away; abort the whole walk.
				return errors.New("walk canceled")
			}
			return nil
		})
	}()
	return paths, errc
}
// A result is the product of reading and summing a file using MD5.
type result struct {
	path string          // file path as reported by the walk
	sum  [md5.Size]byte  // MD5 digest of the file contents
	err  error           // non-nil if reading the file failed
}
// digester reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
	for p := range paths { // HLpaths
		data, err := ioutil.ReadFile(p)
		r := result{p, md5.Sum(data), err}
		select {
		case c <- r:
		case <-done:
			return
		}
	}
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	// MD5All closes the done channel when it returns; it may do so before
	// receiving all the values from c and errc.
	done := make(chan struct{})
	defer close(done)
	paths, errc := walkFiles(done, root)
	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result) // HLc
	var wg sync.WaitGroup
	const numDigesters = 20
	wg.Add(numDigesters)
	for i := 0; i < numDigesters; i++ {
		go func() {
			digester(done, paths, c) // HLc
			wg.Done()
		}()
	}
	// Close c once every digester has drained paths and finished sending.
	go func() {
		wg.Wait()
		close(c) // HLc
	}()
	// End of pipeline. OMIT
	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			// First failed read aborts; the deferred close(done)
			// tells the walker and digesters to stop.
			return nil, r.err
		}
		m[r.path] = r.sum
	}
	// Check whether the Walk failed.
	if err := <-errc; err != nil { // HLerrc
		return nil, err
	}
	return m, nil
}
// main computes the MD5 sum of every file under the directory named by
// the first command-line argument (default ".") and prints the digests
// sorted by path.
func main() {
	dir := "."
	if len(os.Args) >= 2 {
		dir = os.Args[1]
	}
	sums, err := MD5All(dir)
	if err != nil {
		fmt.Println(err)
		return
	}
	keys := make([]string, 0, len(sums))
	for k := range sums {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%x %s\n", sums[k], k)
	}
}
package main
import (
"fmt"
)
func main() {
	ch1 := make(chan int)
	// pump blocks forever; main reads just the first value (0) and exits,
	// killing the generator goroutine with it.
	go pump(ch1)
	fmt.Println(<-ch1)
}
// pump is a generator: it emits the unbounded increasing sequence
// 0, 1, 2, ... on ch, blocking on each send. It never returns.
func pump(ch chan int) {
	n := 0
	for {
		ch <- n
		n++
	}
}
package main
import (
"fmt"
)
// Reply is unused by the current code; kept for the original example's shape.
type Reply struct{}

// Request carries two operands and a channel on which the computed
// result is delivered back to the requester.
type Request struct {
	a, b   int
	replyc chan int // per-request reply channel
}
func run(op binOp, req *Request) {
req.replyc <- op(req.a, req.b)
}
// server loops forever, taking requests off service and handing each
// one to its own goroutine so slow replies never block intake.
func server(op binOp, service chan *Request) {
	for {
		request := <-service
		go run(op, request)
	}
}
// startServer launches a server goroutine for op and returns the
// channel on which clients submit requests.
func startServer(op binOp) chan *Request {
	reqChan := make(chan *Request)
	go server(op, reqChan)
	return reqChan
}
func main() {
	adder := startServer(func(a, b int) int { return a + b })
	const N = 100
	var reqs [N]Request
	// Fire off N requests; each gets its own reply channel so replies
	// can be collected in any order.
	for i := 0; i < N; i++ {
		req := &reqs[i]
		req.a = i
		req.b = i + N
		req.replyc = make(chan int)
		adder <- req
	}
	// Collect replies in reverse order to show ordering doesn't matter.
	// Expected result for request i is i + (i+N) = N + 2i.
	for i := N - 1; i >= 0; i-- {
		if <-reqs[i].replyc != N+2*i {
			fmt.Println("fail at", i)
		} else {
			fmt.Println("request", i, "is ok")
		}
	}
	fmt.Println("done")
}
package main
// In main()
// About to sleep in main()
// Beginning longWait()
// Beginning shortWait()
// Exiting shortWait()
// Exiting longWait()
// End main()
import (
"fmt"
"time"
)
// main launches two sleeper goroutines, then sleeps long enough for both
// to finish. When main returns the program exits, killing any goroutines
// that are still running — hence the generous 10s wait.
func main() {
	fmt.Println("In main()")
	go longWait()
	go shortWait()
	fmt.Println("About to sleep in main()")
	// Idiomatic duration instead of the raw nanosecond literal 10 * 1e9.
	time.Sleep(10 * time.Second)
	fmt.Println("End main()")
}
// longWait announces itself, sleeps five seconds, and announces its exit.
func longWait() {
	fmt.Println("Beginning longWait()")
	// Idiomatic duration instead of the raw nanosecond literal 5 * 1e9.
	time.Sleep(5 * time.Second)
	fmt.Println("Exiting longWait()")
}
// shortWait announces itself, sleeps two seconds, and announces its exit.
func shortWait() {
	fmt.Println("Beginning shortWait()")
	// Idiomatic duration instead of the raw nanosecond literal 2 * 1e9.
	time.Sleep(2 * time.Second)
	fmt.Println("Exiting shortWait()")
}
package main
import (
"fmt"
"time"
)
func main() {
	ch := make(chan string)
	// remove one or both "go" keywords and get a
	// fatal error: all goroutines are asleep - deadlock!
	go sendData(ch)
	go getData(ch)
	// NOTE(review): the sleep is the only synchronization; if main wakes
	// before getData has printed everything, output is truncated.
	time.Sleep(1e9) // time.Sleep(1e5)
	fmt.Println()
}
// sendData pushes five city names into ch, blocking on each send
// until a receiver is ready, then returns. The channel is not closed.
func sendData(ch chan string) {
	cities := []string{"Washington", "Tripoli", "London", "Beijing", "Tokio"}
	for _, city := range cities {
		ch <- city
	}
}
// getData prints each string received on ch, space-separated.
// It loops forever and never returns (ch is never closed by the sender).
func getData(ch chan string) {
	for {
		fmt.Printf("%s ", <-ch)
	}
}
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
	runtime.GOMAXPROCS(4)
	ch1 := make(chan int)
	ch2 := make(chan int)
	// Two independent producers feed one consumer that multiplexes
	// them with select.
	go pump1(ch1)
	go pump2(ch2)
	go suck(ch1, ch2)
	// Let the pipeline run for ~1 second, then exit (killing all three).
	time.Sleep(1e9)
}
// pump1 emits the even numbers 0, 2, 4, ... on ch forever.
func pump1(ch chan int) {
	n := 0
	for {
		ch <- 2 * n
		n++
	}
}
// pump2 emits the numbers 5, 6, 7, ... on ch forever.
func pump2(ch chan int) {
	n := 0
	for {
		ch <- n + 5
		n++
	}
}
// suck multiplexes ch1 and ch2 forever, printing each value tagged with
// the line it arrived on. When both channels are ready, select picks
// one pseudo-randomly.
func suck(ch1, ch2 chan int) {
	for {
		select {
		case v := <-ch1:
			fmt.Println("on line 1", v)
		case v := <-ch2:
			fmt.Println("on line 2", v)
		}
	}
}
// test various marc access patterns
package main
import (
"crypto/sha512"
"errors"
"fmt"
"io"
"log"
"os"
"runtime"
"strconv"
"sync"
// "time"
)
// recordLength returns the length of the marc record as stored in the leader
func recordLength(reader io.Reader) (length int64, err error) {
var l int
data := make([]byte, 24)
n, err := reader.Read(data)
if err != nil {
return 0, err
} else {
if n != 24 {
errs := fmt.Sprintf("MARC21: invalid leader: expected 24 bytes, read %d", n)
err = errors.New(errs)
} else {
l, err = strconv.Atoi(string(data[0:5]))
if err != nil {
errs := fmt.Sprintf("MARC21: invalid record length: %s", err)
err = errors.New(errs)
}
}
}
return int64(l), err
}
func Worker(queue chan *[]byte, out chan *string, wg *sync.WaitGroup) {
defer wg.Done()
for br := range queue {
result := fmt.Sprintf("%x", sha512.Sum512(*br))
out <- &result
}
}
// Collector prints every string received on in; once in is closed and
// drained it signals completion on done.
func Collector(in chan *string, done chan bool) {
	for s := range in {
		fmt.Println(*s)
	}
	done <- true
}
// func parseRecord(buffer *[]byte) string {
// // fmt.Printf("%x\n", sha512.Sum512(*buffer))
// // return sha512.Sum512(*buffer)
// return fmt.Sprintf("%x", sha512.Sum512(*buffer))
// }
// RecordCount counts the number of MARC21 records in the named file by
// reading each record's 24-byte leader and seeking past its body. It
// terminates the process on any I/O or parse error.
func RecordCount(filename string) int64 {
	// runtime.GOMAXPROCS(runtime.NumCPU())
	runtime.GOMAXPROCS(1)
	handle, err := os.Open(filename)
	if err != nil {
		log.Fatalf("%s\n", err)
	}
	defer func() {
		if err := handle.Close(); err != nil {
			log.Fatalf("%s\n", err)
		}
	}()
	// i counts records; cumulative is the absolute offset of the next
	// record's leader.
	var i, cumulative int64
	for {
		length, err := recordLength(handle)
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("%s\n", err)
		}
		i++
		cumulative += length
		// BUG FIX: the Seek error was silently discarded; a failed seek
		// would desynchronize every subsequent leader read.
		if _, err := handle.Seek(cumulative, 0); err != nil {
			log.Fatalf("%s\n", err)
		}
	}
	return i
}
// main prints the record count of the MARC file named on the command line.
func main() {
	// BUG FIX: indexing os.Args[1] unguarded panics when no argument
	// is given; fail with a usage message instead.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s MARCFILE\n", os.Args[0])
	}
	fmt.Println(RecordCount(os.Args[1]))
}
package main
import (
"crypto/md5"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
)
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
// main prints the MD5 digest of every file under the directory named by
// the first argument (default "."), sorted by path.
func main() {
	dir := "."
	if len(os.Args) >= 2 {
		dir = os.Args[1]
	}
	sums, err := MD5All(dir)
	if err != nil {
		fmt.Println(err)
		return
	}
	keys := make([]string, 0, len(sums))
	for k := range sums {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%x %s\n", sums[k], k)
	}
}
package main
// The MD5All implementation in parallel.go starts a new goroutine for each file.
// In a directory with many large files, this may allocate more memory than is available on the machine.
import (
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
)
// result pairs a file path with its MD5 digest, or with the error that
// prevented reading the file.
type result struct {
	path string
	sum  [md5.Size]byte
	err  error
}
// MD5All reads all the files in the tree rooted at root and returns a map
// from file path to MD5 sum. The first failed read or walk error aborts
// everything and is returned.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	// MD5All closes the done channel when it returns; it may do so before
	// receiving all the values from c and errc
	done := make(chan struct{})
	defer close(done)
	c, errc := sumFiles(done, root)
	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			// Early return: the deferred close(done) cancels the
			// in-flight walk and per-file goroutines.
			return nil, r.err
		}
		m[r.path] = r.sum
	}
	// Walk error (if any) arrives after c is closed.
	if err := <-errc; err != nil {
		return nil, err
	}
	return m, nil
}
// sumFiles walks the tree rooted at root, starting one goroutine per
// regular file to compute its MD5 and send a result on the returned
// channel. The walk's own error is sent on the second (buffered)
// channel. Closing done aborts both the walk and the pending sends.
// NOTE(review): one goroutine per file can exhaust memory on trees with
// many large files — that is the motivation for the bounded version.
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
	// For each regular file, start a goroutine that sums the file and sends
	// the result on c. Send the result of the walk on errc.
	c := make(chan result)
	errc := make(chan error, 1)
	go func() {
		var wg sync.WaitGroup
		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			wg.Add(1)
			go func() {
				data, err := ioutil.ReadFile(path)
				// Send may be abandoned if done closes first.
				select {
				case c <- result{path, md5.Sum(data), err}:
				case <-done:
				}
				wg.Done()
			}()
			// Abort the walk if done is closed
			select {
			case <-done:
				return errors.New("walk cancelled")
			default:
				return nil
			}
		})
		// Walk has returned, so all calls to wg.Add are done. Start a
		// goroutine to close c once all the sends are done
		go func() {
			wg.Wait()
			close(c)
		}()
		// errc is buffered, so this send never blocks.
		errc <- err
	}()
	return c, errc
}
// main prints the MD5 digest of every file under the directory named by
// the first argument (default "."), sorted by path.
func main() {
	dir := "."
	if len(os.Args) >= 2 {
		dir = os.Args[1]
	}
	sums, err := MD5All(dir)
	if err != nil {
		fmt.Println(err)
		return
	}
	keys := make([]string, 0, len(sums))
	for k := range sums {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%x %s\n", sums[k], k)
	}
}
package main
import (
"fmt"
"runtime"
"time"
)
// DoPart simulates one unit of work (a one-second sleep), then reports
// completion by sending a token on sem.
func DoPart(sem chan int) {
	fmt.Println("Computing...")
	time.Sleep(time.Second) // same duration as the original 1e9 ns
	sem <- 1
}
// DoAll runs one DoPart per CPU in parallel, then blocks until every
// one has sent its completion token on sem.
func DoAll() {
	NCPU := runtime.NumCPU()
	// Buffered so workers never block on their completion send.
	sem := make(chan int, NCPU)
	for i := 0; i < NCPU; i++ {
		go DoPart(sem)
	}
	// Receive exactly one token per worker.
	for i := 0; i < NCPU; i++ {
		<-sem
	}
	// All done.
}
// main runs the parallel computation and exits when it completes.
func main() {
	DoAll()
}
package main
import (
"fmt"
"time"
)
// processChannel greets every int received on in, writing the formatted
// message to out. It returns once in is closed and drained; out is
// never closed here.
func processChannel(in <-chan int, out chan<- string) {
	for v := range in {
		out <- fmt.Sprintf("Hello %d", v)
	}
}
func main() {
	receiveChan := make(chan string)
	sendChan := make(chan int)
	// Printer goroutine: drains receiveChan until it is closed
	// (which never happens here — see note below).
	go func() {
		for value := range receiveChan {
			fmt.Println(value)
		}
	}()
	go processChannel(sendChan, receiveChan)
	for i := 0; i < 10; i++ {
		sendChan <- i
	}
	close(sendChan)
	// NOTE(review): the 10ms sleep is the only thing keeping main alive
	// for the printer to finish; receiveChan is never closed, so a done
	// channel would be the robust way to wait. Racy by design in this demo.
	time.Sleep(1e7)
}
package main
import (
"fmt"
"sync"
// "runtime"
)
// turn int arguments into channel
func gen(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}
// sq reads ints from in and sends their squares on out until in is
// closed or done is closed; out is closed when the goroutine returns.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			// BUG FIX: the original sent n unchanged, contradicting
			// the stated contract — forward the square instead.
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}
// infiniteCounter returns a channel that yields 0, 1, 2, ... forever.
// The producing goroutine has no cancellation path and leaks if the
// consumer stops receiving.
func infiniteCounter() <-chan int {
	out := make(chan int)
	go func() {
		for n := 0; ; n++ {
			out <- n
		}
	}()
	return out
}
// func main() {
// runtime.GOMAXPROCS(runtime.NumCPU())
// var wg sync.WaitGroup
// counterC := infiniteCounter()
// for i := 0; i < 10000; i++ {
// wg.Add(1)
// go func(id int, counter <-chan int) {
// defer wg.Done()
// for i := 0; i < 10000; i++ {
// value := <-counter
// fmt.Printf("Worker %d: %d\n", id, value)
// }
// }(i, counterC)
// }
// wg.Wait()
// }
// func main() {
// c := gen(2, 3)
// out := sq(c)
// fmt.Println(<-out)
// fmt.Println(<-out)
// }
// func main() {
// // Set up the pipeline and consume the output.
// for n := range sq(sq(gen(2, 3))) {
// fmt.Println(n) // 16 then 81
// }
// }
// func merge(cs ...<-chan int) <-chan int {
// var wg sync.WaitGroup
// out := make(chan int)
// // Start an output goroutine for each input channel in cs.
// output := func(c <-chan int) {
// for n := range c {
// out <- n
// }
// wg.Done()
// }
// wg.Add(len(cs))
// for _, c := range cs {
// go output(c)
// }
// go func() {
// wg.Wait()
// close(out)
// }()
// return out
// }
// merge fans in the values from every channel in cs onto a single
// returned channel, which is closed once all inputs are drained.
// Closing done aborts the copy early.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	// output copies one input channel to out until the input is closed
	// or done fires.
	output := func(c <-chan int) {
		// BUG FIX: the original called wg.Done() both here via defer AND
		// again after the loop, so a normally-drained channel decremented
		// the WaitGroup twice — panicking with a negative counter.
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	// Close out after every output goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
func main() {
	// done fans cancellation out to every pipeline stage; the deferred
	// close lets all stage goroutines exit even though we only read one
	// value below.
	done := make(chan struct{})
	defer close(done)
	in := gen(done, 5, 3)
	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(done, in)
	c2 := sq(done, in)
	// The merge function converts a list of channels to a single channel
	// by starting a goroutine for each inbound channel that copies
	// the values to the sole outbound channel.
	// for n := range merge(c1, c2) {
	// fmt.Println(n)
	// }
	// This is a resource leak: goroutines consume memory and runtime resources,
	// and heap references in goroutine stacks keep data from being garbage
	// collected. Goroutines are not garbage collected; they must exit on their own.
	// out := merge(c1, c2)
	// fmt.Println(<-out) // 4 or 9
	// panic(nil)
	out := merge(done, c1, c2)
	fmt.Println(<-out)
	// Deliberate panic: its goroutine dump demonstrates that the other
	// stage goroutines have already exited thanks to done.
	panic(nil)
}
package main
// Empty is a placeholder; its values carry no information.
type Empty interface{}

// semaphore implements counting-semaphore operations over a buffered
// channel whose capacity is the total number of resources.
// NOTE(review): here P acquires by SENDING tokens and V releases by
// RECEIVING — the reverse of the classic channel-semaphore phrasing.
// It works because sends block once the buffer holds cap(s) tokens.
type semaphore chan Empty

// acquire n resources
func (s semaphore) P(n int) {
	e := new(Empty)
	for i := 0; i < n; i++ {
		s <- e
	}
}
// release n resources
func (s semaphore) V(n int) {
	// Each receive frees one slot in the buffered channel for a
	// future P to fill.
	for i := 0; i < n; i++ {
		<-s
	}
}
// Lock acquires a single resource, giving mutex semantics when the
// semaphore has capacity 1.
func (s semaphore) Lock() {
	s.P(1)
}

// Unlock releases the resource taken by Lock.
func (s semaphore) Unlock() {
	s.V(1)
}
// Wait blocks until n resources can be acquired (condition-variable style).
func (s semaphore) Wait(n int) {
	s.P(n)
}

// Signal releases one resource, waking one waiter.
func (s semaphore) Signal() {
	s.V(1)
}
// main only demonstrates constructing a semaphore with 4 resources;
// the value is discarded and no operations are exercised.
func main() {
	N := 4
	_ = make(semaphore, N)
}
package main
import (
"fmt"
"runtime"
)
// generate emits the sequence 2, 3, 4, ... on ch forever — the
// candidate stream for the prime sieve.
func generate(ch chan int) {
	n := 2
	for {
		ch <- n
		n++
	}
}
// filter copies values from in to out, dropping any that are divisible
// by prime. It loops forever.
func filter(in, out chan int, prime int) {
	for {
		if v := <-in; v%prime != 0 {
			out <- v
		}
	}
}
// main runs an unbounded concurrent prime sieve: each prime found gets
// its own filter goroutine chained onto the candidate stream.
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() * 2)
	ch := make(chan int)
	go generate(ch)
	for {
		// The head of the current chain is always the next prime.
		prime := <-ch
		fmt.Print(prime, " ")
		ch1 := make(chan int)
		go filter(ch, ch1, prime)
		// Subsequent reads come from the newly extended chain.
		ch = ch1
	}
}
package main
import (
"fmt"
"runtime"
)
// generate returns a channel that emits the candidates 2, 3, 4, ...
// forever, produced by an internal goroutine.
func generate() chan int {
	ch := make(chan int)
	go func() {
		n := 2
		for {
			ch <- n
			n++
		}
	}()
	return ch
}
// filter returns a channel carrying every value from in that is not
// divisible by prime; the copying goroutine runs forever.
func filter(in chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			v := <-in
			if v%prime != 0 {
				out <- v
			}
		}
	}()
	return out
}
// sieve returns a channel that yields the primes in order. Internally
// it chains one filter per prime onto the candidate stream from
// generate (the classic concurrent prime sieve).
func sieve() chan int {
	runtime.GOMAXPROCS(runtime.NumCPU() * 2)
	out := make(chan int)
	go func() {
		ch := generate()
		for {
			// The head of the current chain is always prime.
			prime := <-ch
			ch = filter(ch, prime)
			out <- prime
		}
	}()
	return out
}
// main prints primes forever, one per line.
func main() {
	primes := sieve()
	for {
		fmt.Println(<-primes)
	}
}
package main
import (
"fmt"
"runtime"
"strings"
"time"
)
// Task is one unit of work: a name to be normalized.
type Task struct {
	Name string
}

// process lowercases the task's name in place.
func process(task *Task) {
	task.Name = strings.ToLower(task.Name)
}
// Worker loops forever: take a task from in, process it, pass it to out.
// NOTE(review): if in were ever closed, <-in would yield nil and
// process would panic; in this demo the channel stays open for the
// program's lifetime.
func Worker(in, out chan *Task) {
	for {
		t := <-in
		process(t)
		out <- t
	}
}
// sendWork generates one million tasks with shouty names and feeds them
// to out, blocking on each send until a Worker accepts it.
func sendWork(out chan *Task) {
	const total = 1000000
	for i := 0; i < total; i++ {
		out <- &Task{Name: fmt.Sprintf("HELLO %d!", i)}
	}
}
// consumeWork prints every finished task received on in. It only
// returns if in is closed — which never happens here; the program
// exits via main's sleep instead.
func consumeWork(in chan *Task) {
	for task := range in {
		fmt.Printf("RX: %+v\n", *task)
	}
}
func main() {
	runtime.GOMAXPROCS(4)
	pending, done := make(chan *Task), make(chan *Task)
	// N workers pull from pending and push to done.
	N := 16
	go sendWork(pending) // put tasks with work on the channel
	for i := 0; i < N; i++ {
		go Worker(pending, done)
	}
	go consumeWork(done)
	// NOTE(review): exits after ~1s regardless of progress; the
	// million-task producer is deliberately never drained to completion.
	time.Sleep(1e9)
}
package main
import (
"fmt"
"time"
)
func main() {
	tick := time.Tick(1e8)  // fires every 100ms
	boom := time.After(5e8) // fires once, after 500ms
	for {
		select {
		case <-tick:
			fmt.Println("tick")
		case <-boom:
			fmt.Println("BOOM")
			return
		default:
			// Neither timer ready: print a dot and back off 50ms
			// so the busy loop doesn't spin.
			fmt.Println(".")
			time.Sleep(5e7)
		}
	}
}
package main
// no error handling
// 60s to convert 10M lines (1.3G)
// 10min for GND
// 60min for DBP
// 5h+ for Freebase
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"runtime"
)
// Triple is a subject-predicate-object statement (N-Triples style).
type Triple struct {
	Subject   string
	Predicate string
	Object    string
}

// Work is one input line plus processing options. ReplyChan is unused
// by the current pipeline — results travel on a shared channel instead.
type Work struct {
	Text      string
	Options   map[string]interface{}
	ReplyChan chan *Triple
}
// CollectTriples JSON-encodes and prints every triple received, one per
// line, until a nil sentinel arrives. Marshal failures abort the process.
func CollectTriples(triples chan *Triple) {
	for {
		t := <-triples
		if t == nil {
			// nil is the agreed stop sentinel (not channel close).
			return
		}
		b, err := json.Marshal(t)
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("%s\n", b)
	}
}
// CollectStrings prints each received byte slice as a line, stopping on
// a nil sentinel. It stays lean because there is only one collector.
func CollectStrings(lines chan *[]byte) {
	for {
		b := <-lines
		if b == nil {
			// nil is the agreed stop sentinel (not channel close).
			return
		}
		fmt.Printf("%s\n", string(*b))
	}
}
// Worker does the heavy lifting: for every Work item received it builds
// a length-describing Triple, JSON-encodes it, and ships the bytes to
// resultChan. Runs forever; a marshal failure aborts the process.
func Worker(workChan chan *Work, resultChan chan *[]byte) {
	for {
		w := <-workChan
		t := Triple{
			Subject:   "triple",
			Predicate: "length",
			Object:    fmt.Sprintf("%d", len(w.Text)),
		}
		encoded, err := json.Marshal(t)
		if err != nil {
			log.Fatalln(err)
		}
		resultChan <- &encoded
	}
}
// main streams lines from test.nt through NumCPU workers that emit
// JSON summaries, printed by a single collector goroutine.
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	filename := "test.nt"
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalln(err)
	}
	// BUG FIX: the original deferred file.Close() BEFORE checking the
	// Open error, scheduling a Close on a nil handle; defer must come
	// after the error check.
	defer file.Close()
	options := make(map[string]interface{})
	queueChan := make(chan *Work)
	resultChan := make(chan *[]byte)
	for i := 0; i < runtime.NumCPU(); i++ {
		go Worker(queueChan, resultChan)
	}
	go CollectStrings(resultChan)
	reader := bufio.NewReader(file)
	for {
		b, _, err := reader.ReadLine()
		if err != nil || b == nil {
			break
		}
		line := string(b)
		work := Work{Text: line, Options: options}
		queueChan <- &work
	}
	// NOTE(review): main exits as soon as the last line is queued; any
	// results still in flight are lost. Racy by design in this demo.
}
package main
import (
"crypto/sha1"
"encoding/json"
"fmt"
"os"
"runtime"
"sync"
"time"
)
// Work pairs an input Id with the SHA-1 hash computed for it.
type Work struct {
	Id   string // input value to hash
	Hash string // filled in by a Worker
}
// Worker drains queue: it SHA-1 hashes each item's Id, JSON-encodes the
// item, and sends the line to results. Encoding failures are logged to
// stderr and skipped. wg is marked done when queue closes.
// (id identifies the worker; it is currently unused.)
func Worker(id int, queue chan *Work, results chan *string, wg *sync.WaitGroup) {
	defer wg.Done()
	for w := range queue {
		w.Hash = fmt.Sprintf("%x", sha1.Sum([]byte(w.Id)))
		encoded, err := json.Marshal(w)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			continue
		}
		s := string(encoded)
		results <- &s
	}
}
// Collector prints every result line; once results is closed and
// drained it signals completion on done.
func Collector(results chan *string, done chan bool) {
	for s := range results {
		fmt.Println(*s)
	}
	done <- true
}
func main() {
	numWorkers := 4 // runtime.NumCPU()
	runtime.GOMAXPROCS(numWorkers)
	// runtime.GOMAXPROCS(1)
	queue := make(chan *Work)
	results := make(chan *string)
	done := make(chan bool)
	go Collector(results, done)
	// worker waitgroup
	wg := new(sync.WaitGroup)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go Worker(i, queue, results, wg)
	}
	// send tasks to the queue
	for i := 0; i < 2000000; i++ {
		work := Work{Id: fmt.Sprintf("%d", i)}
		queue <- &work
	}
	// wait for the workers to finish
	close(queue)
	wg.Wait()
	// wait for the collector to finish
	close(results)
	// Give the collector up to 1s to drain and signal; whichever
	// arrives first (timeout or done) ends the program.
	select {
	case <-time.After(1e9):
		break
	case <-done:
		break
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment