Last active
June 15, 2021 19:53
-
-
Save shantanoo-desai/1b56ebddd40862d62849adf7413b1f14 to your computer and use it in GitHub Desktop.
basic OPC-UA Server/Client with Python3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from opcua import Client | |
from opcua import ua | |
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is unavailable.

        The console runs against this module's global namespace, so names
        defined at module level are reachable from the prompt.
        """
        # Hand the module globals over directly. Merging locals() into them
        # first would only copy this function's own temporary name into the
        # module namespace (previously a ``vars`` entry that shadowed the
        # builtin of the same name).
        shell = code.InteractiveConsole(globals())
        shell.interact()
class SubHandler:
    """Callback holder handed to ``create_subscription``; prints what it receives."""

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is accepted but not used."""
        # apply logic HERE
        report = ("New Change: ", node, val)
        print(*report)

    def event_notification(self, event):
        """Print the received event."""
        report = ("New Event: ", event)
        print(*report)
if __name__ == "__main__":
    # Namespace URI that holds the simulated variables. Its index is looked
    # up at runtime below instead of assuming it is always 2.
    namespace_uri = "https://simserver.dummy.robotic.org"

    sim_client = Client('opc.tcp://127.0.0.1:4840/simserver')

    # Connect before entering the try block: if the connection cannot be
    # opened there is nothing to disconnect, and calling disconnect() from
    # the finally clause could hide the original connection error.
    sim_client.connect()
    try:
        sim_client.load_type_definitions()

        ns_array = sim_client.get_namespace_array()
        print("Namespaces: ", ns_array)

        objects = sim_client.get_objects_node()
        print("Objects node: ", objects)
        print("children: ", objects.get_children())

        # Resolve the namespace index from its URI and use it to build the
        # node ids, rather than discarding the lookup result.
        ns_idx = sim_client.get_namespace_index(namespace_uri)
        temp_var = sim_client.get_node('ns={};i=2'.format(ns_idx))
        press_var = sim_client.get_node('ns={};i=3'.format(ns_idx))
        time_var = sim_client.get_node('ns={};i=4'.format(ns_idx))

        handler = SubHandler()
        # set subscription for 500ms here to get updated data
        sub = sim_client.create_subscription(500, handler)
        # The returned handles are not needed afterwards, so they are not kept.
        for monitored_node in (temp_var, press_var, time_var):
            sub.subscribe_data_change(monitored_node)
        time.sleep(0.2)

        sub.subscribe_events()

        while True:
            # Keep the main thread alive; notifications are delivered to
            # the handler's callbacks in the meantime.
            time.sleep(1.0)
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop above; treat it as a
        # normal shutdown request instead of surfacing a traceback.
        print("Interrupted, disconnecting client")
    finally:
        sim_client.disconnect()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from opcua import Server | |
from random import randint | |
import datetime | |
import time | |
# Simulation server: exposes three writable variables under a "Parameters"
# object and refreshes them with new values once per second.
server = Server()
url = 'opc.tcp://127.0.0.1:4840/simserver'
server.set_endpoint(url)

namespace = "https://simserver.dummy.robotic.org"
add_ns = server.register_namespace(namespace)

node = server.get_objects_node()
print("Object Node: ", node)

Param = node.add_object(add_ns, "Parameters")
print("Added Object: ", Param)

temp = Param.add_variable(add_ns, "Temperature", 0)
pressure = Param.add_variable(add_ns, "Pressure", 0)
# Seed "Time" with a datetime so its initial value has the same type as the
# values written in the loop below (it used to start as the integer 0).
# NOTE(review): python-opcua appears to derive the variable's data type from
# this initial value -- confirm against the library documentation.
time_stamp = Param.add_variable(add_ns, "Time", datetime.datetime.now())

temp.set_writable()
pressure.set_writable()
time_stamp.set_writable()

print(f"{temp}, {pressure}, {time_stamp}")

server.start()
try:
    while True:
        new_temp = randint(10, 50)
        new_pressure = randint(200, 990)
        new_time = datetime.datetime.now()

        temp.set_value(new_temp)
        pressure.set_value(new_pressure)
        time_stamp.set_value(new_time)

        print(f"update: {new_temp}, {new_pressure}, {new_time}")
        time.sleep(1.0)
except KeyboardInterrupt:
    # Ctrl-C ends the update loop. The handler sits outside the loop so the
    # script actually terminates instead of resuming updates after stop().
    pass
finally:
    # Stop the server on any exit path, not only on Ctrl-C.
    server.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment