Skip to content

Instantly share code, notes, and snippets.

@ConnorRigby
Created August 16, 2019 17:08
Show Gist options
  • Save ConnorRigby/903373ca3880bba38090f5b936edcdf0 to your computer and use it in GitHub Desktop.
Save ConnorRigby/903373ca3880bba38090f5b936edcdf0 to your computer and use it in GitHub Desktop.
from .env import Env
from .auxiliary import Color
from threading import Thread
import time
import sys
import struct
import json
import uuid
HEADER_FORMAT = '>HII'
ENV = Env()
COLOR = Color()
class FarmbotConnection(Thread):
def __init__(self):
Thread.__init__(self)
if not ENV.farmware_api_available():
return
self.running = False
self.buffer = []
print(COLOR.error(f'opening request pipe {ENV.request_pipe}'), file=sys.stderr)
self.requestPipe = open(ENV.request_pipe, 'wb')
print(COLOR.error(f'opening response pipe {ENV.response_pipe}'), file=sys.stderr)
self.responsePipe = open(ENV.response_pipe, 'rb')
def __del__(self):
print(COLOR.error(f'Closing pipes'), file=sys.stderr)
self.requestPipe.close()
self.requestPipe = None
self.responsePipe.close()
self.responsePipe = None
def start(self):
print(COLOR.error("Starting thread"), file=sys.stderr)
self.running = True
Thread.start(self)
def stop(self):
print(COLOR.error("Stopping thread"), file=sys.stderr)
self.running = False
Thread.join(self)
def request(self, payload):
self.buffer.append(payload)
def run(self):
while True:
if not self.running:
break
nextPayload = None
if not self.buffer:
nextPayload = {
'kind': 'rpc_request',
'args': {'label': str(uuid.uuid4())},
'body': [{'kind': 'nothing', 'args': {}}]}
else:
nextPayload = self.buffer.pop(0)
if nextPayload:
message_bytes = bytes(json.dumps(nextPayload), 'utf-8')
header = struct.pack(HEADER_FORMAT, 0xFBFB, 0, len(message_bytes))
self.requestPipe.write(header + message_bytes)
self.requestPipe.flush()
print(COLOR.error(f'reading 10 bytes'), file=sys.stderr)
header = self.responsePipe.read(10)
if header == b'':
print(COLOR.error('No response from FBOS'), file=sys.stderr)
(_, _, size) = struct.unpack(HEADER_FORMAT, header)
print(COLOR.error(f'read {len(header)} bytes'), file=sys.stderr)
json.loads(self.responsePipe.read(size).decode())
print(COLOR.error("inside thread"), file=sys.stderr)
time.sleep(1)
#!/usr/bin/env python
from farmware_tools.connection import FarmbotConnection
from farmware_tools.device import log
import time
farmbot = FarmbotConnection()
farmbot.start()
payload = log("hello world")
farmbot.request(payload)
time.sleep(10)
farmbot.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment