Skip to content

Instantly share code, notes, and snippets.

@petez69
Forked from hdo/adam6050.py
Last active October 23, 2021 11:53
Show Gist options
  • Save petez69/4fc35fe9095c87f16c330d1fe1e1c61d to your computer and use it in GitHub Desktop.
Save petez69/4fc35fe9095c87f16c330d1fe1e1c61d to your computer and use it in GitHub Desktop.
Emulate Advantech ADAM-6050 for Synology Surveillance Station
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import time
import RPi.GPIO as GPIO
#import time
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
for x in range(2, 7):
GPIO.setup(x, GPIO.OUT)
class Echo(DatagramProtocol):
def datagramReceived(self, data, (host, port)):
# print "received %r from %s:%d" % (data, host, port)
# print time.ctime(),
# print "received %r from %s:%d" % (data, host, port)
# Login
if data.startswith('$01PW'):
self.transport.write(">01\r", (host, port))
return
# Request IO status
if data.startswith('$01C'):
self.transport.write("!01" + "000000000000" + "000000000000" + "000000000000" + "\r", (host, port))
return
# Request IO input status
if data.startswith('$016'):
seconds = int(time.time()) % 60
# print seconds
if seconds > 20 and seconds < 24:
# print "trigger!"
self.transport.write("!01"+"00"+"FFFF\r", (host, port))
else:
self.transport.write("!01"+"01"+"FFFF\r", (host, port))
return
# Request set IO output
if data.startswith('#011'):
# I/O 13 on Adam6050 will be received as channel 00
# I/O 14 on ADAM6050 will be received as channel 10
# I/O 15 on ADAM6050 will be received as channel 20 etc
# print time.ctime(),
# print "Channel[0x" + data[4:6]+ "] = " + data[6:7]
# Value of 2 starts the output on GPIO2, change to what you need.
channel = int(data[4:6],16)/16+2
status = int(data[6:7],16)
GPIO.output(channel,status)
# print channel
# print status
self.transport.write(">\r", (host, port))
return
# print "unknown command!"
# self.transport.write(data, (host, port))
#print GPIO.RPI_INFO ['P1_REVISION']
#print "starting ..."
reactor.listenUDP(1025, Echo())
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment