Last active
April 7, 2016 05:41
-
-
Save maiah/28652fd2b121ef2634e4b9911338bf44 to your computer and use it in GitHub Desktop.
Syncing and Cancellable gophers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"sync" | |
"time" | |
) | |
func main() { | |
sm := NewSyncedMessage("abc") | |
f1 := NewFilter() | |
f1.Do = func() { | |
resultCh := make(chan string) | |
go func() { | |
resultCh <- "def" | |
}() | |
select { | |
case result := <-resultCh: | |
sm.Update(func(message string) (string, error) { | |
return message + result, nil | |
}) | |
case <-f1.CancelCh: | |
} | |
close(resultCh) | |
f1.Done = true | |
} | |
f2 := NewFilter() | |
f2.Do = func() { | |
resultCh := make(chan string) | |
go func() { | |
time.Sleep(1 * time.Millisecond) // simulate delay | |
resultCh <- "ghi" | |
}() | |
select { | |
case <-resultCh: | |
sm.Update(func(message string) (string, error) { | |
//return message + result, nil | |
return message, errors.New("ERROR ON GHI") // Simulate error | |
}) | |
case <-f2.CancelCh: | |
} | |
close(resultCh) | |
f2.Done = true | |
} | |
f3 := NewFilter() | |
f3.Do = func() { | |
resultCh := make(chan string) | |
go func() { | |
time.Sleep(2 * time.Millisecond) // simulate delay | |
resultCh <- "jkl" | |
}() | |
select { | |
case result := <-resultCh: | |
sm.Update(func(message string) (string, error) { | |
return message + result, nil | |
}) | |
case <-f3.CancelCh: | |
} | |
close(resultCh) | |
f3.Done = true | |
} | |
result, err := Process(sm, f1, f2, f3) | |
println("Final result is " + result) | |
if err != nil { | |
println("Got error: " + err.Error()) | |
} | |
} | |
// MessageResult is the outcome of a single update of a SyncedMessage.
type MessageResult struct {
	// message is the string returned by the update function.
	message string
	// err is the error returned by the update function; nil on success.
	err error
}
type SyncedMessage struct { | |
message string | |
mtx *sync.Mutex | |
resultCh chan MessageResult | |
} | |
func NewSyncedMessage(msg string) *SyncedMessage { | |
return &SyncedMessage{ | |
message: msg, | |
mtx: &sync.Mutex{}, | |
resultCh: make(chan MessageResult), | |
} | |
} | |
func (sm *SyncedMessage) Update(fn func(string) (string, error)) { | |
sm.mtx.Lock() | |
result, err := fn(sm.message) | |
sm.message = result | |
sm.resultCh <- MessageResult{result, err} | |
sm.mtx.Unlock() | |
} | |
// Filter is one unit of work that Process runs in its own goroutine.
type Filter struct {
	// Do is the function Process starts with `go f.Do()`.
	Do func()
	// Done is a completion flag; Process only signals CancelCh on filters
	// whose Done is still false.
	Done bool
	// CancelCh is the channel on which Process sends the cancel signal.
	CancelCh chan bool
}
func NewFilter() *Filter { | |
return &Filter{CancelCh: make(chan bool)} | |
} | |
func Process(sm *SyncedMessage, filters ...*Filter) (string, error) { | |
for _, f := range filters { | |
go f.Do() | |
} | |
var finalResult MessageResult | |
count := 0 | |
for result := range sm.resultCh { | |
finalResult = result | |
count++ | |
if result.err == nil { | |
if count == len(filters) { | |
break // Exit loop | |
} | |
} else { | |
for _, filter := range filters { | |
if !filter.Done { | |
filter.CancelCh <- true | |
} | |
} | |
break // Exit loop | |
} | |
} | |
return finalResult.message, finalResult.err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment