Last active
June 12, 2019 18:26
-
-
Save kung-foo/b184a432de2e3063f891f96d42b25795 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package jotunheim | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"sync" | |
"sync/atomic" | |
"github.com/gopcua/opcua" | |
"github.com/gopcua/opcua/ua" | |
) | |
// ErrSlowConsumer is handed to the registered error handler when a data change
// message is dropped because the outbound notification channel was not ready
// to receive it.
var ErrSlowConsumer = errors.New("slow consumer. messages dropped.")
type ErrHandler func(*opcua.Client, *opcua.Subscription, error) | |
type DataChangeMessage struct { | |
*ua.DataValue | |
Error error | |
NodeID *ua.NodeID | |
} | |
type NodeMonitor struct { | |
client *opcua.Client | |
notifyCh chan *DataChangeMessage | |
internalNotifyCh chan *opcua.PublishNotificationData | |
sub *opcua.Subscription | |
mu sync.RWMutex | |
handleMap map[uint32]*ua.NodeID | |
nodeMap map[string]uint32 | |
nextClientHandle uint32 | |
errHandlerCB ErrHandler | |
} | |
func NewNodeMonitor(client *opcua.Client, notifyCh chan *DataChangeMessage) (*NodeMonitor, error) { | |
var err error | |
m := &NodeMonitor{ | |
client: client, | |
notifyCh: notifyCh, | |
internalNotifyCh: make(chan *opcua.PublishNotificationData), // TODO(jca): size? | |
handleMap: make(map[uint32]*ua.NodeID), | |
nodeMap: make(map[string]uint32), | |
nextClientHandle: 0, | |
} | |
if m.sub, err = client.Subscribe(&opcua.SubscriptionParameters{ | |
Notifs: m.internalNotifyCh, | |
}); err != nil { | |
return nil, err | |
} | |
return m, nil | |
} | |
func (m *NodeMonitor) SetErrorHandler(cb ErrHandler) { | |
m.errHandlerCB = cb | |
} | |
func (m *NodeMonitor) Count() int { | |
m.mu.RLock() | |
n := len(m.nodeMap) | |
m.mu.RUnlock() | |
return n | |
} | |
func (m *NodeMonitor) Add(nodes ...string) error { | |
var err error | |
nodeIDs := make([]*ua.NodeID, len(nodes)) | |
for i, node := range nodes { | |
if nodeIDs[i], err = ua.ParseNodeID(node); err != nil { | |
return err | |
} | |
} | |
return m.AddNodes(nodeIDs...) | |
} | |
func (m *NodeMonitor) AddNodes(nodes ...*ua.NodeID) error { | |
toAdd := make([]*ua.MonitoredItemCreateRequest, 0) | |
m.mu.Lock() | |
for _, node := range nodes { | |
if _, ok := m.nodeMap[node.String()]; ok { | |
continue | |
} | |
handle := atomic.AddUint32(&m.nextClientHandle, 1) | |
toAdd = append(toAdd, opcua.NewMonitoredItemCreateRequestWithDefaults(node, ua.AttributeIDValue, handle)) | |
m.handleMap[handle] = node | |
m.nodeMap[node.String()] = handle | |
} | |
m.mu.Unlock() | |
resp, err := m.sub.Monitor(ua.TimestampsToReturnBoth, toAdd...) | |
if err != nil { | |
return err | |
} | |
if resp.ResponseHeader.ServiceResult != ua.StatusOK { | |
return resp.ResponseHeader.ServiceResult | |
} | |
return nil | |
} | |
func (m *NodeMonitor) RemoveNodes(nodes ...*ua.NodeID) error { | |
toRemove := make([]uint32, len(nodes)) | |
m.mu.Lock() | |
for i, node := range nodes { | |
handle, ok := m.nodeMap[node.String()] | |
if !ok { | |
m.mu.Unlock() | |
return fmt.Errorf("node not found: %s", node) | |
} | |
delete(m.nodeMap, node.String()) | |
delete(m.handleMap, handle) | |
toRemove[i] = handle | |
} | |
m.mu.Unlock() | |
resp, err := m.sub.Unmonitor(toRemove...) | |
if err != nil { | |
return err | |
} | |
if resp.ResponseHeader.ServiceResult != ua.StatusOK { | |
return resp.ResponseHeader.ServiceResult | |
} | |
return nil | |
} | |
func (m *NodeMonitor) Unsubscribe() error { | |
m.mu.Lock() | |
m.handleMap = make(map[uint32]*ua.NodeID) | |
m.nodeMap = make(map[string]uint32) | |
m.mu.Unlock() | |
// note, we are _not_ reseting `nextClientHandle` | |
if m.sub != nil { | |
return m.sub.Cancel() | |
} | |
return nil | |
} | |
func (m *NodeMonitor) waitForMessages(ctx context.Context) { | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case msg := <-m.internalNotifyCh: | |
if msg.Error != nil { | |
if m.errHandlerCB != nil { | |
m.errHandlerCB(m.client, m.sub, msg.Error) | |
} | |
continue | |
} | |
if msg.SubscriptionID != m.sub.SubscriptionID { | |
panic("wtf!?") | |
} | |
switch v := msg.Value.(type) { | |
case *ua.DataChangeNotification: | |
for _, item := range v.MonitoredItems { | |
m.mu.RLock() | |
nid, ok := m.handleMap[item.ClientHandle] | |
m.mu.RUnlock() | |
out := &DataChangeMessage{} | |
if !ok { | |
out.Error = fmt.Errorf("handle %d not found", item.ClientHandle) | |
} else { | |
out.NodeID = nid | |
out.DataValue = item.Value | |
} | |
select { | |
case m.notifyCh <- out: | |
default: | |
// slow consumer | |
if m.errHandlerCB != nil { | |
m.errHandlerCB(m.client, m.sub, ErrSlowConsumer) | |
} | |
} | |
} | |
default: | |
if m.errHandlerCB != nil { | |
m.errHandlerCB(m.client, m.sub, fmt.Errorf("unknown message type: %T", msg.Value)) | |
} | |
} | |
} | |
} | |
} | |
func (m *NodeMonitor) Subscribe(ctx context.Context) { | |
go m.waitForMessages(ctx) | |
m.sub.Run(ctx) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment