|  | package main | 
        
          |  |  | 
        
          |  | import ( | 
        
          |  | "bufio" | 
        
          |  | "fmt" | 
        
          |  | "log" | 
        
          |  | "math/rand" | 
        
          |  | "os" | 
        
          |  | "regexp" | 
        
          |  | "strings" | 
        
          |  | "sync" | 
        
          |  | "time" | 
        
          |  | ) | 
        
          |  |  | 
        
          |  | func main() { | 
        
          |  | // Get handle for file | 
        
          |  | file, err := os.Open("example.log") | 
        
          |  | if err != nil { | 
        
          |  | // Fail hard if it didn't work | 
        
          |  | log.Fatal(err) | 
        
          |  | } | 
        
          |  | // Ensure file is closed once main() returns (upon program exit) | 
        
          |  | defer file.Close() | 
        
          |  |  | 
        
          |  | // Number of workers a.k.a. goroutimes/threads (this should be at least the # of CPU cores) | 
        
          |  | numWorkers := 4 | 
        
          |  | // Mutex to manage the goroutimes/workers | 
        
          |  | var wg sync.WaitGroup | 
        
          |  | // Tell mutex how many goroutimes/workers there are to keep track of | 
        
          |  | wg.Add(numWorkers) | 
        
          |  | // Channel for fanning-out work items to be processed (cross-goroutine/cross-thread communication) | 
        
          |  | workc := make(chan string) | 
        
          |  |  | 
        
          |  | // Establish map and slice variables and mutex (semaphore) to protect concurrent access to them from multiple simultaneous goroutines | 
        
          |  | dateDict := make(map[string]int) | 
        
          |  | ipsToInvestigate := make([]string, 0) | 
        
          |  | mu := sync.Mutex{} | 
        
          |  |  | 
        
          |  | // Compile regexp expression for IP matching | 
        
          |  | numBlock := "(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])" | 
        
          |  | regexPattern := numBlock + "\\." + numBlock + "\\." + numBlock + "\\." + numBlock | 
        
          |  | ipRegEx := regexp.MustCompile(regexPattern) | 
        
          |  |  | 
        
          |  | // Start workers | 
        
          |  | for i := 0; i < numWorkers; i++ { | 
        
          |  | // Run each worker in a separate goroutine | 
        
          |  | go func(i int) { | 
        
          |  | // Loop over whatever comes out of the work channel, will block until channel is closed | 
        
          |  | for w := range workc { | 
        
          |  | // Note when something was received by a worker | 
        
          |  | log.Printf("Worker %d working line %s\n", i, w) | 
        
          |  |  | 
        
          |  | // DO WORK HERE! | 
        
          |  |  | 
        
          |  | // Find date by taking substring of line | 
        
          |  | date := w[0:10] | 
        
          |  |  | 
        
          |  | // Find any IPs | 
        
          |  | matches := ipRegEx.FindAllString(w, -1) | 
        
          |  |  | 
        
          |  | // Take the mutex and update the variable(s), release mutex when finished | 
        
          |  | mu.Lock() | 
        
          |  | dateDict[date] += 1 | 
        
          |  | ipsToInvestigate = append(ipsToInvestigate, matches...) | 
        
          |  | mu.Unlock() | 
        
          |  |  | 
        
          |  | // Simulate work that takes a variable amount of time (0s<=time<=5s in this example) | 
        
          |  | time.Sleep(time.Duration(rand.Intn(5)) * time.Second) | 
        
          |  | } | 
        
          |  |  | 
        
          |  | log.Printf("Worker %d stopping\n", i) | 
        
          |  | // Once channel closes, tell mutex that this thread is exiting: | 
        
          |  | wg.Done() | 
        
          |  | }(i) | 
        
          |  | } | 
        
          |  |  | 
        
          |  | // Feed the workers using an I/O buffer | 
        
          |  | scanner := bufio.NewScanner(file) | 
        
          |  | // Loop over lines in the file | 
        
          |  | for scanner.Scan() { | 
        
          |  | // For each line of text in the file, push the line into the channel (one at a time) | 
        
          |  | // Lines will be pushed into the channel and pulled out of the channel sequentially, but work may not complete sequentially as multiple goroutines are vying for each work item | 
        
          |  | workc <- scanner.Text() | 
        
          |  | } | 
        
          |  | // Once finished looping over the file lines, close the channel | 
        
          |  | // This signals to the workers to stop 'working' | 
        
          |  | // The last value can still be pulled out of a channel even after it's closed | 
        
          |  | close(workc) | 
        
          |  |  | 
        
          |  | // Make sure there were no errors reading the input file | 
        
          |  | if err := scanner.Err(); err != nil { | 
        
          |  | log.Fatal(err) | 
        
          |  | } | 
        
          |  |  | 
        
          |  | // Wait for any last work to be finished | 
        
          |  | // Will block until all goroutines/workers have called wg.Done() | 
        
          |  | // This is critical, because without it the entire program could exit before all the worker goroutines return/finish their processing | 
        
          |  | wg.Wait() | 
        
          |  | log.Println("All workers stopped successfully") | 
        
          |  |  | 
        
          |  | // Print the results | 
        
          |  | fmt.Println("===============\nRESULTS:") | 
        
          |  |  | 
        
          |  | // Loop over map (note that map keys are not sorted/sortable) | 
        
          |  | for k, v := range dateDict { | 
        
          |  | fmt.Printf("Date %s had %d occurances\n", k, v) | 
        
          |  | } | 
        
          |  |  | 
        
          |  | // Operations on string slice | 
        
          |  | fmt.Printf("IPs (length is %d):\n", len(ipsToInvestigate)) | 
        
          |  | fmt.Println(strings.Join(ipsToInvestigate, ", ")) | 
        
          |  | } |