Skip to content

Instantly share code, notes, and snippets.

@kung-foo
Last active June 12, 2019 18:26
Show Gist options
  • Save kung-foo/b184a432de2e3063f891f96d42b25795 to your computer and use it in GitHub Desktop.
Save kung-foo/b184a432de2e3063f891f96d42b25795 to your computer and use it in GitHub Desktop.
package jotunheim
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
)
// ErrSlowConsumer is handed to the error handler when a data change message
// is dropped because the notification channel could not accept it without
// blocking.
var ErrSlowConsumer = errors.New("slow consumer. messages dropped.")
// ErrHandler is the callback signature used to report errors raised while
// processing subscription notifications. NodeMonitor invokes it with its own
// client and subscription together with the error that occurred.
type ErrHandler func(*opcua.Client, *opcua.Subscription, error)
// DataChangeMessage is the value delivered on the caller's notification
// channel for every monitored item contained in a data change notification.
type DataChangeMessage struct {
// The embedded DataValue is the value reported for the item; it is left nil
// when Error is set.
*ua.DataValue
// Error is non-nil when the notification's client handle could not be mapped
// back to a node.
Error error
// NodeID identifies the node the value belongs to; it is nil when Error is set.
NodeID *ua.NodeID
}
// NodeMonitor manages the monitored items of a single subscription and
// translates incoming data change notifications into DataChangeMessage values
// sent on a caller-supplied channel.
type NodeMonitor struct {
// client is the client the subscription was created on; it is passed to the
// error handler.
client *opcua.Client
// notifyCh is the caller-supplied channel that receives data change messages.
notifyCh chan *DataChangeMessage
// internalNotifyCh is handed to the subscription as its notification channel
// and drained by waitForMessages.
internalNotifyCh chan *opcua.PublishNotificationData
// sub is the subscription created in NewNodeMonitor.
sub *opcua.Subscription
// mu guards handleMap and nodeMap.
mu sync.RWMutex
// handleMap maps a client handle to the node it was assigned to.
handleMap map[uint32]*ua.NodeID
// nodeMap maps a node ID's string form to its client handle.
nodeMap map[string]uint32
// nextClientHandle is the last client handle handed out; it is incremented
// atomically and never reset.
nextClientHandle uint32
// errHandlerCB, when non-nil, is called for errors seen while processing
// notifications.
errHandlerCB ErrHandler
}
// NewNodeMonitor creates a subscription on client and returns a NodeMonitor
// bound to it. Data changes will be delivered on notifyCh once Subscribe is
// running. The error from client.Subscribe is returned unchanged, in which
// case no monitor is returned.
func NewNodeMonitor(client *opcua.Client, notifyCh chan *DataChangeMessage) (*NodeMonitor, error) {
	// The subscription publishes into this channel; waitForMessages drains it.
	internal := make(chan *opcua.PublishNotificationData) // TODO(jca): size?

	sub, err := client.Subscribe(&opcua.SubscriptionParameters{Notifs: internal})
	if err != nil {
		return nil, err
	}

	return &NodeMonitor{
		client:           client,
		notifyCh:         notifyCh,
		internalNotifyCh: internal,
		sub:              sub,
		handleMap:        make(map[uint32]*ua.NodeID),
		nodeMap:          make(map[string]uint32),
	}, nil
}
// SetErrorHandler installs cb as the callback that receives errors encountered
// while processing notifications. Passing nil disables error reporting.
//
// NOTE(review): the field is written without holding m.mu while
// waitForMessages reads it from its own goroutine, so the handler should be
// set before Subscribe is called.
func (m *NodeMonitor) SetErrorHandler(cb ErrHandler) {
m.errHandlerCB = cb
}
// Count reports how many nodes are currently registered with the monitor.
func (m *NodeMonitor) Count() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.nodeMap)
}
// Add parses each string in nodes as a node ID and registers the results via
// AddNodes. If any string fails to parse, the parse error is returned and
// nothing is registered.
func (m *NodeMonitor) Add(nodes ...string) error {
	parsed := make([]*ua.NodeID, 0, len(nodes))
	for _, raw := range nodes {
		id, err := ua.ParseNodeID(raw)
		if err != nil {
			return err
		}
		parsed = append(parsed, id)
	}
	return m.AddNodes(parsed...)
}
// AddNodes registers nodes with the monitor and asks the subscription to
// monitor their Value attribute. Nodes that are already registered (including
// duplicates within the same call) are skipped.
//
// If nothing new remains to be added, no request is sent and nil is returned.
// If the Monitor call fails, or the response's service result is not
// ua.StatusOK, the bookkeeping entries created by this call are rolled back so
// that the same nodes can be added again later, and the error is returned.
//
// NOTE(review): only the service-level result is inspected; per-item results
// in the response are not checked here.
func (m *NodeMonitor) AddNodes(nodes ...*ua.NodeID) error {
	// registration remembers what this call inserted so it can be undone.
	type registration struct {
		key    string
		handle uint32
	}

	toAdd := make([]*ua.MonitoredItemCreateRequest, 0, len(nodes))
	added := make([]registration, 0, len(nodes))

	m.mu.Lock()
	for _, node := range nodes {
		key := node.String()
		if _, ok := m.nodeMap[key]; ok {
			continue
		}
		handle := atomic.AddUint32(&m.nextClientHandle, 1)
		toAdd = append(toAdd, opcua.NewMonitoredItemCreateRequestWithDefaults(node, ua.AttributeIDValue, handle))
		m.handleMap[handle] = node
		m.nodeMap[key] = handle
		added = append(added, registration{key: key, handle: handle})
	}
	m.mu.Unlock()

	// Everything was already monitored (or no nodes were given): avoid sending
	// an empty create request to the server.
	if len(toAdd) == 0 {
		return nil
	}

	resp, err := m.sub.Monitor(ua.TimestampsToReturnBoth, toAdd...)
	if err == nil && resp.ResponseHeader.ServiceResult != ua.StatusOK {
		err = resp.ResponseHeader.ServiceResult
	}
	if err == nil {
		return nil
	}

	// The request failed: forget the nodes again, otherwise a retry would
	// silently skip them as "already monitored".
	m.mu.Lock()
	for _, r := range added {
		// Only drop the name mapping if it still points at our handle.
		if h, ok := m.nodeMap[r.key]; ok && h == r.handle {
			delete(m.nodeMap, r.key)
		}
		delete(m.handleMap, r.handle)
	}
	m.mu.Unlock()
	return err
}
// RemoveNodes unregisters nodes from the monitor and asks the subscription to
// stop monitoring them.
//
// The call is all-or-nothing with respect to local state: every node is looked
// up before anything is modified, so an unknown node yields a
// "node not found" error without touching the other nodes. A node listed more
// than once is removed once. If there is nothing to remove, no request is sent
// and nil is returned. If the Unmonitor call fails, or the response's service
// result is not ua.StatusOK, the removed entries are restored and the error is
// returned.
//
// NOTE(review): the client handles are passed to Unmonitor as-is; confirm that
// Unmonitor accepts client handles rather than server-assigned monitored item
// IDs.
func (m *NodeMonitor) RemoveNodes(nodes ...*ua.NodeID) error {
	// removal captures one bookkeeping entry so it can be restored on failure.
	type removal struct {
		key    string
		handle uint32
		node   *ua.NodeID
	}

	pending := make([]removal, 0, len(nodes))
	seen := make(map[string]struct{}, len(nodes))

	m.mu.Lock()
	// First pass: resolve every node without mutating anything.
	for _, node := range nodes {
		key := node.String()
		if _, dup := seen[key]; dup {
			continue
		}
		handle, ok := m.nodeMap[key]
		if !ok {
			m.mu.Unlock()
			return fmt.Errorf("node not found: %s", node)
		}
		seen[key] = struct{}{}
		pending = append(pending, removal{key: key, handle: handle, node: m.handleMap[handle]})
	}
	// Second pass: all nodes are known, so drop them together.
	toRemove := make([]uint32, len(pending))
	for i, r := range pending {
		delete(m.nodeMap, r.key)
		delete(m.handleMap, r.handle)
		toRemove[i] = r.handle
	}
	m.mu.Unlock()

	if len(toRemove) == 0 {
		return nil
	}

	resp, err := m.sub.Unmonitor(toRemove...)
	if err == nil && resp.ResponseHeader.ServiceResult != ua.StatusOK {
		err = resp.ResponseHeader.ServiceResult
	}
	if err == nil {
		return nil
	}

	// The request failed: put the entries back so the caller can retry, unless
	// the node has been registered again in the meantime.
	m.mu.Lock()
	for _, r := range pending {
		if _, taken := m.nodeMap[r.key]; taken {
			continue
		}
		m.nodeMap[r.key] = r.handle
		m.handleMap[r.handle] = r.node
	}
	m.mu.Unlock()
	return err
}
// Unsubscribe forgets every registered node and cancels the subscription,
// returning the result of the cancellation. If the monitor has no
// subscription it returns nil.
func (m *NodeMonitor) Unsubscribe() error {
	// Swap in empty maps. nextClientHandle is intentionally not reset, so
	// handles issued afterwards continue from the current counter value.
	m.mu.Lock()
	m.handleMap, m.nodeMap = make(map[uint32]*ua.NodeID), make(map[string]uint32)
	m.mu.Unlock()

	if m.sub == nil {
		return nil
	}
	return m.sub.Cancel()
}
// waitForMessages drains the internal notification channel until ctx is
// cancelled or the channel is closed.
//
// For every monitored item in a data change notification it sends one
// DataChangeMessage on notifyCh without blocking; if the channel cannot accept
// the message it is dropped and ErrSlowConsumer is reported. Notification
// errors, notifications for a different subscription, and unknown message
// types are reported to the error handler (when one is set) and otherwise
// skipped.
func (m *NodeMonitor) waitForMessages(ctx context.Context) {
	// report forwards err to the error handler, if any. The handler field is
	// read on every call so a handler installed later is still picked up.
	report := func(err error) {
		if m.errHandlerCB != nil {
			m.errHandlerCB(m.client, m.sub, err)
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case msg, open := <-m.internalNotifyCh:
			if !open {
				// A closed channel would otherwise yield nil messages forever.
				return
			}
			if msg == nil {
				continue
			}
			if msg.Error != nil {
				report(msg.Error)
				continue
			}
			if msg.SubscriptionID != m.sub.SubscriptionID {
				// Should not happen, but crashing the whole process from a
				// background goroutine is worse than dropping the message.
				report(fmt.Errorf("notification for unexpected subscription %d (want %d)", msg.SubscriptionID, m.sub.SubscriptionID))
				continue
			}

			switch v := msg.Value.(type) {
			case *ua.DataChangeNotification:
				for _, item := range v.MonitoredItems {
					m.mu.RLock()
					nid, ok := m.handleMap[item.ClientHandle]
					m.mu.RUnlock()

					out := &DataChangeMessage{}
					if ok {
						out.NodeID = nid
						out.DataValue = item.Value
					} else {
						out.Error = fmt.Errorf("handle %d not found", item.ClientHandle)
					}

					select {
					case m.notifyCh <- out:
					default:
						// Slow consumer: drop the message rather than stall
						// the subscription.
						report(ErrSlowConsumer)
					}
				}
			default:
				report(fmt.Errorf("unknown message type: %T", msg.Value))
			}
		}
	}
}
// Subscribe starts processing notifications: it launches waitForMessages in a
// new goroutine and then calls Run on the subscription from the calling
// goroutine, passing ctx to both.
//
// NOTE(review): presumably Run blocks until ctx is cancelled, making this a
// blocking call — confirm against the gopcua version in use.
func (m *NodeMonitor) Subscribe(ctx context.Context) {
// The forwarding goroutine exits when ctx is cancelled.
go m.waitForMessages(ctx)
m.sub.Run(ctx)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment