@astockwell
Last active April 30, 2019 14:59
Golang Worker (Toy) Example

Demonstrates a work queue fanning out work items to multiple concurrent workers (goroutines).

In this example the work queue is fed with lines from a log file, but the source can be anything. The number of workers is configurable and can range from 1 to hundreds of thousands. As of Go 1.5, the runtime by default uses all visible CPUs (whatever your operating system considers to be a CPU) to schedule concurrent goroutine execution.
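
To see what the scheduler has to work with on a given machine, a minimal sketch along these lines may help. It is not part of the original example; the helper name schedulerInfo is made up for illustration. It relies only on runtime.NumCPU and runtime.GOMAXPROCS from the standard library, where passing 0 to GOMAXPROCS queries the current setting without changing it.

```go
package main

import (
	"fmt"
	"runtime"
)

// schedulerInfo (hypothetical helper) reports the number of CPUs visible to
// the process and the current GOMAXPROCS setting. Passing 0 to GOMAXPROCS
// reads the value without modifying it.
func schedulerInfo() (cpus, maxProcs int) {
	return runtime.NumCPU(), runtime.GOMAXPROCS(0)
}

func main() {
	cpus, maxProcs := schedulerInfo()
	fmt.Printf("visible CPUs: %d, GOMAXPROCS: %d\n", cpus, maxProcs)
}
```

The two numbers usually match, but they can differ, for instance when the GOMAXPROCS environment variable is set.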

In this example, the "work" done on each log file line is simulated by the worker sleeping for a random whole number of seconds from 0 to 4 (rand.Intn(5) never returns 5). When running, you can observe that the workers pull "work" items in an unpredictable order (you never know which worker will get the next item) and that they operate concurrently.

This example also includes two variables (dateDict and ipsToInvestigate) that multiple goroutines share and may write to simultaneously, so access to them must be protected by a mutex (mu).
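
The mutex pattern can be isolated in a much smaller program. The following is a sketch only, separate from main.go; the function name countConcurrently and the fixed map key are assumptions chosen for illustration. Each goroutine increments the same map entry once while holding the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// countConcurrently (hypothetical helper) starts n goroutines that each
// increment the same map entry once. The mutex serializes the writes;
// unsynchronized concurrent map writes are a data race in Go.
func countConcurrently(n int) int {
	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		counts = make(map[string]int)
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			mu.Lock()
			counts["2016-03-01"]++
			mu.Unlock()
		}()
	}
	// After Wait returns, no goroutine touches the map any more,
	// so it can be read without holding the lock.
	wg.Wait()
	return counts["2016-03-01"]
}

func main() {
	fmt.Println(countConcurrently(1000)) // prints 1000
}
```

Removing the Lock/Unlock pair and running with go run -race should make the race detector report the conflicting writes.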

Try dialing the number of workers (numWorkers) down to 1 or up to 50 and see how execution changes.
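
One way to make the effect of the worker count visible without editing main.go is a small timing harness like the sketch below. It is an illustration under assumptions, not the author's code: the function process, its parameters, and the fixed 50 ms of simulated work per item are all invented here. It uses the same structure as the example (an unbuffered channel, a WaitGroup, and a range loop in each worker).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// process (hypothetical helper) fans out `items` work items to numWorkers
// goroutines. Each item costs perItem of simulated work. It returns how many
// items were handled and how long the whole run took.
func process(numWorkers, items int, perItem time.Duration) (int, time.Duration) {
	start := time.Now()
	workc := make(chan int)
	var wg sync.WaitGroup
	var handled int64

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			// Receive until the channel is closed and drained.
			for range workc {
				time.Sleep(perItem)
				atomic.AddInt64(&handled, 1)
			}
		}()
	}

	for j := 0; j < items; j++ {
		workc <- j
	}
	close(workc)
	wg.Wait()
	return int(atomic.LoadInt64(&handled)), time.Since(start)
}

func main() {
	// 24 items, matching the number of lines in example.log.
	for _, n := range []int{1, 4, 50} {
		count, elapsed := process(n, 24, 50*time.Millisecond)
		fmt.Printf("%2d workers: %d items in %v\n", n, count, elapsed.Round(10*time.Millisecond))
	}
}
```

With sleep-based work, total time should shrink roughly in proportion to the number of workers until there are more workers than items; exact timings vary by machine.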

Running the Example

  1. Install Golang (https://golang.org/dl/)
  2. Clone the repository/gist
  3. cd into cloned repo
  4. go run main.go

example.log

2016-03-01 21:02:22,361 INFO Accessed by 1.1.1.1
2016-03-01 22:02:22,361 INFO Accessed by 1.1.1.1
2016-03-01 23:02:22,361 INFO Accessed by 1.1.1.1
2016-03-02 01:02:22,361 INFO Accessed by 1.1.1.1
2016-03-02 02:02:22,361 INFO Accessed by 1.1.1.1
2016-03-02 03:02:22,361 INFO Accessed by 1.1.1.1
2016-03-03 21:02:22,361 INFO Accessed by 1.1.1.1
2016-03-03 22:02:22,361 INFO Accessed by 1.1.1.1
2016-03-03 23:02:22,361 INFO Accessed by 1.1.1.1
2016-03-04 01:02:22,361 INFO Accessed by 1.1.1.1
2016-03-04 02:02:22,361 INFO Accessed by 1.1.1.1
2016-03-04 03:02:22,361 INFO Accessed by 1.1.1.1
2016-03-05 21:02:22,361 INFO Accessed by 1.1.1.1
2016-03-05 22:02:22,361 INFO Accessed by 1.1.1.1
2016-03-05 23:02:22,361 INFO Accessed by 1.1.1.1
2016-03-06 01:02:22,361 INFO Accessed by 1.1.1.1
2016-03-06 02:02:22,361 INFO Accessed by 1.1.1.1
2016-03-06 03:02:22,361 INFO Accessed by 1.1.1.1
2016-03-07 21:02:22,361 INFO Accessed by 1.1.1.1
2016-03-07 22:02:22,361 INFO Accessed by 1.1.1.1
2016-03-07 23:02:22,361 INFO Accessed by 1.1.1.1
2016-03-08 01:02:22,361 INFO Accessed by 1.1.1.1
2016-03-08 02:02:22,361 INFO Accessed by 1.1.1.1
2016-03-08 03:02:22,361 INFO Accessed by 1.1.1.1

main.go

package main

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"os"
	"regexp"
	"strings"
	"sync"
	"time"
)

func main() {
	// Get handle for file
	file, err := os.Open("example.log")
	if err != nil {
		// Fail hard if it didn't work
		log.Fatal(err)
	}
	// Ensure file is closed once main() returns (upon program exit)
	defer file.Close()

	// Number of workers (goroutines); this should generally be at least the number of CPU cores
	numWorkers := 4
	// WaitGroup to track when all workers have finished (this is not a mutex)
	var wg sync.WaitGroup
	// Tell the WaitGroup how many workers there are to wait for
	wg.Add(numWorkers)

	// Unbuffered channel for fanning out work items to be processed (cross-goroutine communication)
	workc := make(chan string)

	// Establish map and slice variables, plus a mutex to protect them from concurrent access by multiple goroutines
	dateDict := make(map[string]int)
	ipsToInvestigate := make([]string, 0)
	mu := sync.Mutex{}

	// Compile regular expression for IPv4 address matching
	numBlock := "(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])"
	regexPattern := numBlock + "\\." + numBlock + "\\." + numBlock + "\\." + numBlock
	ipRegEx := regexp.MustCompile(regexPattern)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		// Run each worker in a separate goroutine
		go func(i int) {
			// Loop over whatever comes out of the work channel; blocks waiting for items and exits once the channel is closed
			for w := range workc {
				// Note when something was received by a worker
				log.Printf("Worker %d working line %s\n", i, w)

				// DO WORK HERE!
				// Skip lines too short to contain a date (slicing them would panic)
				if len(w) < 10 {
					continue
				}
				// Find date by taking the first 10 characters of the line
				date := w[0:10]
				// Find any IPs
				matches := ipRegEx.FindAllString(w, -1)

				// Take the mutex and update the variable(s), release mutex when finished
				mu.Lock()
				dateDict[date]++
				ipsToInvestigate = append(ipsToInvestigate, matches...)
				mu.Unlock()

				// Simulate work that takes a variable amount of time (a whole number of seconds from 0 to 4 in this example)
				time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
			}
			log.Printf("Worker %d stopping\n", i)
			// Once the channel closes, tell the WaitGroup that this worker is exiting
			wg.Done()
		}(i)
	}

	// Feed the workers using a buffered line scanner
	scanner := bufio.NewScanner(file)
	// Loop over lines in the file
	for scanner.Scan() {
		// For each line of text in the file, push the line into the channel (one at a time)
		// Lines are pushed into and pulled out of the channel sequentially, but work may not complete sequentially because multiple goroutines are vying for each work item
		workc <- scanner.Text()
	}
	// Once finished looping over the file lines, close the channel
	// This signals to the workers to stop 'working'
	// Every value sent before the close is still delivered; the workers' range loops exit only after the channel is closed and drained
	close(workc)

	// Make sure there were no errors reading the input file
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}

	// Wait for any last work to be finished
	// Will block until all workers have called wg.Done()
	// This is critical, because without it the entire program could exit before all the worker goroutines finish their processing
	wg.Wait()
	log.Println("All workers stopped successfully")

	// Print the results
	fmt.Println("===============\nRESULTS:")
	// Loop over map (note that map iteration order is unspecified and can differ between runs)
	for k, v := range dateDict {
		fmt.Printf("Date %s had %d occurrences\n", k, v)
	}

	// Operations on string slice
	fmt.Printf("IPs (length is %d):\n", len(ipsToInvestigate))
	fmt.Println(strings.Join(ipsToInvestigate, ", "))
}
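
Because the results loop in main.go ranges over a map, the dates can print in a different order on every run. If stable output is wanted, one option (an addition for illustration, not part of the original gist) is to collect and sort the keys first. The helper name sortedKeys and the sample counts below are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys (hypothetical helper) returns the keys of m in ascending order.
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	// Sample data standing in for the dateDict built by the workers.
	dateDict := map[string]int{"2016-03-02": 3, "2016-03-01": 3, "2016-03-03": 3}
	for _, k := range sortedKeys(dateDict) {
		fmt.Printf("Date %s had %d occurrences\n", k, dateDict[k])
	}
}
```

Dates in YYYY-MM-DD form sort chronologically when sorted as strings, which is why a plain sort.Strings is enough here.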