|
package strbuf |
|
|
|
import ( |
|
"fmt" |
|
"sync" |
|
) |
|
|
|
// CallbackMatch pairs a match predicate with the callback that should run for
// lines satisfying it.
type CallbackMatch struct {
	// matcher is invoked as matcher(line, condition) and reports whether the
	// line is a hit.
	matcher func(line, condition string) bool
	// condition is the fixed second argument handed to matcher.
	condition string
	// callback receives the line when the entry is called.
	callback func(line string)
	// async selects whether callback runs on its own goroutine (true) or
	// inline in the caller (false).
	async bool
}
|
|
|
// Match returns if the string is a match or not |
|
func (cbm *CallbackMatch) Match(line string) bool { |
|
return cbm.matcher(line, cbm.condition) |
|
} |
|
|
|
// Call executes the callback |
|
func (cbm *CallbackMatch) Call(line string) { |
|
if cbm.async { |
|
go cbm.callback(line) |
|
return |
|
} |
|
cbm.callback(line) |
|
} |
|
|
|
// LineCallback calls back functions if it finds lines |
|
type LineCallback struct { |
|
callbackLock *sync.RWMutex |
|
callbacks []*CallbackMatch |
|
|
|
running bool |
|
lines chan string |
|
} |
|
|
|
// Add adds the callback to the list. |
|
// Doesn't protect against duplicates. |
|
func (lcb *LineCallback) Add( |
|
matcher func(string, string) bool, |
|
condition string, |
|
callback func(string), |
|
async bool, |
|
) { |
|
lcb.callbackLock.Lock() |
|
defer lcb.callbackLock.Unlock() |
|
|
|
lcb.callbacks = append(lcb.callbacks, &CallbackMatch{ |
|
matcher: matcher, |
|
condition: condition, |
|
callback: callback, |
|
async: async, |
|
}) |
|
} |
|
|
|
// AddCallback adds the callback to the list. |
|
// Doesn't protect against duplicates. |
|
func (lcb *LineCallback) AddCallback(callback *CallbackMatch) { |
|
lcb.callbackLock.Lock() |
|
defer lcb.callbackLock.Unlock() |
|
|
|
lcb.callbacks = append(lcb.callbacks, callback) |
|
} |
|
|
|
// Line executes the LineCallback checks |
|
func (lcb *LineCallback) Line(line string) { |
|
lcb.callbackLock.RLock() |
|
defer lcb.callbackLock.RUnlock() |
|
|
|
if lcb.lines != nil { |
|
if lcb.running { |
|
// can panic c: |
|
lcb.lines <- line |
|
} |
|
return |
|
} |
|
|
|
for i := 0; i < len(lcb.callbacks); i++ { |
|
if lcb.callbacks[i].Match(line) { |
|
lcb.callbacks[i].Call(line) |
|
} |
|
} |
|
} |
|
|
|
// service is a worker to process incoming lines |
|
func (lcb *LineCallback) service() { |
|
for line := range lcb.lines { |
|
func() { |
|
lcb.callbackLock.RLock() |
|
defer lcb.callbackLock.RUnlock() |
|
|
|
for i := 0; i < len(lcb.callbacks); i++ { |
|
if lcb.callbacks[i].Match(line) { |
|
lcb.callbacks[i].Call(line) |
|
} |
|
} |
|
}() |
|
} |
|
} |
|
|
|
// Start starts the LineCallback as a worker to process linmes async |
|
func (lcb *LineCallback) Start(buffersize int) error { |
|
lcb.callbackLock.Lock() |
|
defer lcb.callbackLock.Unlock() |
|
|
|
if lcb.running { |
|
return fmt.Errorf("LineCallback service has already running") |
|
} |
|
|
|
lcb.running = true |
|
lcb.lines = make(chan string, buffersize) |
|
|
|
go lcb.service() |
|
|
|
return nil |
|
} |
|
|
|
// Stop stops the LineCallback |
|
func (lcb *LineCallback) Stop() error { |
|
lcb.callbackLock.Lock() |
|
defer lcb.callbackLock.Unlock() |
|
|
|
if !lcb.running { |
|
return fmt.Errorf("LineCallback service has already stopped") |
|
} |
|
|
|
// Can panic c: |
|
// Don't null this out, as we need it to determine if our object was set up |
|
// as a worker or not |
|
close(lcb.lines) |
|
|
|
return nil |
|
} |
|
|
|
// Close stops the LineCallback. Synonymous with LineCallback::Stop() |
|
// This exists so that LineCallback can satasify an io.Closer |
|
func (lcb *LineCallback) Close() error { |
|
return lcb.Stop() |
|
} |
|
|
|
// NewLineCallback creates a new LineCallback manager |
|
func NewLineCallback() *LineCallback { |
|
return &LineCallback{ |
|
callbacks: []*CallbackMatch{}, |
|
callbackLock: &sync.RWMutex{}, |
|
} |
|
} |