Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save SamueldaCostaAraujoNunes/c0afeb11531f75861d3548089196b035 to your computer and use it in GitHub Desktop.
Save SamueldaCostaAraujoNunes/c0afeb11531f75861d3548089196b035 to your computer and use it in GitHub Desktop.
import socket
import time
import json
# UDP port the discovery probe and the MQTT command are sent to.
SATURN_UDP_PORT = 3000
# Port number embedded in the "M66666 <port>" command sent to a printer.
MQTT_PORT = 1883
# Seconds: socket timeout and length of the discovery listening window.
TIMEOUT = 2
class SaturnPrinter:
    """A printer that answered the UDP discovery probe.

    Attributes:
        addr: ``(host, port)`` pair the discovery reply came from.
        data: Decoded JSON payload of the discovery reply.
    """

    # NOTE(review): the file relies on `@service` and `log`, which look like
    # pyscript builtins. No special methods or extra decorators are added
    # here to stay compatible with that runtime - confirm before adding any.

    def __init__(self, addr, data):
        """Store the sender address and decoded reply of one printer."""
        self.addr = addr
        self.data = data

    def find_printers():
        """Broadcast a discovery probe and collect the printers that reply.

        Defined without ``self``: call it on the class, i.e.
        ``SaturnPrinter.find_printers()``.

        Returns:
            A list of ``SaturnPrinter``, one per well-formed JSON reply
            received within ``TIMEOUT`` seconds of sending the probe.
            Replies that are not valid UTF-8 JSON are logged and skipped.
        """
        printers = []
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        with sock:
            # SO_BROADCAST is an on/off flag; 1 enables it.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.sendto(b'M99999', ('<broadcast>', SATURN_UDP_PORT))
            # Use one overall deadline so the listening window is TIMEOUT
            # seconds in total, not TIMEOUT seconds after the last reply.
            deadline = time.monotonic() + TIMEOUT
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                sock.settimeout(remaining)
                try:
                    data, addr = sock.recvfrom(1024)
                except socket.timeout:
                    break
                try:
                    pdata = json.loads(data.decode('utf-8'))
                except ValueError:
                    # Covers both UnicodeDecodeError and JSONDecodeError:
                    # one bad datagram must not abort the whole discovery.
                    log.warning(f"Ignoring malformed discovery reply from {addr}")
                    continue
                printers.append(SaturnPrinter(addr, pdata))
        return printers

    def connect_mqtt(self):
        """Send the ``M66666 <MQTT_PORT>`` command to this printer over UDP.

        The destination is ``self.data["Data"]["Attributes"]["MainboardIP"]``.
        Presumably the command asks the printer to connect to an MQTT broker
        on that port - confirm against the printer protocol.

        Raises:
            KeyError: if the discovery reply lacks the ``MainboardIP`` path.
        """
        printer_ip = self.data["Data"]["Attributes"]["MainboardIP"]
        print(printer_ip)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        with sock:
            sock.settimeout(TIMEOUT)
            # Kept from the original; SO_BROADCAST is an on/off flag, so 1.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            start_mqtt = f"M66666 {MQTT_PORT}"
            packet = start_mqtt.encode('utf-8')
            # sendto returns the number of bytes sent; that count is logged.
            log.info(sock.sendto(packet, (printer_ip, SATURN_UDP_PORT)))
@service
def resin_printer():
    """Discover printers and send each one the MQTT command.

    Logs a start message, then for every printer returned by
    ``SaturnPrinter.find_printers()`` logs the printer object and calls its
    ``connect_mqtt()`` method.
    """
    log.info("Procurando impressoras de resina elegoo:")
    for discovered in SaturnPrinter.find_printers():
        log.info(discovered)
        discovered.connect_mqtt()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment