Created
August 6, 2020 12:09
-
-
Save felixvd/d75932fc0dd337e31c9e817412b39389 to your computer and use it in GitHub Desktop.
Debugging notes for Modbus RTU/TCP for Robotiq gripper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import socket | |
from math import ceil | |
import time | |
import threading | |
from pymodbus.client.sync import ModbusTcpClient | |
from pymodbus.register_read_message import ReadInputRegistersResponse | |
# Based on comModbusTcp.py and comModbusRtu.py in https://github.com/ros-industrial/robotiq | |
class ComModbus: | |
def __init__(self): | |
self.client = None | |
self.lock = threading.Lock() | |
def connectToDeviceTCP(self, address, port=''): | |
""" | |
Connection to the client - the method takes the IP address (as a string, e.g. '192.168.1.11') as an argument. | |
""" | |
if port: | |
self.client = ModbusTcpClient(address, port=port) | |
else: | |
self.client = ModbusTcpClient(address) | |
if not self.client.connect(): | |
print("Unable to connect to " + address) | |
return False | |
def connectToDeviceSerial(self, device): | |
"""Connection to the client - the method takes the IP address (as a string, e.g. '192.168.1.11') as an argument.""" | |
self.client = ModbusSerialClient(method='rtu',port=device,stopbits=1, bytesize=8, baudrate=115200, timeout=0.2) | |
if not self.client.connect(): | |
print("Unable to connect to " + device) | |
return False | |
def disconnectFromDevice(self): | |
"""Close connection""" | |
self.client.close() | |
def sendCommand(self, data): | |
""" | |
Send a command to the Gripper - the method takes a list of uint8 as an argument. | |
The meaning of each variable depends on the Gripper model | |
""" | |
# make sure data has an even number of elements | |
if(len(data) % 2 == 1): | |
data.append(0) | |
# Initiate message as an empty list | |
message = [] | |
# Fill message by combining two bytes in one register | |
for i in range(0, len(data)/2): | |
message.append((data[2*i] << 8) + data[2*i+1]) | |
try: | |
with self.lock: | |
self.client.write_registers(0, message) | |
except Exception as e: | |
print("Encountered an exception when writing registers:") | |
print(e) | |
def getStatus(self, numBytes): | |
""" | |
Sends a request to read, wait for the response and returns the Gripper status. | |
The method gets the number of bytes to read as an argument | |
""" | |
numRegs = int(ceil(numBytes/2.0)) | |
# Get status from the device | |
with self.lock: | |
response = self.client.read_input_registers(0, numRegs) | |
# Instantiate output as an empty list | |
output = [] | |
# Fill the output with the bytes in the appropriate order | |
try: | |
for i in range(0, numRegs): | |
output.append((response.getRegister(i) & 0xFF00) >> 8) | |
output.append( response.getRegister(i) & 0x00FF) | |
except: | |
print("response for self.client.read_input_registers(0, numRegs) is not as expected: ") | |
print(response) | |
# Output the result | |
return output | |
class RobotiqCModel: | |
def __init__(self): | |
#Initiate output message as an empty list | |
self.message = [] | |
def verifyCommand(self, command): | |
# Verify that each variable is in its correct range | |
command.rACT = max(0, command.rACT) | |
command.rACT = min(1, command.rACT) | |
command.rGTO = max(0, command.rGTO) | |
command.rGTO = min(1, command.rGTO) | |
command.rATR = max(0, command.rATR) | |
command.rATR = min(1, command.rATR) | |
command.rPR = max(0, command.rPR) | |
command.rPR = min(255, command.rPR) | |
command.rSP = max(0, command.rSP) | |
command.rSP = min(255, command.rSP) | |
command.rFR = max(0, command.rFR) | |
command.rFR = min(255, command.rFR) | |
# Return the cropped command | |
return command | |
def refreshCommand(self, command): | |
# Limit the value of each variable | |
command = self.verifyCommand(command) | |
# Initiate command as an empty list | |
self.message = [] | |
# Build the command with each output variable | |
# TODO: add verification that all variables are in their authorized range | |
self.message.append(command.rACT + (command.rGTO << 3) + (command.rATR << 4)) | |
self.message.append(0) | |
self.message.append(0) | |
self.message.append(command.rPR) | |
self.message.append(command.rSP) | |
self.message.append(command.rFR) | |
def sendCommand(self): | |
self.client.sendCommand(self.message) | |
def getStatus(self): | |
""" | |
Request the status from the gripper and return it in the CModelStatus msg type. | |
""" | |
#Acquire status from the Gripper | |
status = self.client.getStatus(6) | |
return status | |
# Message to report the gripper status | |
# message = CModelStatus() | |
# #Assign the values to their respective variables | |
# message.gACT = (status[0] >> 0) & 0x01; | |
# message.gGTO = (status[0] >> 3) & 0x01; | |
# message.gSTA = (status[0] >> 4) & 0x03; | |
# message.gOBJ = (status[0] >> 6) & 0x03; | |
# message.gFLT = status[2] | |
# message.gPR = status[3] | |
# message.gPO = status[4] | |
# message.gCU = status[5] | |
# return message | |
# ======== | |
def mainLoop(address, port=''): | |
# Gripper is a C-Model with a TCP connection | |
print("Starting up") | |
gripper = RobotiqCModel() | |
gripper.client = ComModbus() | |
print("Connecting to TCP address " + str(address) + " and port " + str(port)) | |
gripper.client.connectToDeviceTCP(address, port) | |
# print("Connecting to device " + str(address)) | |
# gripper.client.connectToDeviceSerial(address) | |
while True: | |
# Get and publish the Gripper status | |
print("Getting status") | |
status = gripper.getStatus() | |
print("Got status:") | |
print(status) | |
# Wait a little | |
time.sleep(1) | |
# Send the most recent command | |
gripper.sendCommand() | |
# Wait a little | |
time.sleep(1) | |
if __name__ == '__main__': | |
# Verify user gave a legal IP address | |
try: | |
ip = sys.argv[1] | |
socket.inet_aton(ip) | |
except socket.error: | |
print('[cmodel_tcp_driver] Please provide a valid IP address') | |
# Run the main loop | |
try: | |
if sys.argv[2]: | |
mainLoop(sys.argv[1], sys.argv[2]) | |
else: | |
mainLoop(sys.argv[1]) | |
except Exception as e: | |
print('Received exception:') | |
print(e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment