Skip to content

Instantly share code, notes, and snippets.

@ASolchen
Last active January 14, 2022 05:06
Show Gist options
  • Save ASolchen/ee04bf690070b710e526d3a7868666a5 to your computer and use it in GitHub Desktop.
Save ASolchen/ee04bf690070b710e526d3a7868666a5 to your computer and use it in GitHub Desktop.
cip_server basic example
#!/usr/bin/env python3
import socket, struct, time
from pycomm3.packets.base import ResponsePacket, RequestPacket
from pycomm3.packets.ethernetip import RegisterSessionResponsePacket, DataItem
from pycomm3.cip.services import Services, EncapsulationCommands
class CIPServer(object):
    """Minimal single-client EtherNet/IP (CIP) server example.

    Entering the context manager binds to ``host:port``, accepts exactly one
    client and answers its requests until the client disconnects or
    ``_MAX_REQUESTS`` receive attempts have been made.  Only the
    ``register_session`` and ``send_rr_data`` encapsulation commands are
    answered; every other command is ignored.

    All multi-byte fields are packed little-endian (``'<'``), which is the
    byte order EtherNet/IP uses on the wire, instead of relying on the host's
    native order.
    """

    # Upper bound on receive attempts for the single accepted connection.
    _MAX_REQUESTS = 100

    def __init__(self) -> None:
        super().__init__()
        self.session = 1            # next session handle to hand out
        self.host = '127.0.0.1'
        self.port = 44818
        self.socket = None          # listening socket while serving
        self.connection = None      # accepted client socket
        self.client_addr = None     # address returned by accept()
        # Dispatch table: encapsulation command name -> reply builder.
        self.commands = {
            "register_session": self.register_session,
            "send_rr_data": self.send_rr_data
        }

    def __enter__(self):
        """Serve one client to completion, then return this instance.

        The whole serve loop runs here (blocking), so the body of the
        ``with`` statement only executes after the client has been served.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as self.socket:
            self.socket.bind((self.host, self.port))
            print(f"Listening on {self.host}:{self.port}")
            self.socket.listen()
            self.connection, self.client_addr = self.socket.accept()
            with self.connection as conn:
                print('Connected by', self.client_addr)
                for _ in range(self._MAX_REQUESTS):
                    data = conn.recv(1024)
                    if not data:
                        # recv() returning b'' means the peer closed the
                        # connection; further reads would never yield data.
                        break
                    reply = self._build_reply(data)
                    if reply is not None:
                        conn.sendall(reply)
                    time.sleep(0.01)
        # Returning self makes ``with CIPServer() as server`` bind the server
        # instead of None.
        return self

    def __exit__(self, *args):
        """Close the listening socket if it is still referenced."""
        if self.socket:
            self.socket.close()
        self.socket = None

    def _build_reply(self, data):
        """Return the reply bytes for one received frame, or None to ignore it."""
        # pycomm3's ResponsePacket is reused here purely as a parser for the
        # incoming frame; it requires a request object, so an empty
        # RequestPacket is supplied as a placeholder.
        placeholder = RequestPacket()
        req = ResponsePacket(placeholder, raw_data=data)
        try:
            # Maps the raw command bytes to the command's name.
            # NOTE(review): assumes the enum map raises KeyError for unknown
            # values (dict-backed lookup) - confirm against pycomm3.
            cmd = EncapsulationCommands[req.command]
        except KeyError:
            return None
        handler = self.commands.get(cmd)
        if handler is None:
            return None
        return handler(req)

    @staticmethod
    def _encapsulation_header(command, length, session, context):
        """Build the 24-byte encapsulation header with status 0 and options 0.

        ``command``, ``session`` and ``context`` are raw bytes (2, 4 and 8
        bytes respectively); ``length`` is the byte count of the data that
        follows the header.
        """
        return b"".join((
            command,
            struct.pack('<H', length),
            session,
            struct.pack('<I', 0),       # status: success
            context,                    # sender context, echoed to the client
            struct.pack('<I', 0),       # options
        ))

    @staticmethod
    def _unconnected_payload(data_item_type, cip_data):
        """Wrap ``cip_data`` as command-specific data: null address item + data item.

        The data item's length field is derived from ``cip_data`` so it always
        matches the bytes actually sent.
        """
        return b"".join((
            struct.pack('<I', 0),               # interface handle
            struct.pack('<H', 10),              # timeout
            struct.pack('<H', 2),               # item count: 2
            struct.pack('<H', 0),               # type ID: null address item
            struct.pack('<H', 0),               # null address item length: 0
            data_item_type,                     # type ID of the data item
            struct.pack('<H', len(cip_data)),   # data item length
            cip_data,
        ))

    @staticmethod
    def _reply_service(service):
        """Return the one-byte service code with its MSB set (reply flag)."""
        # The MSB distinguishes a response (1) from a request (0); the low
        # seven bits carry the service code, e.g. 0x0E -> 0x8E.
        return bytes([service[0] | 0x80])

    def register_session(self, req):
        """Build the register_session reply and advance the session counter.

        ``req`` must expose the received frame as ``req.raw``; bytes 12-19
        (the sender context) are echoed back.
        """
        header = self._encapsulation_header(
            EncapsulationCommands.register_session,
            4,                                  # 4 bytes of data follow
            struct.pack('<I', self.session),    # session handle chosen by the server
            req.raw[12:20],
        )
        # Four data bytes: 01 00 00 00 (protocol version 1 followed by zeros).
        packet = header + struct.pack('<I', 1)
        self.session += 1
        return packet

    def send_rr_data(self, req):
        """Build a send_rr_data reply carrying the float 3.14159.

        The session handle (``req.raw[4:8]``) and sender context
        (``req.raw[12:20]``) are echoed from the request.  The reply is a
        fixed get_attribute_single success response regardless of what the
        request asked for.
        """
        cip_reply = b"".join((
            self._reply_service(Services.get_attribute_single),
            struct.pack('<BBB', 0, 0, 0),       # 3 zero bytes: success status
            struct.pack('<f', 3.14159),         # the value being served
        ))
        payload = self._unconnected_payload(DataItem.unconnected, cip_reply)
        header = self._encapsulation_header(
            EncapsulationCommands.send_rr_data,
            len(payload),                       # computed, so it cannot drift
            req.raw[4:8],
            req.raw[12:20],
        )
        return header + payload
# Run the example server only when executed as a script, so importing this
# module does not bind a port and block waiting for a client.
if __name__ == "__main__":
    # Entering the context serves one client to completion; the body below
    # runs only after that has finished.
    with CIPServer():
        print("Server closing")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment