Created
March 28, 2020 12:57
-
-
Save jeremygilly/a12e9f42673b91cb5a207a6e9566a939 to your computer and use it in GitHub Desktop.
Raspberry Pi test code for James
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' Jeremy's test script - 28 Mar 2020.
It creates two threads and waits for both to finish before presenting the results.
Each thread generates random data similar to what is expected from the sensor devices.
The results are added to the queue, removed, then presented to the user.
Not completed: writing the results to a file (e.g. with csv.writer). '''
import queue, threading, time, random, math, numpy, sys | |
# Refuse to run on interpreters older than Python 3.2.  An explicit raise is
# used instead of ``assert`` because assert statements are removed when Python
# runs with -O, which would silently disable the check.
if sys.version_info < (3, 2):
    raise RuntimeError("This script requires Python 3.2 or newer.")
def commercial_pH(result_queue, connected=False, delay_step=0.8):
    """Simulate one reading from the commercial pH meter.

    Sleeps to imitate the meter's response time, then puts a
    ``("commercial_pH", value)`` tuple on ``result_queue``.

    Args:
        result_queue: Object with a ``put`` method (e.g. ``queue.Queue``)
            that receives the result tuple.
        connected: Truthy when the pH meter is connected.  When connected,
            the value is a random float in [0, 10); otherwise it is the
            string ``"N/A"``.
        delay_step: Length in seconds of one delay step.  Defaults to 0.8.

    Returns:
        Always 0.
    """
    if connected:
        # Wait 1, 2 or 3 delay steps (0.8, 1.6 or 2.4 s by default).
        # randint(1, 3) never yields 0, whereas ceil(random() * 3) produced
        # a zero delay whenever random() returned exactly 0.0.
        time.sleep(delay_step * random.randint(1, 3))
        result_queue.put(("commercial_pH", random.random() * 10))
    else:
        # Not connected: wait a single delay step anyway, then report that
        # no reading is available.
        time.sleep(delay_step)
        result_queue.put(("commercial_pH", "N/A"))
    return 0
def GaN_sensor(result_queue, n_values=5, max_delay=1.0):
    """Simulate one reading from the GaN sensor.

    Generates two random ``1 x n_values`` NumPy arrays, sleeps for a random
    time, then puts ``("GaN", (medians, standard_deviations))`` on
    ``result_queue``.

    Args:
        result_queue: Object with a ``put`` method (e.g. ``queue.Queue``)
            that receives the result tuple.
        n_values: Number of columns in each generated array.  Defaults to 5.
        max_delay: Upper bound in seconds (exclusive) of the random sleep.
            Defaults to 1.0.

    Returns:
        Always 0.
    """
    # Medians are uniform in [0, 1000); standard deviations in [0, 10).
    medians = numpy.random.rand(1, n_values) * 1000
    standard_deviations = numpy.random.rand(1, n_values) * 10
    # Random wait so results reach the queue in an unpredictable order.
    time.sleep(random.random() * max_delay)
    result_queue.put(("GaN", (medians, standard_deviations)))
    return 0
def main(iterations=10, connected_pH_meter=False):
    """Run both simulated sensors in parallel and print their results.

    On every iteration one thread per sensor is started, both are joined,
    the shared queue is drained, and the two results are printed.

    Args:
        iterations: Number of measurement rounds to run.  Defaults to 10,
            to show that the scheme works more than once.
        connected_pH_meter: Passed to ``commercial_pH`` to say whether the
            pH meter is connected.  Defaults to False.

    Returns:
        None.
    """
    q = queue.Queue()
    for _ in range(iterations):
        threads = [
            threading.Thread(target=commercial_pH, args=(q, connected_pH_meter)),
            threading.Thread(target=GaN_sensor, args=(q,)),
        ]
        for thread in threads:
            thread.daemon = True
            thread.start()  # start the threads running
        for thread in threads:
            thread.join()  # wait for every thread before reading results

        # Results can arrive in either order, so arrival order says nothing
        # about which sensor produced an entry.  Key each entry by the name
        # its producer attached instead.  The dict is rebuilt every round so
        # that a missing result shows up as None rather than raising a
        # NameError or silently reusing the previous round's value.
        results = {}
        while True:
            try:
                name, value = q.get_nowait()
            except queue.Empty:
                break
            results[name] = value

        print("Commercial pH result:", results.get("commercial_pH"))
        print("GaN result:", results.get("GaN"))
        # This is where a csv.writer would be used to record results.
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment