Skip to content

Instantly share code, notes, and snippets.

@tshirtman
Created August 20, 2012 18:12
Show Gist options
  • Select an option

  • Save tshirtman/3406285 to your computer and use it in GitHub Desktop.

Select an option

Save tshirtman/3406285 to your computer and use it in GitHub Desktop.
from kivy.app import App
from jnius.reflect import autoclass
from jnius.jnius import cast
from kivy.uix.button import Button
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.gridlayout import GridLayout
#from kivy.clock import Clock
#from multiprocessing import Process, Pipe
from functools import partial
# Java classes used by MyApp, looked up once at import time by their
# fully-qualified Java names through jnius' autoclass.
# PythonActivity.mActivity is the object MyApp.ask() starts the intent from.
PythonActivity = autoclass('org.renpy.android.PythonActivity')
# Provides getDefaultAdapter() and the ACTION_REQUEST_ENABLE intent action.
BluetoothAdapter = autoclass('android.bluetooth.BluetoothAdapter')
# Constructed in MyApp.ask() with the action to request.
Intent = autoclass('android.content.Intent')
# java.util.ArrayList; MyApp.populate() copies the bonded devices into one
# so they can be iterated from Python.
List = autoclass('java.util.ArrayList')
class MyApp(App):
    """Small Kivy app exercising the Android Bluetooth adapter via pyjnius.

    The left column holds action buttons; the grid on the right is filled
    with one button per device returned by the adapter.
    """

    def build(self):
        """Create the widget tree, grab the default adapter, return the root."""
        root = BoxLayout()
        buttons = BoxLayout(size_hint_x=.2, orientation='vertical')
        root.add_widget(buttons)

        # Each button is wired to its own handler (the last two used to be
        # bound to get_bonded by mistake).
        actions = (
            ('ask for bluetooth', self.ask),
            ('get bonded devices', self.get_bonded),
            ('discover', self.discover),
            ('cancel discovery', self.cancel_discovery),
        )
        for label, handler in actions:
            button = Button(text=label)
            button.bind(on_release=handler)
            buttons.add_widget(button)

        self.devices = GridLayout(rows=4)
        root.add_widget(self.devices)
        self.adapter = BluetoothAdapter.getDefaultAdapter()
        return root

    def ask(self, *args):
        """Start the system activity asking the user to enable Bluetooth.

        Extra positional arguments (e.g. the pressed button) are ignored.
        200 is the request code handed to startActivityForResult.
        """
        PythonActivity.mActivity.startActivityForResult(
            Intent(BluetoothAdapter.ACTION_REQUEST_ENABLE), 200)

    def get_bonded(self, *args):
        """Show the adapter's bonded devices in the device grid.

        Accepts and ignores any event arguments, like the other callbacks.
        """
        self.populate(self.adapter.getBondedDevices())

    def discover(self, *args):
        """Ask for Bluetooth to be enabled when the adapter is disabled.

        Actual device discovery is not implemented yet.
        """
        if not self.adapter.isEnabled():
            self.ask()
        # TODO: start discovery and collect the devices that are found.

    def connect_async(self, device, *args):
        """Cancel discovery before a (not yet implemented) async connect.

        The commented lines sketch the intended worker-process approach.
        """
        self.cancel_discovery()
        #parent_conn, child_conn = Pipe()
        #p = Process(target=self.conn_worker, args=(device, child_conn))
        #Clock.schedule_interval(partial(self.check_conn, parent_conn), 1)
        #p.start()

    def conn_worker(self, device, conn):
        """Create an RFCOMM socket for *device* and send it over *conn*."""
        # NOTE(review): the Android API declares this parameter as a
        # java.util.UUID; confirm this placeholder string is accepted.
        sock = device.createRfcommSocketToServiceRecord('OHHY!THX')
        conn.send(sock)

    def connect(self, device, *args):
        """Cancel discovery, create an RFCOMM socket for *device*, print it.

        *args absorbs the button instance that on_release appends after the
        device bound with partial() in populate().
        """
        self.cancel_discovery()
        # NOTE(review): the Android API declares this parameter as a
        # java.util.UUID; confirm this placeholder string is accepted.
        sock = device.createRfcommSocketToServiceRecord('OHHY!THX')
        print(sock)

    def check_conn(self, conn, *args):
        """Print whatever *conn* receives.

        *args absorbs extra scheduler arguments when used as a Clock
        callback, as sketched in connect_async().
        """
        print(conn.recv())

    def cancel_discovery(self, *args):
        """Tell the adapter to cancel device discovery."""
        self.adapter.cancelDiscovery()

    def populate(self, devices):
        """Replace the device grid's content with one button per device.

        *devices* is the Java collection returned by the adapter; it is
        copied into an ArrayList so it can be iterated from Python.
        """
        self.devices.clear_widgets()
        list_devices = List(devices.size())
        list_devices.addAll(cast('java.util.Collection', devices))
        for device in list_devices:
            button = Button(text=device.getName())
            button.bind(on_release=partial(self.connect, device))
            self.devices.add_widget(button)

    def on_pause(self, *args):
        """Return True when the app is asked to pause.

        NOTE(review): per Kivy's App docs this allows pausing instead of
        stopping the app; confirm for the targeted Kivy version.
        """
        return True

    def on_resume(self, *args):
        """Nothing to restore when the app resumes."""
        pass
if __name__ == '__main__':
    # Only start the Kivy app when this file is executed as a script.
    app = MyApp()
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment