Last active
August 8, 2018 22:05
-
-
Save nro-bot/2a6a733d8facd23115a4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void rxCallback(uint8_t *buffer, uint8_t len) | |
{ | |
// other code as per Adafruit | |
if ((byte)buffer[0] == (byte)10){ //color cmd received | |
setColor(buffer[1],buffer[2], buffer[3]); | |
} | |
else if ((byte)buffer[0] == (byte)20){ //string cmd received | |
Lservo.write(buffer[1]); | |
Rservo.write(map(buffer[2],0,180,180,0)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
July 5, 2014 Nancy Ouyang | |
Usage: | |
$ sudo python blescan.py | |
Copyright: | |
public domain | |
Automation of hcitool lescan to return MAC addresses | |
Requires sudo privileges | |
This operates with hcitool v5.20, tested on Ubuntu 14.04 | |
Hardware: Requires a laptop with 4.0 dongle or with built-in bluetooth low energy capabilities | |
Note: You can check if your dongle is recognized by using hciconfig at a terminal prompt. | |
@params None | |
@returns ble_adrs - A set of MAC addresses of BLE peripherals | |
''' | |
import pexpect, time, sys, signal, os | |
from ctypes.util import find_library | |
SCAN_TIME = 1 #in seconds | |
if not os.geteuid() == 0: | |
sys.exit("hictool lescan only works as root") | |
btlib = find_library("bluetooth") | |
if not btlib: | |
raise Exception( | |
"Can't find required bluetooth libraries" | |
) | |
def closeall(connection): | |
isalive = connection.terminate(force=True) | |
#print 'process was killed: ', isalive | |
#hciout.kill(signal.SIGTERM) | |
connection.close(force = True) | |
def gatheradr(conn): | |
start_time = time.time() | |
print 'Scanning for addresses...' | |
ble_adrs = set() | |
try: | |
for line in conn: | |
address = line.strip().split(' ')[0] | |
#print address | |
if address != '': | |
ble_adrs.add(address) | |
elapsed_time = time.time() - start_time | |
if elapsed_time > SCAN_TIME: #in seconds | |
closeall(conn) | |
break | |
except (pexpect.TIMEOUT): | |
print 'No BLE addresses found! Are peripherals on and reset?' | |
closeall(conn) | |
return ble_adrs | |
def blescan(): | |
ble_adrs = set() | |
# seems like we cannot catch the timeout here, must be inside gatheradr? | |
# pexpect.timeout thrown if no inputs, aka no addresses, are being found with hcitool lescan | |
hciout = pexpect.spawn('hcitool lescan', timeout=1) | |
i = hciout.expect(['LE Scan ...','File descriptor in bad state','failed', pexpect.TIMEOUT], timeout=1) | |
if i == 0: | |
ble_adrs = gatheradr(hciout) | |
if i == 1: | |
c = True | |
while c: #code to check if input acceptable | |
inp = raw_input('Check if dongle is plugged in. Type "y" to continue, or type "n" to cancel.') | |
if inp.lower().startswith('y'): | |
hciout.terminate() | |
hciout.close() | |
print 'Waiting a second for dongle to initialize' | |
time.sleep(1) | |
hciout = pexpect.spawn('hcitool lescan', timeout=1) | |
j = hciout.expect(['LE Scan ...',pexpect.TIMEOUT], timeout=1) | |
if j == 0: | |
ble_adrs = gatheradr(hciout) | |
break | |
if j == 1: | |
'Dongle not plugged in.' | |
closeall(hciout) | |
break | |
elif inp.lower().startswith('n'): | |
closeall(hciout) | |
break | |
else: | |
print 'Did not understand command. Try again.' | |
if i == 2: | |
print 'Is dongle plugged in? Could not scan: ', hciout.before, hciout.after | |
closeall(hciout) | |
if i == 3: | |
print 'Is dongle pluggeded in? Could not scan: ', hciout.before, hciout.after | |
closeall(hciout) | |
return ble_adrs | |
if __name__ == "__main__": | |
ble_adrs = blescan() | |
if ble_adrs: | |
print 'Addresses found: ', ble_adrs | |
else: | |
print 'Empty set. No addresses found' | |
#self.handle = 'b' #!! this is the TX service on the nRF8001 adafruit breakout with callbackEcho sketch | |
#= int(sys.argv[1]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# with help from https://github.com/msaunby/ble-sensor-pi/blob/master/sensortag/sensortag.py | |
# Michael Saunby. April 2013 | |
import os, sys | |
from ctypes.util import find_library | |
import pexpect, traceback, threading, Queue, time, socket, select | |
import constants, blescan | |
if not os.geteuid() == 0: | |
sys.exit("script only works as root") | |
btlib = find_library("bluetooth") | |
if not btlib: | |
raise Exception( | |
"Can't find required bluetooth libraries" | |
) | |
class bleBot: | |
def __init__( self, ble_adr ): | |
self.ble_adr = ble_adr | |
self.con = pexpect.spawn('gatttool -b ' + self.ble_adr + ' -I -t random') | |
self.con.delaybeforesend = 0 #THIS LINE IS SUPER IMPORTANT | |
self.con.expect('\[LE\]', timeout=1) | |
self.handle = 'b' #!! this is the TX service on the nRF8001 adafruit breakout with callbackEcho sketch | |
def connect( self ): | |
# OH HEXAPOD this is so sketchy, it will break if bluez gatttool changes at all. I have version 5.20 | |
print "Preparing to connect. Address: " + self.ble_adr | |
self.con.sendline('connect') | |
try: | |
self.con.read_nonblocking(2048,0) #flush the read pipe!! SUPER IMPORTANT | |
except: | |
pass | |
i = self.con.expect(['Attempting', 'Error'], timeout=1) | |
if i == 0: | |
#print 'Attempting to connect' | |
j = self.con.expect(['Connection successful', 'No route', 'busy', pexpect.TIMEOUT], timeout = 1) | |
if j == 0: | |
print self.ble_adr, ': connected!' | |
if j == 1: | |
print self.ble_adr, ': No route to host, is USB dongle plugged in?' | |
self.cleanup() | |
if j == 2: | |
print self.ble_adr, ': Device busy, is something else already connected to it?' | |
c = True | |
while c: | |
inp = raw_input('Try hitting reset. Type "y" to continue or "n" to quit.') | |
if inp.lower().startswith('y'): | |
self.con.sendline('connect') | |
try: | |
self.con.read_nonblocking(2048,0) #flush the read pipe!! SUPER IMPORTANT | |
except: | |
pass | |
k = self.con.expect(['Connection successful', pexpect.TIMEOUT], timeout = 1) | |
print 'k: ', k | |
if k == 0: | |
print self.ble_adr, ': connected!' | |
if k == 1: | |
print self.ble_adr, ': Could not connect' | |
self.cleanup() | |
break | |
elif inp.lower().startswith('n'): | |
self.cleanup() | |
break | |
else: | |
print 'Did not understand command. Try again.' | |
if j == 3: | |
print 'Attempting to connect, is device on and in range? ' | |
#foostr = raw_input('Type anything to continue, or enter to cancel') | |
self.con.sendline('connect') | |
try: | |
self.con.read_nonblocking(2048,0) #flush the read pipe!! SUPER IMPORTANT | |
except: | |
pass | |
k = self.con.expect(['Connection successful', pexpect.TIMEOUT], timeout = 3) | |
print 'k: ', k | |
if k == 0: | |
print self.ble_adr, ': connected!' | |
if k == 1: | |
print self.ble_adr, ': Could not connect' | |
self.cleanup() | |
if i == 1: | |
print 'Is USB dongle plugged in?' | |
self.cleanup() | |
return self | |
def char_write_cmd( self, value ): | |
cmd = 'char-write-cmd 0x%s %s' % (self.handle, value) | |
#print self.ble_adr, cmd | |
self.con.sendline( cmd ) | |
try: | |
print self.con.read_nonblocking(2048,0) #flush the read pipe!! SUPER IMPORTANT | |
except: | |
pass | |
#print 'After sending command, before: ', self.con.before, 'after :', self.con.after | |
return | |
def cleanup( self ): | |
print self.ble_adr, ': attempting to disconnect' | |
try: | |
self.con.sendline('disconnect') | |
self.con.sendline('exit') | |
try: | |
self.con.read_nonblocking(2048,0) #flush the read pipe!! SUPER IMPORTANT | |
except: | |
pass | |
isalive = self.con.terminate(force=True) | |
print self.ble_adr, ': has been terminated? ', isalive | |
self.con.close(force=True) | |
#print self.ble_adr, 'is alive: ', self.con.isalive() | |
except OSError: | |
print self.ble_adr, ': OSError' | |
pass | |
return | |
def tupToHex( foolist ): | |
hexed = '' | |
for i in foolist : | |
hexed += format(i, '02x') | |
#print hexed | |
return hexed | |
#def worker( address, commands ): | |
def worker( cmdQueue, connection): | |
while True: | |
cmd = cmdQueue.get() | |
#print 'queue items left: ', cmdQueue.qsize() | |
print connection.ble_adr, 'rcvd from queue:', cmd | |
#print connection.ble_adr, ': queue size: ', cmdQueue.qsize() | |
if cmd is None: | |
print connection.ble_adr, ': attempting to cleanup' | |
connection.cleanup() | |
return | |
else: | |
connection.char_write_cmd(tupToHex(cmd)) | |
#def ports_init(portnum): | |
def sendRobotCmds(cmd, queues): | |
if cmd[1] == constants.PREFIX_COLOR: | |
if len(cmd) == constants.LENGTH_CMD_C: | |
botID = cmd[0] | |
queues[botID].put(cmd[1:]) | |
############## | |
# for rgb music, because the packets tend to squish together | |
# such hack much disgust | |
else: | |
if len(cmd) == 2*constants.LENGTH_CMD_C: | |
cmd1 = cmd[:constants.LENGTH_CMD_C] | |
botID = cmd1[0] | |
queues[botID].put(cmd1[1:]) | |
cmd2 = cmd[constants.LENGTH_CMD_C:] | |
botID = cmd2[0] | |
queues[botID].put(cmd2[1:]) | |
#if len(cmd) == 2*constants.LENGTH_CMD_C-1: | |
#clean = cmd[:constants.LENGTH_CMD_C] | |
#clean[constants.LENGTH_CMD_C-1] = int(str(clean[constants.LENGTH_CMD_C-1])[:-1]) #strip final char from int | |
#botID = clean[0] | |
#queues[botID].put(clean[1:]) | |
#clean = cmd[constants.LENGTH_CMD_C-1:] | |
#clean[0] = int(str(clean[0])[2:]) #strip first two char from int | |
#botID = clean[0] | |
#queues[botID].put(clean[1:]) | |
else: | |
print 'invalid color command received: ', cmd | |
elif cmd[1] == constants.PREFIX_SERVO: | |
if len(cmd) == constants.LENGTH_CMD_S: | |
botID = cmd[0] | |
queues[botID].put(cmd[1:]) | |
else: | |
if len(cmd) == 2*constants.LENGTH_CMD_S: | |
cmd1 = cmd[:constants.LENGTH_CMD_S] | |
botID = cmd1[0] | |
queues[botID].put(cmd1[1:]) | |
cmd2 = cmd[constants.LENGTH_CMD_S:] | |
botID = cmd2[0] | |
queues[botID].put(cmd2[1:]) | |
else: | |
print 'invalid servo command received: ', cmd | |
else: | |
print 'invalid command type (servo or color) received: ', cmd, 'expected: ', constants.CMD_FORMAT | |
def main(): | |
print 'Server running, ready to scan for BTLE peripherals.' | |
connections = [] | |
addresses = blescan.blescan() | |
while len(addresses) < constants.NUM_ROBOTS: | |
c = True | |
while c: | |
inp = raw_input('Expected ' + str(constants.NUM_ROBOTS) + ', found ' + str(len(addresses)) + ' robots. Try hitting reset on the robots. Type "y" to continue or "n" to quit.') | |
if inp.lower().startswith('y'): | |
addresses = blescan.blescan() | |
break | |
elif inp.lower().startswith('n'): | |
sys.exit('User cancelled program when told not enough robots found.') | |
break | |
else: | |
print 'Did not understand command. Try again.' | |
if len(addresses) > constants.NUM_ROBOTS: | |
culled_adr = [] | |
c = True | |
while c: | |
addresses = list(addresses) | |
print addresses | |
inp = raw_input('Expected ' + str(constants.NUM_ROBOTS) + ', found ' + str(len(addresses)) + ' robots. \ | |
Enter numbers of the robots you want, separated by commas. e.g. "0,4"') | |
indices = inp.split(',') | |
if inp.lower().startswith('n'): | |
sys.exit('User cancelled program when told too many robots found.') | |
break | |
for i in indices: | |
culled_adr = addresses[int(i)] | |
break | |
#else: | |
#print 'Did not understand command. Try again.' | |
print 'culled', culled_adr | |
addresses = set() | |
addresses.add(culled_adr) | |
for address in addresses: | |
b = bleBot(address) | |
# first connect them all because that takes the longest | |
connection = b.connect() | |
connections.append(connection) | |
queues = [] | |
threads = [] | |
for i in range(len(addresses)): | |
q = Queue.Queue() | |
queues.append(q) | |
cmdQueue = queues[i] | |
connection = connections[i] | |
#t = threading.Thread(target=worker, args=(connection, cmdList)) | |
t = threading.Thread(target=worker, args=(cmdQueue, connection)) | |
t.daemon = True | |
threads.append(t) | |
for t in threads: | |
t.start() | |
#ports_init(int(sys.argv[1])) | |
HOST = '' # Symbolic name meaning all available interfaces | |
PORT = constants.PORT_BTLE # Arbitrary non-privileged port | |
print 'expecting port: ', PORT | |
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
server_socket.bind((HOST, PORT)) | |
server_socket.listen(constants.MAX_BTLE_CONNECTIONS) | |
sockets = [] | |
sockets.append(server_socket) | |
print 'Server ready to accept commands to pass over BTLE to peripherals' | |
while 1: | |
try: | |
read_sockets,write_sockets,error_sockets = select.select(sockets,[],[]) | |
for sock in read_sockets: | |
if sock == server_socket: | |
try: | |
conn, addr = server_socket.accept() | |
sockets.append(conn) | |
print "Client (%s, %s) connect" % addr | |
except (KeyboardInterrupt, ValueError, socket.error) as inst: | |
print "Caught exception: ", type(inst), "closing ble connections" | |
for connection in connections: | |
connection.cleanup() | |
for conn in sockets: | |
conn.shutdown(socket.SHUT_RDWR) | |
conn.close() | |
sys.exit(0) | |
print 'port: ', PORT, '|| Connected by', addr | |
else: | |
try: | |
data = sock.recv(2048) | |
#print 'received data', data | |
strRGB = data | |
############## | |
cmd = [int(s) for s in filter(None, strRGB.rstrip().split(','))] #for python | |
sendRobotCmds(cmd, queues) | |
except Exception as e: | |
print 'Exception', e | |
print "Client (%s, %s) is offline" % addr, 'removing from list of sockets: ', sockets | |
sock.close() | |
sockets.remove(sock) | |
continue | |
except (KeyboardInterrupt) as inst: | |
print type(inst) | |
print 'closing due to error' | |
print 'number of robots connected: ', len(connections) | |
print 'number of sockets connect: ', len(sockets) | |
for q in queues: | |
q.put(constants.KILL_CMD_C) | |
q.put(constants.KILL_CMD_S) | |
q.put(None) | |
for t in threads: | |
t.join() #timeout required so that main thread also receives KeyboardInterrupt | |
print 'threads closed' | |
for conn in sockets: | |
conn.shutdown(socket.SHUT_RDWR) | |
conn.close() | |
print 'serial connection closed: ', conn | |
sys.exit() | |
print 'closing normally' | |
server_socket.close() | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
NUM_ROBOTS = 2 | |
PORT_BTLE = 5207 | |
PORT_SWISS = 3000 | |
HOST_SWISS = 'localhost' | |
HOST_BTLE = '127.0.0.1' | |
#Command format: botID, cmdType=10, rval, gval, bval | |
CMD_FORMAT ='botID, cmdType=10 or 20, lservo, rservo' | |
PREFIX_COLOR = 10 | |
PREFIX_SERVO = 20 | |
LENGTH_CMD_C = 3+2 #1 for botid, 1 for cmdType | |
LENGTH_CMD_S = 2+2 | |
KILL_CMD_C = [PREFIX_COLOR] + [0,0,0] | |
KILL_CMD_S = [PREFIX_SERVO] + [90,90] | |
MAX_BTLE_CONNECTIONS = 2 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import processing.net.*; | |
Client myClient; | |
String COLOR_TYPE = "10"; | |
void setup() { | |
myClient = new Client(this, "127.0.0.1", 5207); | |
} | |
void draw() { | |
rvals = "0," + COLOR_TYPE + "," + r + "," + g + "," + b + ","; | |
gvals = "1," + COLOR_TYPE + "," + r + "," + g + "," + b + ","; | |
bvals = "2," + COLOR_TYPE + "," + r + "," + g + "," + b + ","; | |
if (dropTime == dropMax){ | |
myClient.write(rvals); | |
myClient.write(gvals); | |
//myClient.write(bvals); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## http://stackoverflow.com/questions/24564587/communicate-between-a-processing-sketch-and-a-python-program/24565160?noredirect=1#comment38049611_24565160 | |
import time, sys, serial, socket | |
import constants | |
self.sock.send(message) | |
self.host = constants.HOST_BTLE | |
self.port = constants.PORT_BTLE | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.NUM_ROBOTS = constants.NUM_ROBOTS | |
COLOR_TYPE = 20 | |
sock = socket.socket() | |
sock.connect((self.host,self.port)) | |
def step(): | |
message = "" | |
for i in range(0, self.NUM_ROBOTS): | |
message = "0,20,90,90" | |
self.sock.send(message) | |
def main(): | |
while 1: | |
try: | |
s.step() | |
time.sleep(0.02) | |
except (socket.error, KeyboardInterrupt): | |
s.closeSocket() | |
sys.exit() | |
if __name__ == '__main__': | |
main() |
@TiernanKennedy Sorry I did not see this until now -- Absolutely, I successfully used it for both android and iOS <-> arduino. I'll try to republish as a github repo shortly, but you should read http://www.orangenarwhals.com/2014/07/python-adafruit-nrf8001-bluetooth-low-energy-breakout-in-20-minutes-ubuntu-14-04/ (I never thought people would hit the gist directly).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@nouyang could this code be used to transfer data from a mobile device, i.e. iOS, Android to Pi?