Last active
August 20, 2023 22:23
-
-
Save ramunasd/a2e40cd13dd4cacf5685 to your computer and use it in GitHub Desktop.
ANT+ Garmin bike cadence and speed sensor reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Initialize a basic broadcast slave channel for listening to | |
an ANT+ HR monitor. | |
""" | |
import sys | |
import time | |
from ant.core import driver, node, event, message | |
from ant.core.constants import * | |
from config import * | |
def convertSB(raw): | |
value = int(raw[1]) << 8 | |
value += int(raw[0]) | |
return value | |
class CadenceListener(event.EventCallback): | |
lastEvent = None | |
def process(self, msg): | |
if isinstance(msg, message.ChannelBroadcastDataMessage): | |
page = msg.payload[1] & 0x7F | |
if page != 0: | |
return | |
eventTime = convertSB(msg.payload[5:7]) | |
if eventTime == self.lastEvent: | |
return | |
self.lastEvent = eventTime | |
revolutions = convertSB(msg.payload[7:9]) | |
print eventTime, ':', revolutions | |
NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45' | |
# Initialize | |
stick = driver.USB1Driver(SERIAL, log=LOG, debug=DEBUG) | |
antnode = node.Node(stick) | |
antnode.start() | |
antnode.registerEventListener(CadenceListener()) | |
# Set network key | |
network = node.Network(key=NETKEY, name='N:ANT+') | |
antnode.setNetworkKey(0, network) | |
# Get the first unused channel. Returns an instance of the node.Channel class. | |
channel = antnode.getFreeChannel() | |
# Initialize it as a receiving channel using our network key | |
channel.assign(network, CHANNEL_TYPE_TWOWAY_RECEIVE) | |
# Now set the channel id for pairing with an ANT+ HR monitor | |
channel.setID(122, 0, 0) | |
# Listen forever and ever (not really, but for a long time) | |
channel.searchTimeout = TIMEOUT_NEVER | |
# We want a ~4.06 Hz transmission period | |
channel.period = 8102 | |
# And ANT frequency 57 | |
channel.frequency = 57 | |
try: | |
# Time to go live | |
channel.open() | |
print "Listening for events..." | |
while True: | |
time.sleep(60) | |
finally: | |
# Shutdown channel | |
channel.close() | |
channel.unassign() | |
# Shutdown | |
antnode.stop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ant.core import log | |
# USB1 ANT stick interface. Running `dmesg | tail -n 25` after plugging the | |
# stick on a USB port should tell you the exact interface. | |
SERIAL = '/dev/ttyUSB0' | |
# If set to True, the stick's driver will dump everything it reads/writes | |
# from/to the stick. | |
# Some demos depend on this setting being True, so unless you know what you | |
# are doing, leave it as is. | |
DEBUG = True | |
# Set to None to disable logging | |
LOG = None | |
#LOG = log.LogWriter() | |
# ========== DO NOT CHANGE ANYTHING BELOW THIS LINE ========== | |
#print "Using log file:", LOG.filename | |
#print "" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import sys | |
import time | |
from ant.core import driver, node, event, message | |
from ant.core.constants import * | |
from config import * | |
NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45' | |
CIRCUMFERENCE = 2.0 | |
def convertSB(raw): | |
value = int(raw[1]) << 8 | |
value += int(raw[0]) | |
return value | |
class SpeedListener(event.EventCallback): | |
lastTime = None | |
lastRevolutions = None | |
def calcSpeed(self, time, revolutions): | |
if self.lastTime is None: | |
return None | |
if time < self.lastTime: | |
time += 64 * 1024 | |
if revolutions < self.lastRevolutions: | |
revolutions += 65535 | |
return (revolutions - self.lastRevolutions) * 1024.0 * CIRCUMFERENCE / (time - self.lastTime) | |
def process(self, msg): | |
if isinstance(msg, message.ChannelBroadcastDataMessage): | |
page = msg.payload[1] & 0x7F | |
if page != 0 and page != 5: | |
return | |
if page == 5: | |
stopped = (msg.payload[2] & 1 == 1) | |
if stopped: | |
print self.lastRevolutions, 0 | |
eventTime = convertSB(msg.payload[5:7]) | |
if eventTime == self.lastTime: | |
return | |
revolutions = convertSB(msg.payload[7:9]) | |
speed = self.calcSpeed(eventTime, revolutions) | |
print revolutions, speed | |
self.lastTime = eventTime | |
self.lastRevolutions = revolutions | |
# Initialize | |
stick = driver.USB1Driver(SERIAL, log=LOG, debug=DEBUG) | |
antnode = node.Node(stick) | |
antnode.start() | |
antnode.registerEventListener(SpeedListener()) | |
# Set network key | |
network = node.Network(key=NETKEY, name='N:ANT+') | |
antnode.setNetworkKey(0, network) | |
# Get the first unused channel. Returns an instance of the node.Channel class. | |
channel = antnode.getFreeChannel() | |
# Initialize it as a receiving channel using our network key | |
channel.assign(network, CHANNEL_TYPE_TWOWAY_RECEIVE) | |
# Now set the channel id for pairing with an ANT+ HR monitor | |
channel.setID(123, 0, 0) | |
# Listen forever and ever (not really, but for a long time) | |
channel.searchTimeout = TIMEOUT_NEVER | |
# We want a ~4.06 Hz transmission period | |
channel.period = 8118 | |
# And ANT frequency 57 | |
channel.frequency = 57 | |
try: | |
# Time to go live | |
channel.open() | |
print "Listening for events..." | |
while True: | |
time.sleep(60) | |
finally: | |
# Shutdown channel | |
channel.close() | |
channel.unassign() | |
# Shutdown | |
antnode.stop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello. I wonder if speed.py should work for speed and cadence sensor with only corresponding setID and period?