Last active
January 14, 2022 05:06
-
-
Save ASolchen/ee04bf690070b710e526d3a7868666a5 to your computer and use it in GitHub Desktop.
cip_server basic example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import socket, struct, time | |
from pycomm3.packets.base import ResponsePacket, RequestPacket | |
from pycomm3.packets.ethernetip import RegisterSessionResponsePacket, DataItem | |
from pycomm3.cip.services import Services, EncapsulationCommands | |
class CIPServer(object):
    """Minimal single-client EtherNet/IP (CIP) server example.

    Entering the context manager binds to ``host:port``, accepts one client
    and answers its encapsulation requests; the ``with`` body therefore runs
    only after serving has finished.  Only ``register_session`` and
    ``send_rr_data`` are handled, and every ``send_rr_data`` request gets the
    same canned Get_Attribute_Single success reply carrying the float 3.14159.
    """

    # Upper bound on the number of receive iterations per client connection.
    MAX_REQUESTS = 100

    def __init__(self) -> None:
        super().__init__()
        self.session = 1          # next session handle to hand out
        self.host = '127.0.0.1'
        self.port = 44818
        self.socket = None        # listening socket while serving
        self.connection = None    # accepted client socket
        self.client_addr = None
        # Handlers keyed by the command name looked up in EncapsulationCommands.
        self.commands = {
            "register_session": self.register_session,
            "send_rr_data": self.send_rr_data,
        }

    def __enter__(self):
        """Serve a single client to completion and return this instance.

        Both the listening socket and the client connection are closed by the
        time this method returns.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as self.socket:
            self.socket.bind((self.host, self.port))
            print(f"Listening on {self.host}:{self.port}")
            self.socket.listen()
            self.connection, self.client_addr = self.socket.accept()
            with self.connection as conn:
                print('Connected by', self.client_addr)
                self._serve(conn)
        # Return the instance so ``with CIPServer() as server`` binds something useful.
        return self

    def __exit__(self, *args):
        """Close the listening socket if it is still referenced."""
        if self.socket is not None:
            self.socket.close()  # closing an already-closed socket is harmless
            self.socket = None

    def _serve(self, conn):
        """Answer requests on *conn* until the peer disconnects or the limit is hit."""
        for _ in range(self.MAX_REQUESTS):
            data = conn.recv(1024)
            if not data:
                # An empty read means the peer closed the connection.
                break
            # The incoming request is parsed with pycomm3's ResponsePacket,
            # presumably because it exposes the header fields (``command``,
            # ``raw``) that the handlers need -- confirm against pycomm3.
            req = ResponsePacket(RequestPacket(), raw_data=data)
            try:
                cmd = EncapsulationCommands[req.command]
            except KeyError:
                # NOTE(review): assumes an unknown command code raises KeyError;
                # such requests are ignored instead of ending the server.
                cmd = None
            handler = self.commands.get(cmd)
            if handler is not None:
                conn.sendall(handler(req))
            time.sleep(0.01)

    def register_session(self, req):
        """Build the RegisterSession reply and advance the session counter.

        Args:
            req: parsed request; ``req.raw`` holds the raw encapsulation bytes.

        Returns:
            bytes: encapsulation header followed by the 4-byte
            protocol-version / option-flags payload.
        """
        packet = b''.join((
            EncapsulationCommands.register_session,
            struct.pack('<H', 4),             # length of the data after the header
            struct.pack('<I', self.session),  # session number the server chooses
            struct.pack('<I', 0),             # status: success
            req.raw[12:20],                   # sender context, echoed back
            struct.pack('<I', 0),             # options
            struct.pack('<HH', 1, 0),         # protocol version 1, option flags 0
        ))
        self.session += 1
        return packet

    def send_rr_data(self, req):
        """Build a SendRRData reply carrying a canned Get_Attribute_Single result.

        Both length fields are derived from the bytes actually appended, and
        every field is packed little-endian with fixed sizes instead of the
        platform's native layout.

        Args:
            req: parsed request; ``req.raw`` holds the raw encapsulation bytes.

        Returns:
            bytes: the complete encapsulated reply.
        """
        # The MSB of the service byte flags a response: 0x0E | 0x80 == 0x8E.
        reply_service = bytes([Services.get_attribute_single[0] | 0x80])
        cip_reply = b''.join((
            reply_service,
            struct.pack('<BBB', 0, 0, 0),  # status = success, 3 bytes
            struct.pack('<f', 3.14159),    # value returned to the client
        ))
        command_data = b''.join((
            struct.pack('<I', 0),               # interface handle
            struct.pack('<H', 10),              # timeout
            struct.pack('<H', 2),               # item count: 2
            struct.pack('<HH', 0, 0),           # null address item: type 0, length 0
            DataItem.unconnected,               # unconnected data item type ID
            struct.pack('<H', len(cip_reply)),  # data item length
            cip_reply,
        ))
        header = b''.join((
            EncapsulationCommands.send_rr_data,
            struct.pack('<H', len(command_data)),  # length of everything after the header
            req.raw[4:8],                          # session number, echoed back
            struct.pack('<I', 0),                  # status: success
            req.raw[12:20],                        # sender context, echoed back
            struct.pack('<I', 0),                  # options
        ))
        return header + command_data
# Script entry: entering the context performs the whole accept-and-serve run,
# so this message is printed once serving is over.
with CIPServer() as server:
    print("Server closing")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment