Last active
September 2, 2020 16:37
-
-
Save risentveber/05748385092674e45c3022115cba754a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dynamicqueue | |
import ( | |
"sync" | |
) | |
type DynamicQueue interface { | |
Add(item interface{}) | |
Stop() | |
} | |
type QueueHandler interface { | |
OnItem(item interface{}) // will be run in parallel | |
} | |
func NewDynamicQueue(workersCount int, handler QueueHandler) DynamicQueue { | |
q := &dynamicQueue{ | |
handler: handler, | |
newItems: make(chan interface{}), | |
} | |
q.itemsToProcess = NewDynamicallyBufferedChannel(q.newItems) | |
q.WaitGroup.Add(workersCount) | |
for i := 0; i < workersCount; i++ { | |
go q.worker() | |
} | |
return q | |
} | |
type dynamicQueue struct { | |
newItems chan interface{} | |
itemsToProcess <- chan interface{} | |
handler QueueHandler | |
sync.WaitGroup | |
} | |
func (dq *dynamicQueue) worker() { | |
defer dq.Done() | |
for item := range dq.itemsToProcess { | |
dq.handler.OnItem(item) | |
} | |
} | |
func (dq *dynamicQueue) Add(item interface{}) { | |
dq.newItems <- item | |
} | |
func (dq *dynamicQueue) Stop() { | |
close(dq.newItems) | |
dq.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment