Created
January 1, 2021 05:04
-
-
Save danricho/d3f406ff48f7cd428eafcf169f6bc3a4 to your computer and use it in GitHub Desktop.
Arduino Python Comms
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This is a simple communication scheme between python and Arduino over Serial. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
int TO_RPI = 4; | |
int FR_RPI = 5; | |
// NON BLOCKING SERIAL! | |
int millisSince(unsigned long start){ | |
return (millis()-start); | |
} | |
void waitUntil(int millis, unsigned long start){ | |
delay(millis - millisSince(start)); | |
} | |
int headerFromMillis(int millis){ | |
if ((millis > 85) || (millis < 15)){ return 0; } | |
else if ((millis > 15) && (millis < 25)) { return 1; } | |
else if ((millis > 35) && (millis < 45)) { return 2; } | |
else if ((millis > 55) && (millis < 65)) { return 3; } | |
else if ((millis > 75) && (millis < 85)) { return 4; } | |
} | |
void writeBit(bool state){ | |
digitalWrite(4, LOW); | |
if (state){ delay(10); } else { delay(5); } | |
digitalWrite(4, HIGH); | |
if (state){ delay(5); } else { delay(10); } | |
} | |
int message = 1; | |
long centiVolts; | |
byte encodedVolts; | |
void setup() { | |
pinMode(LED_BUILTIN, OUTPUT); | |
pinMode(TO_RPI, OUTPUT); | |
pinMode(FR_RPI, INPUT); | |
} | |
void loop() { | |
unsigned long loop_start = millis(); | |
digitalWrite(LED_BUILTIN, HIGH); | |
digitalWrite(TO_RPI, LOW); | |
delay(20 * (message)); | |
digitalWrite(TO_RPI, HIGH); | |
if (message == 1){ | |
delay(20); // make sure RPi is ready | |
// STANDIN - will be a analogue read | |
centiVolts = random(0, 1000); | |
encodedVolts = map(centiVolts,0,1000,0,255); | |
// Serial.print("Data binary (big-endian): "); | |
for (byte mask = 00000001; mask>0; mask <<= 1){ | |
if (encodedVolts & mask) { | |
// Serial.print("1"); | |
writeBit(1); | |
} else { | |
// Serial.print("0"); | |
writeBit(0); | |
} | |
} | |
} | |
Serial.print("Sent "); | |
Serial.print(message); | |
Serial.print(", Pulse Length: "); | |
Serial.print(20 * (message)); | |
if (message == 1){ | |
Serial.print(", Volts: "); | |
Serial.print(float(centiVolts)/100.0); | |
// Serial.print(", Data: "); | |
// Serial.print(encodedVolts); | |
} | |
Serial.println(""); | |
digitalWrite(LED_BUILTIN, LOW); | |
waitUntil(950, loop_start); | |
int previous_state = digitalRead(FR_RPI); | |
int this_state; | |
unsigned long fall_start = 0; | |
unsigned long pulseLength = 0; | |
int header = 0; | |
while (millisSince(loop_start) < 1950){ | |
this_state = digitalRead(FR_RPI); | |
if (fall_start == 0){ | |
if (previous_state) { | |
if (!this_state) { // FALLING EDGE | |
fall_start = millis(); | |
} | |
} | |
} | |
else { | |
if (this_state) { // RISING EDGE | |
pulseLength = millisSince(fall_start); | |
header = headerFromMillis(pulseLength); | |
break; | |
} | |
} | |
} | |
Serial.print("Got "); | |
Serial.print(header); | |
Serial.print(", Pulse Length: "); | |
Serial.print(pulseLength); | |
Serial.println(""); | |
Serial.println(""); | |
message = (message % 4) + 1; | |
waitUntil(2000, loop_start); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import RPi.GPIO as GPIO | |
from datetime import datetime | |
import random | |
import time | |
import struct | |
TO_RPI = 4 | |
FR_RPI = 5 | |
def myround(x, base=5): | |
return base * round(x/base) | |
def delay(millis): | |
time.sleep(max(0,millis/1000)) | |
def millisSince(start): | |
return round((datetime.now() - start).microseconds / 100) / 10 | |
def waitUntil(millis, start): | |
delay(millis - millisSince(start)); | |
def headerFromMillis(millis): | |
if millis > 85 or millis < 15: | |
return None # NOT A MESSAGE TYPE HEADER - PROBABLY END OF DATA | |
else: | |
return round(myround(millis, 20) / 20) | |
def getBit(pin): | |
GPIO.wait_for_edge(pin, GPIO.FALLING, timeout=1000) | |
rise = datetime.now() | |
GPIO.wait_for_edge(pin, GPIO.RISING, timeout=1000) | |
return round((datetime.now() - rise).microseconds / 100) / 10 > 7.5 | |
GPIO.setmode(GPIO.BCM) | |
GPIO.setup(TO_RPI, GPIO.IN, pull_up_down=GPIO.PUD_UP) | |
GPIO.setup(FR_RPI, GPIO.OUT) | |
try: | |
while(1): | |
GPIO.wait_for_edge(TO_RPI, GPIO.FALLING, timeout=1000) | |
loop_start = datetime.now() # UPS SEND SYNC. | |
GPIO.wait_for_edge(TO_RPI, GPIO.RISING, timeout=1000) | |
pulseLength = millisSince(loop_start) | |
header = headerFromMillis(pulseLength) | |
if header: | |
voltage = None | |
if header == 1: | |
number = 0 | |
for i in range(0,8): | |
number += (getBit(4) << i) | |
# print("Data binary (little-endian): {:08b}".format(number)) | |
voltage = round(number / 255.0 * 1000) / 100.0 | |
print("Got " + str(header) + ", Pulse Length: " + str(pulseLength) + ", Volts: {:.2f}".format(voltage)); | |
else: | |
print("Got " + str(header) + ", Pulse Length: " + str(pulseLength)); | |
waitUntil(1000, loop_start); | |
GPIO.output(FR_RPI, GPIO.LOW) | |
time.sleep(((header * 20))/1000) | |
GPIO.output(FR_RPI, GPIO.HIGH) | |
print("Sent " + str(header) + ", Pulse Length: " + str(pulseLength)); | |
print("") | |
except KeyboardInterrupt: | |
None | |
GPIO.cleanup() # clean up GPIO on normal exit |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment