Skip to content

Instantly share code, notes, and snippets.

@shantanoo-desai
Last active June 15, 2021 19:53
Show Gist options
  • Save shantanoo-desai/1b56ebddd40862d62849adf7413b1f14 to your computer and use it in GitHub Desktop.
Save shantanoo-desai/1b56ebddd40862d62849adf7413b1f14 to your computer and use it in GitHub Desktop.
Basic OPC-UA Server/Client with Python 3
import time
from opcua import Client
from opcua import ua
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is not installed.

        The console starts with a snapshot of this module's global names.
        Names assigned inside the console live in that snapshot and are not
        written back to the module.
        """
        # Work on a copy: updating globals() in place with locals() (as the
        # previous version did) injected a module-level name ``vars`` that
        # shadowed the builtin of the same name for the rest of the program.
        namespace = dict(globals())
        shell = code.InteractiveConsole(namespace)
        shell.interact()
class SubHandler:
    """Callback holder passed to ``create_subscription``.

    Both callbacks only print what they receive; replace the bodies with
    real processing as needed.
    """

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is not used."""
        # apply logic HERE
        print("New Change: ", node, val)

    def event_notification(self, event):
        """Print the received event."""
        print("New Event: ", event)
if __name__ == "__main__":
    # Client entry point: connect to the local simulation server, subscribe
    # to three variables plus events, and print notifications until Ctrl-C.
    sim_client = Client('opc.tcp://127.0.0.1:4840/simserver')

    # Connect *before* entering the try block. If the connection attempt
    # fails there is nothing to disconnect, and calling disconnect() from
    # ``finally`` on a never-connected client would raise a second error
    # that obscures the real cause.
    sim_client.connect()
    try:
        sim_client.load_type_definitions()

        ns_array = sim_client.get_namespace_array()
        print("Namespaces: ", ns_array)

        objects = sim_client.get_objects_node()
        print("Objects node: ", objects)
        print("children: ", objects.get_children())

        # The return value is discarded. ``ns_array[2]`` raises IndexError
        # when the server exposes fewer than three namespaces, which is the
        # namespace the hard-coded ``ns=2`` node ids below rely on.
        sim_client.get_namespace_index(ns_array[2])

        temp_var = sim_client.get_node('ns=2;i=2')
        press_var = sim_client.get_node('ns=2;i=3')
        time_var = sim_client.get_node('ns=2;i=4')

        handler = SubHandler()
        # set subscription for 500ms here to get updated data
        sub = sim_client.create_subscription(500, handler)
        temp_handle = sub.subscribe_data_change(temp_var)
        press_handle = sub.subscribe_data_change(press_var)
        time_handle = sub.subscribe_data_change(time_var)
        time.sleep(0.2)
        sub.subscribe_events()

        # This loop only sleeps; it keeps the script running so the handler
        # can go on printing notifications.
        while True:
            time.sleep(1.0)
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop above; treat it as a normal
        # shutdown request rather than an error.
        pass
    finally:
        sim_client.disconnect()
from opcua import Server
from random import randint
import datetime
import time
# Simulation server: exposes a "Parameters" object with three writable
# variables and refreshes them with random/current values once per second.
server = Server()
url = 'opc.tcp://127.0.0.1:4840/simserver'
server.set_endpoint(url)

namespace = "https://simserver.dummy.robotic.org"
add_ns = server.register_namespace(namespace)

node = server.get_objects_node()
print("Object Node: ", node)

Param = node.add_object(add_ns, "Parameters")
print("Added Object: ", Param)

temp = Param.add_variable(add_ns, "Temperature", 0)
pressure = Param.add_variable(add_ns, "Pressure", 0)
# NOTE(review): created with the integer 0 but later written with datetime
# objects -- confirm the installed opcua version accepts that type change.
time_stamp = Param.add_variable(add_ns, "Time", 0)

temp.set_writable()
pressure.set_writable()
time_stamp.set_writable()
print(f"{temp}, {pressure}, {time_stamp}")

server.start()
# The try block wraps the whole loop. Previously the try/except lived
# *inside* ``while True`` with no ``break``, so Ctrl-C stopped the server
# but the loop carried on writing values to it, and any other exception
# left the server running.
try:
    while True:
        new_temp = randint(10, 50)
        new_pressure = randint(200, 990)
        new_time = datetime.datetime.now()
        temp.set_value(new_temp)
        pressure.set_value(new_pressure)
        time_stamp.set_value(new_time)
        print(f"update: {new_temp}, {new_pressure}, {new_time}")
        time.sleep(1.0)
except KeyboardInterrupt:
    # Ctrl-C is the intended way to end the simulation.
    pass
finally:
    # Stop the server exactly once, however the loop ended.
    server.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment