Skip to content

Instantly share code, notes, and snippets.

@thinkier
Last active December 6, 2024 01:36
Show Gist options
  • Save thinkier/949664165370c7f02886d5e145651fd4 to your computer and use it in GitHub Desktop.
Save thinkier/949664165370c7f02886d5e145651fd4 to your computer and use it in GitHub Desktop.
Self-watering planter box w/ Bluetooth LE monitoring
import machine
from micropython import const
import struct
from machine import Pin, ADC, WDT
import asyncio
import aioble
import bluetooth
import time
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.humidity
_ENV_SENSE_HUM_UUID = bluetooth.UUID(0x2A6F)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
_PUMP_UUID = bluetooth.UUID(0x183B)
_PUMP_PUMPING_UUID = bluetooth.UUID(0x2B2C)
# How frequently to send advertising beacons.
_ADV_INTERVAL_MS = 250_000
# How often to reboot the device.
_REBOOT_INTERVAL_MS = 21600_000 # 6 hrs
_MAX_PUMPING_TIME = 300_000 # 5 mins
# Plant_io compatible pins
# MOSFET-protected pump control
pump = Pin(12, Pin.OUT)
pump.value(0)
# Moisture Sensor Input
adc2 = ADC(Pin(28))
# May change these numbers from pot to pot
hygrometer_exposed_len = 3
hygrometer_max_depth = 12
# Converted ADC calibration values from arduino 10-bit to upy 16-bit
hygrometer_calibration_air = 880 << 6
hygrometer_calibration_wet = hygrometer_calibration_air * hygrometer_exposed_len / hygrometer_max_depth
# Volumetric Water Content (VWC) % for triggering the pump
vwc_low = 50
vwc_high = 75
# Register GATT server.
hum_service = aioble.Service(_ENV_SENSE_UUID)
hum_characteristic = aioble.Characteristic(
hum_service, _ENV_SENSE_HUM_UUID, read=True, notify=True
)
bss_service = aioble.Service(_PUMP_UUID)
bss_resp_characteristic = aioble.Characteristic(
bss_service, _PUMP_PUMPING_UUID, read=True, notify=True
)
aioble.register_services(hum_service, bss_service)
wdt = WDT(timeout=2000)
# Helper to encode the humidity characteristic encoding (uint16, hundredths of a percent).
def _encode_humidity(hum_pct):
return struct.pack("<h", int(hum_pct * 100))
def _encode_pump_state(pump_state):
return struct.pack("<B", pump_state)
# This would be periodically polling a hardware sensor.
async def sensor_task():
pumping = None
pumping_start = None
while True:
await asyncio.sleep_ms(1000)
h = read_moisture_content(adc2)
hum_characteristic.write(_encode_humidity(h), send_update=True)
if h < vwc_low:
pumping = True
pumping_start = time.ticks_ms()
elif h > vwc_high:
pumping = False
if pumping_start is not None and time.ticks_diff(time.ticks_ms(), pumping_start) > _MAX_PUMPING_TIME:
pump.value(0)
pumping = None
pumping_start = None
if pumping is not None:
bss_resp_characteristic.write(_encode_pump_state(pumping), send_update=True)
pump.value(pumping)
pumping = None
pumping_start = None
wdt.feed()
# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
while True:
async with await aioble.advertise(
_ADV_INTERVAL_MS,
name="Planter Box",
services=[_ENV_SENSE_UUID, _PUMP_UUID],
appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
) as connection:
print("Connection from", connection.device)
await connection.disconnected(timeout_ms=None)
async def reboot_task():
await asyncio.sleep_ms(_REBOOT_INTERVAL_MS)
machine.reset()
# Run both tasks.
async def main():
t1 = asyncio.create_task(sensor_task())
t2 = asyncio.create_task(peripheral_task())
t3 = asyncio.create_task(reboot_task())
await asyncio.gather(t1, t2, t3)
def read_moisture_content(pin):
reading = min(max(pin.read_u16(), hygrometer_calibration_wet), hygrometer_calibration_air) - hygrometer_calibration_wet
return 100 - (reading * 100 / (hygrometer_calibration_air - hygrometer_calibration_wet))
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment