Skip to content

Instantly share code, notes, and snippets.

@apfohl
Created January 8, 2025 20:59
Show Gist options
  • Save apfohl/52d79dc6953785904ff88c29970c5a03 to your computer and use it in GitHub Desktop.
Save apfohl/52d79dc6953785904ff88c29970c5a03 to your computer and use it in GitHub Desktop.
Concurrency with agent pattern
package main
import (
"fmt"
"image"
"image/color"
"io"
"os"
"strings"
"sync"
"testing"
)
// Agent is the caller-facing side of an agent: it accepts a command of type
// TCommand and answers it with a reply of type TReply.
type Agent[TCommand any, TReply any] interface {
	// Tell hands command to the agent and returns the agent's reply.
	Tell(command TCommand) TReply
}
// commandWithReply pairs a command with the channel on which the reply to
// that command is delivered.
type commandWithReply[TCommand any, TReply any] struct {
	command TCommand
	reply   chan TReply
}

// StatefulAgent owns a value of type TState and applies processor to it for
// every command received on channel.
type StatefulAgent[TState any, TCommand any, TReply any] struct {
	state     TState
	channel   chan commandWithReply[TCommand, TReply]
	processor func(*TState, TCommand) TReply
}

// Tell sends command on the agent's channel together with a fresh reply
// channel, then blocks until a reply arrives on it and returns that reply.
func (a *StatefulAgent[TState, TCommand, TReply]) Tell(command TCommand) TReply {
	answer := make(chan TReply)
	envelope := commandWithReply[TCommand, TReply]{command: command, reply: answer}
	a.channel <- envelope
	return <-answer
}

// run receives commands from the agent's channel until the channel is closed.
// Each command is passed to processor along with a pointer to the agent's
// state, and the result is sent back on that command's reply channel.
func (a *StatefulAgent[TState, TCommand, TReply]) run() {
	for {
		msg, open := <-a.channel
		if !open {
			return
		}
		msg.reply <- a.processor(&a.state, msg.command)
	}
}
// StartAgent builds a StatefulAgent around initialState and processor, starts
// its run loop in a new goroutine, and returns it as an Agent.
//
// The command channel is unbuffered and is never closed here, so the run
// goroutine keeps waiting for commands after StartAgent returns.
func StartAgent[TState any, TCommand any, TReply any](initialState TState, processor func(*TState, TCommand) TReply) Agent[TCommand, TReply] {
	started := &StatefulAgent[TState, TCommand, TReply]{
		channel:   make(chan commandWithReply[TCommand, TReply]),
		processor: processor,
		state:     initialState,
	}
	go started.run()
	return started
}
// Fetcher retrieves a single page by URL.
type Fetcher interface {
	// Fetch returns the body of the page at url together with the URLs
	// found on that page. err reports a page that could not be fetched.
	Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl pages starting with url, to a
// maximum of depth, fetching each URL at most once.
//
// agent holds the set of URLs that have already been claimed. A URL is
// claimed with a single AddIfAbsentCommand before it is fetched, so the
// "seen before?" check and the "mark as seen" write are one step for the
// agent and cannot interleave with those of another Crawl goroutine. Because
// the claim happens before the fetch, a URL whose fetch fails is reported
// once and not retried by later callers.
//
// The processor behind agent must handle AddIfAbsentCommand (processor in
// this file does).
//
// Every call must be paired with a prior wg.Add(1): Crawl calls wg.Done when
// it returns, and adds one to wg for each child crawl it starts.
func Crawl(url string, depth int, fetcher Fetcher, agent Agent[Command, *string], wg *sync.WaitGroup) {
	defer wg.Done()

	// Check the depth budget first: it needs no round trip to the agent, and
	// a URL that is out of budget here stays unclaimed so that a call with
	// budget left can still crawl it.
	if depth <= 0 {
		return
	}

	// A non-nil reply means an earlier call already claimed this URL.
	if agent.Tell(AddIfAbsentCommand{Key: url}) != nil {
		return
	}

	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("found: %s %q\n", url, body)

	for _, u := range urls {
		// Add before starting the goroutine so wg can never be observed at
		// zero while a child crawl is still about to run.
		wg.Add(1)
		go Crawl(u, depth-1, fetcher, agent, wg)
	}
}

// Command is the message type accepted by processor. The supported commands
// are AddCommand, AddIfAbsentCommand and ExistsCommand.
type Command any

// AddCommand stores Value under Key, replacing any value already there.
type AddCommand struct {
	Key   string
	Value string
}

// AddIfAbsentCommand stores Value under Key only when Key is not present yet.
// The reply tells the sender whether it was the one that stored the value.
type AddIfAbsentCommand struct {
	Key   string
	Value string
}

// ExistsCommand asks whether Key is present.
type ExistsCommand struct {
	Key string
}

// processor applies command to the map behind state and returns the reply.
//
//   - AddCommand stores Value under Key, replacing any previous value, and
//     returns a pointer to a copy of the stored value.
//   - AddIfAbsentCommand stores Value under Key only if Key is not present.
//     It returns nil when the value was stored, and a pointer to a copy of
//     the existing value when Key was already present (the map is then left
//     unchanged).
//   - ExistsCommand returns a pointer to a copy of the value stored under
//     Key, or nil if Key is not present.
//
// processor panics on any other command type. It does no locking of its own;
// callers that share state between goroutines must serialize calls, for
// example by running it inside an agent started with StartAgent.
func processor(state *map[string]string, command Command) *string {
	store := *state
	switch cmd := command.(type) {
	case AddCommand:
		store[cmd.Key] = cmd.Value
		return &cmd.Value
	case AddIfAbsentCommand:
		if value, exists := store[cmd.Key]; exists {
			return &value
		}
		store[cmd.Key] = cmd.Value
		return nil
	case ExistsCommand:
		if value, exists := store[cmd.Key]; exists {
			return &value
		}
		return nil
	default:
		panic("unknown command")
	}
}
// TestCrawler starts an agent backed by an empty map, crawls the canned
// fetcher from https://golang.org/ with a depth of 4, and waits for every
// Crawl goroutine to finish. It makes no assertions of its own, so it only
// fails if the crawl panics or never completes.
func TestCrawler(t *testing.T) {
	visited := StartAgent(make(map[string]string), processor)

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go Crawl("https://golang.org/", 4, fetcher, visited, pending)
	pending.Wait()
}
// fakeFetcher is a Fetcher that returns canned results from an in-memory
// table keyed by URL.
type fakeFetcher map[string]*fakeResult

// fakeResult is the canned response for one URL: the page body and the URLs
// found on that page.
type fakeResult struct {
	body string
	urls []string
}

// Fetch looks url up in f. It returns the canned body and URLs when url has
// an entry, and a "not found" error otherwise.
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	canned, known := f[url]
	if !known {
		return "", nil, fmt.Errorf("not found: %s", url)
	}
	return canned.body, canned.urls, nil
}
// fetcher is a populated fakeFetcher: a small canned copy of golang.org with
// four pages that link to one another. https://golang.org/cmd/ is linked from
// two of the pages but has no entry of its own.
var fetcher = fakeFetcher{
	"https://golang.org/": {
		body: "The Go Programming Language",
		urls: []string{
			"https://golang.org/pkg/",
			"https://golang.org/cmd/",
		},
	},
	"https://golang.org/pkg/": {
		body: "Packages",
		urls: []string{
			"https://golang.org/",
			"https://golang.org/cmd/",
			"https://golang.org/pkg/fmt/",
			"https://golang.org/pkg/os/",
		},
	},
	"https://golang.org/pkg/fmt/": {
		body: "Package fmt",
		urls: []string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
	"https://golang.org/pkg/os/": {
		body: "Package os",
		urls: []string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment