Created
April 28, 2015 16:50
-
-
Save taylor224/0c63e8bc2262ae4a60f1 to your computer and use it in GitHub Desktop.
WiFi Client Service
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /usr/local/lib/python2.7/dist-packages/wifi/scheme.py
import re | |
import itertools | |
import wifi.subprocess_compat as subprocess | |
from pbkdf2 import PBKDF2 | |
from wifi.utils import ensure_file_exists | |
from wifi.exceptions import ConnectionError | |
def configuration(cell, passkey=None):
    """
    Returns a dictionary of configuration options for cell.

    This function never prompts: the passkey must be supplied by the caller
    whenever the cell uses WPA or WEP encryption.

    :param cell: object exposing ``encrypted`` and ``ssid`` and, when
        encrypted, ``encryption_type``.
    :param passkey: passphrase or key for the network; ignored for
        unencrypted cells.
    :returns: dict of option name -> value for the cell.
    :raises NotImplementedError: if the cell's encryption type is neither
        WPA (any type starting with ``'wpa'``) nor WEP.
    :raises TypeError: if the cell is WPA/WEP encrypted and no passkey
        was given.
    """
    if not cell.encrypted:
        return {
            'wireless-essid': cell.ssid,
            'wireless-channel': 'auto',
        }

    encryption_type = cell.encryption_type
    is_wpa = encryption_type.startswith('wpa')
    if not is_wpa and encryption_type != 'wep':
        raise NotImplementedError

    if passkey is None:
        # Previously this surfaced as an opaque "NoneType has no len()"
        # TypeError; keep the exception type but say what is wrong.
        raise TypeError(
            "a passkey is required for %s encrypted network %r"
            % (encryption_type, cell.ssid))

    if is_wpa:
        # A 64 character passkey is used as-is; anything else is run
        # through PBKDF2 with the SSID as salt to derive the key.
        if len(passkey) != 64:
            passkey = PBKDF2(passkey, cell.ssid, 4096).hexread(32)
        return {
            'wpa-ssid': cell.ssid,
            'wpa-psk': passkey,
            'wireless-channel': 'auto',
        }

    # WEP.
    #
    # Pass key lengths in bytes for WEP depend on type of key and key length:
    #
    #          64bit   128bit   152bit   256bit
    # hex        10      26       32       58
    # ASCII       5      13       16       29
    #
    # (source: https://en.wikipedia.org/wiki/Wired_Equivalent_Privacy)
    #
    # ASCII keys need to be prefixed with an s: in the interfaces file in
    # order to work with linux' wireless tools.
    ascii_lengths = (5, 13, 16, 29)
    if len(passkey) in ascii_lengths:
        # The length only matches an ASCII key, so add the s: prefix for
        # the wireless tools to pick it up properly.
        passkey = "s:" + passkey
    return {
        'wireless-essid': cell.ssid,
        'wireless-key': passkey,
    }
# Searched for in the ifup output by Scheme.parse_ifup_output; captures the
# address that follows a line starting with "bound to ".
bound_ip_re = re.compile(r'^bound to (?P<ip_address>\S+)', flags=re.MULTILINE)


class Scheme(object):
    """
    Saved configuration for connecting to a wireless network.  This
    class provides a Python interface to the /etc/network/interfaces
    file.
    """

    # Path of the interfaces file read and written by this class.
    interfaces = '/etc/network/interfaces'

    @classmethod
    def for_file(cls, interfaces):
        """
        A class factory for providing a nice way to specify the interfaces file
        that you want to use.  Use this instead of directly overwriting the
        interfaces Class attribute if you care about thread safety.
        """
        return type(cls)(cls.__name__, (cls,), {
            'interfaces': interfaces,
        })

    def __init__(self, interface, name, options=None):
        self.interface = interface
        # Spaces in the scheme name are stored as "__0__".
        self.name = name.replace(" ", "__0__")
        self.options = options or {}

    def __str__(self):
        """
        Returns the representation of a scheme that you would need
        in the /etc/network/interfaces file.
        """
        iface = "iface {interface}-{name} inet dhcp".format(**vars(self))
        options = ''.join("\n {k} {v}".format(k=k, v=v) for k, v in self.options.items())
        return iface + options + '\n'

    def __repr__(self):
        # The closing parenthesis was missing from the original format string.
        return 'Scheme(interface={interface!r}, name={name!r}, options={options!r})'.format(**vars(self))

    @classmethod
    def all(cls):
        """
        Returns a generator of saved schemes.
        """
        ensure_file_exists(cls.interfaces)
        with open(cls.interfaces, 'r') as f:
            # The whole file is read before the handle closes, so the
            # generator returned here does not depend on the open file.
            return extract_schemes(f.read(), scheme_class=cls)

    @classmethod
    def where(cls, fn):
        """
        Returns the list of saved schemes for which ``fn(scheme)`` is true.
        """
        return list(filter(fn, cls.all()))

    @classmethod
    def find(cls, interface, name):
        """
        Returns a :class:`Scheme` or `None` based on interface and
        name.
        """
        # Apply the same space encoding as __init__ so raw names match.
        stored_name = name.replace(" ", "__0__")
        try:
            return cls.where(lambda s: s.interface == interface and s.name == stored_name)[0]
        except IndexError:
            return None

    @classmethod
    def for_cell(cls, interface, name, cell, passkey=None):
        """
        Intuits the configuration needed for a specific
        :class:`Cell` and creates a :class:`Scheme` for it.
        """
        return cls(interface, name, configuration(cell, passkey))

    def save(self):
        """
        Writes the configuration to the :attr:`interfaces` file.

        :raises AssertionError: if a scheme with the same interface and
            name is already saved.
        """
        # Raised explicitly (rather than via ``assert``) so the duplicate
        # check still runs when Python is started with -O.
        if self.find(self.interface, self.name):
            raise AssertionError("This scheme already exists")

        with open(self.interfaces, 'a') as f:
            f.write('\n')
            f.write(str(self))

    def delete(self):
        """
        Deletes the configuration from the :attr:`interfaces` file.
        """
        iface = "iface %s-%s inet dhcp" % (self.interface, self.name)
        kept = []
        with open(self.interfaces, 'r') as f:
            skip = False
            for line in f:
                stripped = line.strip()
                if not stripped:
                    # A blank line ends the stanza being dropped.
                    skip = False
                elif stripped == iface:
                    skip = True
                if not skip:
                    kept.append(line)
        with open(self.interfaces, 'w') as f:
            f.write(''.join(kept))

    @property
    def iface(self):
        """The ``<interface>-<name>`` identifier of this scheme."""
        return '{0}-{1}'.format(self.interface, self.name)

    def as_args(self):
        """
        Returns the argument list handed to ifup: the
        ``interface=iface`` mapping followed by one ``-o key=value`` pair
        per option.
        """
        args = list(itertools.chain.from_iterable(
            ('-o', '{k}={v}'.format(k=k, v=v)) for k, v in self.options.items()))

        return [self.interface + '=' + self.iface] + args

    def activate(self):
        """
        Connects to the network as configured in this scheme.

        Runs ifdown on the interface, then ifup with this scheme's
        arguments, and parses the ifup output.
        """
        subprocess.check_output(['/sbin/ifdown', self.interface], stderr=subprocess.STDOUT)
        ifup_output = subprocess.check_output(['/sbin/ifup'] + self.as_args(), stderr=subprocess.STDOUT)
        ifup_output = ifup_output.decode('utf-8')

        return self.parse_ifup_output(ifup_output)

    def parse_ifup_output(self, output):
        """
        Returns a :class:`Connection` carrying the address found after
        "bound to" in ``output``; raises ConnectionError if there is none.
        """
        matches = bound_ip_re.search(output)
        if matches:
            return Connection(scheme=self, ip_address=matches.group('ip_address'))
        else:
            raise ConnectionError("Failed to connect to %r" % self)
class Connection(object):
    """
    Result of a successful connection attempt: pairs the scheme that was
    activated with the IP address obtained for it.
    """

    def __init__(self, scheme, ip_address):
        self.scheme, self.ip_address = scheme, ip_address
# TODO: support other interfaces
# Matches an "iface" header for a wlan interface, optionally followed by
# "-<scheme name>" where the name consists of word characters only.
scheme_re = re.compile(r'iface\s+(?P<interface>wlan\d?)(?:-(?P<name>\w+))?')


def extract_schemes(interfaces, scheme_class=Scheme):
    """
    Yields one scheme per named wlan stanza found in ``interfaces``.

    :param interfaces: full text of an interfaces file.
    :param scheme_class: callable invoked as
        ``scheme_class(interface, name, options)`` for every stanza.

    Headers without a ``-<name>`` suffix are skipped.  The lines following
    a header that start with a space are read as ``key value`` options; an
    option line that has a key but no value is stored with an empty value,
    and whitespace-only lines inside a stanza are ignored.
    """
    lines = interfaces.splitlines()
    total = len(lines)
    # Walk the list by index: popping from the front of a list is O(n) per
    # line and made the whole parse quadratic.
    position = 0
    while position < total:
        line = lines[position]
        position += 1
        if not line or line.startswith('#'):
            continue

        match = scheme_re.match(line)
        if not match:
            continue
        interface, name = match.groups()
        if not interface or not name:
            continue

        options = {}
        while position < total and lines[position].startswith(' '):
            # Collapse runs of whitespace so key and value are separated
            # by exactly one space.
            option = re.sub(r'\s{2,}', ' ', lines[position].strip())
            position += 1
            key, _, value = option.partition(' ')
            if key:
                options[key] = value

        yield scheme_class(interface, name, options)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import wifi | |
import bluetooth | |
import subprocess | |
import json | |
import time | |
def Search(interface='wlan0'):
    """
    Return the cells reported by ``wifi.Cell.all`` for ``interface``.

    :param interface: wireless interface to scan; defaults to ``'wlan0'``,
        the value that used to be hard-coded.

    Any exception raised by the scan propagates to the caller.
    """
    return wifi.Cell.all(interface)
def SearchForCommand(interface='wlan0'):
    """
    Scan ``interface`` and describe every cell as a plain dictionary.

    :param interface: wireless interface to scan; defaults to ``'wlan0'``,
        the value that used to be hard-coded.
    :returns: a list with one dict per cell (ssid, signal, quality,
        frequency, bitrates, encrypted, channel, address, mode), or the
        marker string ``'<busy>'`` when the scan raises
        ``wifi.exceptions.InterfaceError`` or ``'<exception>'`` when it
        raises anything else.
    """
    try:
        cells = wifi.Cell.all(interface)
    except wifi.exceptions.InterfaceError:
        return '<busy>'
    except Exception:
        # Deliberately broad so any scan failure becomes a marker, but no
        # longer a bare ``except:`` that would also swallow
        # KeyboardInterrupt and SystemExit.
        return '<exception>'

    return [
        {
            'ssid': cell.ssid,
            'signal': cell.signal,
            'quality': cell.quality,
            'frequency': cell.frequency,
            'bitrates': cell.bitrates,
            'encrypted': cell.encrypted,
            'channel': cell.channel,
            'address': cell.address,
            'mode': cell.mode,
        }
        for cell in cells
    ]
def FindFromSearchList(ssid):
    """
    Return the first scanned cell whose ``ssid`` equals ``ssid``, or
    ``False`` when the scan contains no such cell.
    """
    matching = (candidate for candidate in Search() if candidate.ssid == ssid)
    return next(matching, False)
def FindFromSavedList(ssid):
    """
    Look up the saved scheme named ``ssid`` on wlan0.

    Returns whatever ``wifi.Scheme.find`` yields when that is truthy,
    otherwise ``False``.
    """
    return wifi.Scheme.find('wlan0', ssid) or False
def Connect(ssid, password=None):
    """
    Connect to the network called ``ssid``.

    :param ssid: name of the network to join; it must show up in a scan.
    :param password: passkey, needed the first time an encrypted network
        is joined.
    :returns: the scanned cell on success, otherwise one of the marker
        strings ``'<not_found>'``, ``'<require_password>'``,
        ``'<wrong_password>'`` or ``'<connect_error>'``.
    """
    cell = FindFromSearchList(ssid)
    if not cell:
        return '<not_found>'

    # Already saved from an earlier setup: reuse the stored scheme.
    saved_scheme = FindFromSavedList(cell.ssid)
    if saved_scheme:
        try:
            saved_scheme.activate()
        except wifi.exceptions.ConnectionError:
            # Previously uncaught here, unlike the branches below, so the
            # caller ended up relaying the exception text, which embeds
            # the scheme's repr and therefore its stored options.
            return '<connect_error>'
        return cell

    # First time connecting to this network.
    if cell.encrypted and not password:
        return '<require_password>'

    if cell.encrypted:
        scheme = Add(cell, password)
        failure = '<wrong_password>'
    else:
        scheme = Add(cell)
        failure = '<connect_error>'

    try:
        scheme.activate()
    except wifi.exceptions.ConnectionError:
        # Do not keep a scheme that could not be brought up.
        Delete(ssid)
        return failure
    return cell
def Add(cell, password=None):
    """
    Build a wlan0 scheme for ``cell`` (named after its ssid), save it and
    return it.  Returns ``False`` without doing anything when ``cell`` is
    falsy.
    """
    if cell:
        new_scheme = wifi.Scheme.for_cell('wlan0', cell.ssid, cell, password)
        new_scheme.save()
        return new_scheme
    return False
def Delete(ssid):
    """
    Remove the saved scheme for ``ssid``.

    When wlan0 is currently associated with that network the interface is
    cycled with ifdown/ifup after the scheme has been removed.

    :returns: ``True`` if a saved scheme was found and deleted, ``False``
        if ``ssid`` is empty or no scheme is saved for it.
    """
    if not ssid:
        return False

    scheme = FindFromSavedList(ssid)
    if not scheme:
        return False

    scheme.delete()
    if GetConnectedAP() == ssid:
        # Argument lists instead of shell strings: no shell is needed to
        # run two fixed commands.
        subprocess.check_output(['ifdown', 'wlan0'])
        subprocess.check_output(['ifup', 'wlan0'])
    return True
def GetConnectedAP():
    """
    Report the ESSID wlan0 is associated with, read from ``iwconfig``.

    :returns: the text between the first pair of double quotes on the
        wlan0 line, ``'<not_connected>'`` when that line contains
        ``ESSID:off/any``, or ``False`` when the command fails, prints
        nothing, or no non-empty quoted name is present.
    """
    try:
        output = subprocess.check_output('iwconfig | grep wlan0', shell=True)
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit status (e.g. grep matched nothing) or the command
        # could not be started.
        return False

    if not output:
        return False

    # On Python 3 check_output returns bytes, which cannot be searched
    # with a str; on Python 2 bytes is str and this branch is skipped.
    if isinstance(output, bytes) and not isinstance(output, str):
        output = output.decode('utf-8', 'replace')

    if 'ESSID:off/any' in output:
        return '<not_connected>'

    pieces = output.split('"')
    if len(pieces) <= 2:
        return False
    return pieces[1] or False
def CommandProcess(data):
    """
    Execute one decoded client request and return the response.

    :param data: mapping with a ``'command'`` entry (``'connect'``,
        ``'delete'``, ``'search'`` or ``'current'``) plus ``'ssid'`` and
        ``'password'`` where the command uses them.
    :returns: whatever ``ResultProcess`` returns for the outcome.  Any
        exception raised while handling the command is reported with its
        text as the result; an unrecognised command yields
        ``'program_fault'``.
    """
    command = ''
    try:
        command = data.get('command')
        print('[ALERT] Command :  %s' % (command,))

        # Connect WiFi
        if command == 'connect':
            ssid = data.get('ssid')
            password = data.get('password')
            if not ssid:
                return ResultProcess(command, 'no_argument')
            result = Connect(ssid, password)
            if isinstance(result, str):
                # Marker strings look like '<wrong_password>'; strip the
                # angle brackets before reporting them.
                return ResultProcess(command, result.replace('<', '').replace('>', ''), ssid)
            if isinstance(result, wifi.Cell):
                return ResultProcess(command, 'success', ssid)
            if not result:
                return ResultProcess(command, 'connect_fail', ssid)

        # Delete WiFi
        elif command == 'delete':
            ssid = data.get('ssid')
            if not ssid:
                return ResultProcess(command, 'no_argument')
            if Delete(ssid):
                return ResultProcess(command, 'success', ssid)
            return ResultProcess(command, 'delete_fail', ssid)

        # Search WiFi
        elif command == 'search':
            searchlist = SearchForCommand()
            if searchlist == '<busy>':
                return ResultProcess(command, 'busy')
            if searchlist == '<exception>':
                return ResultProcess(command, 'exception')
            if not searchlist:
                return ResultProcess(command, 'no_result')
            return ResultProcess(command, 'success', searchlist)

        # Get Current WiFi
        elif command == 'current':
            ssid = GetConnectedAP()
            if not ssid:
                return ResultProcess(command, 'exception')
            if ssid == '<not_connected>':
                return ResultProcess(command, 'not_connected')
            return ResultProcess(command, 'success', ssid)

    except wifi.exceptions.InterfaceError as e:
        return ResultProcess(command, str(e))
    except Exception as e:
        print('[ALERT] Command Process Exception :  %s' % (e,))
        return ResultProcess(command, str(e))

    # Reached for unknown commands, or when no branch above returned.
    return ResultProcess(command, 'program_fault')
def ResultProcess(command, result, data=None):
    """
    Serialise a command outcome as a JSON object string.

    The object always carries ``command`` and ``result``; ``data`` is
    added only when it is truthy.  Returns ``False`` instead of a string
    when either ``command`` or ``result`` is falsy.
    """
    if not (command and result):
        return False

    payload = {
        'command' : command,
        'result' : result
    }
    if data:
        payload['data'] = data
    return json.dumps(payload)
# Script entry point: advertise an RFCOMM serial service, accept a single
# client, and answer its JSON commands until it disconnects.
if __name__ == '__main__':
    print('[INFO] Sleep 5 second to Initialize')
    time.sleep(5)

    blueSocket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
    clientSocket = None
    try:
        blueSocket.bind(("", 1))
        blueSocket.listen(1)
        port = blueSocket.getsockname()[1]
        uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
        bluetooth.advertise_service(
            blueSocket, "WiFiAssistance",
            service_id=uuid,
            service_classes=[uuid, bluetooth.SERIAL_PORT_CLASS],
            profiles=[bluetooth.SERIAL_PORT_PROFILE]
        )
        print("[INFO] Waiting for connection on RFCOMM channel %d" % port)

        clientSocket, clientInfo = blueSocket.accept()
        print("[ALERT] Accepted connection from  %s" % (clientInfo,))

        try:
            while True:
                data = clientSocket.recv(4096)
                # An empty read means the client closed the connection.
                if len(data) == 0:
                    break
                print("received [%s]" % data)
                try:
                    data = json.loads(data)
                    result = CommandProcess(data)
                    if result:
                        clientSocket.send(result)
                except Exception as e:
                    print(e)
                    # The error response used to be built and thrown away;
                    # send it so the client learns the request failed.
                    error_result = ResultProcess('error', str(e))
                    if error_result:
                        clientSocket.send(error_result)
        except IOError:
            pass

        print("[INFO] Bluetooth Disconnected")
    finally:
        # Close the sockets even when setup or the loop fails unexpectedly.
        if clientSocket is not None:
            clientSocket.close()
        blueSocket.close()
    print("Program Finished")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment