Skip to content

Instantly share code, notes, and snippets.

@jeremygilly
Created March 28, 2020 12:57
Show Gist options
  • Save jeremygilly/a12e9f42673b91cb5a207a6e9566a939 to your computer and use it in GitHub Desktop.
Save jeremygilly/a12e9f42673b91cb5a207a6e9566a939 to your computer and use it in GitHub Desktop.
Raspberry Pi test code for James
''' Jeremy's test script - 28 Mar 2020.
It creates two threads and waits for both to finish before presenting the results.
Each thread generates random data, but data that is similar to what is expected from the sensor devices.
The results are added to the queue, removed, then presented to the user.
Not completed: writing the results to a file (e.g. with csv.writer). '''
import queue, threading, time, random, math, numpy, sys
# Stop at import time when the interpreter is older than Python 3.2.
# NOTE(review): assert statements are skipped when Python runs with -O.
assert(sys.version_info >= (3,2))
def commercial_pH(result_queue, connected=False, base_delay=0.8):
    """Simulate one reading from the commercial pH meter.

    Puts a ``("commercial_pH", value)`` tuple on ``result_queue``. When the
    meter is connected, ``value`` is a random float in [0, 10) that is
    produced after waiting 1, 2 or 3 times ``base_delay`` seconds. When it
    is not connected, ``value`` is the string ``"N/A"`` and the wait is
    exactly ``base_delay`` seconds.

    Args:
        result_queue: Object with a ``put`` method (e.g. ``queue.Queue``)
            that receives the tagged result.
        connected: Truthy when the pH meter is connected.
        base_delay: Length in seconds of one delay step (default 0.8).

    Returns:
        Always 0.
    """
    if connected:
        # randint is inclusive at both ends, so the multiplier is never 0.
        # math.ceil(random.random() * 3) yields 0 when random() returns 0.0.
        time.sleep(base_delay * random.randint(1, 3))
        reading = random.random() * 10
    else:
        # Not connected: wait one delay step anyway before reporting.
        time.sleep(base_delay)
        reading = "N/A"
    result_queue.put(("commercial_pH", reading))
    return 0
def GaN_sensor(result_queue):
    """Simulate one reading from the GaN sensor.

    Generates two random arrays of shape (1, 5): medians drawn uniformly
    from [0, 1000) and standard deviations drawn uniformly from [0, 10).
    After sleeping for a random time below one second, puts
    ``("GaN", (medians, standard_deviations))`` on ``result_queue``.

    Args:
        result_queue: Object with a ``put`` method (e.g. ``queue.Queue``)
            that receives the tagged result.

    Returns:
        Always 0.
    """
    shape = (1, 5)
    simulated_medians = 1000 * numpy.random.rand(*shape)
    simulated_spreads = 10 * numpy.random.rand(*shape)

    # Stand-in for the acquisition time of the sensor.
    acquisition_time = random.random()
    time.sleep(acquisition_time)

    reading = (simulated_medians, simulated_spreads)
    result_queue.put(("GaN", reading))
    return 0
def main():
    """Run ten measurement cycles and print both sensor results each time.

    Every cycle starts one thread per sensor function, waits for both to
    finish, drains the shared queue and prints the value found under each
    sensor's tag. A sensor that left no entry on the queue is reported as
    "N/A" rather than reusing the value of an earlier cycle.

    Returns:
        None.
    """
    q = queue.Queue()
    connected_pH_meter = False  # this is the only value that will be changed during operation.

    # Show this works more than once; the queue is fully drained every cycle,
    # so results never carry over from one cycle to the next.
    for _ in range(10):
        threads = [
            threading.Thread(target=commercial_pH, args=(q, connected_pH_meter)),
            threading.Thread(target=GaN_sensor, args=(q,)),
        ]
        for thread in threads:
            thread.daemon = True
            thread.start()  # start the threads running
        for thread in threads:
            thread.join()  # wait until all the threads are completed before accessing results

        # Results can arrive in either order, so arrival order says nothing
        # about the source. Index every queued (tag, value) pair by its tag.
        results = {}
        while not q.empty():
            tag, value = q.get()
            results[tag] = value

        # A fresh dict per cycle means a missing result can never be
        # mistaken for the value of the previous cycle.
        print("Commercial pH result:", results.get("commercial_pH", "N/A"))
        print("GaN result:", results.get("GaN", "N/A"))
        # This is where a csv.writer would be used to record results.
# Run the demonstration only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment