Created
November 22, 2012 00:17
-
-
Save sporsh/4128667 to your computer and use it in GitHub Desktop.
Implementation of Android Debug Bridge (ADB) protocol in Twisted
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from adb import protocol | |
# Demo script: decode two captured ADB messages and print what was parsed.
# The two captures used to be assigned to the same name, so the first one was
# overwritten before it was ever decoded; both are decoded now.

# Captured OPEN message whose payload is "shell:ls\x00".
open_sample = (
    '\x4f\x50\x45\x4e\x02\x00\x00\x00'
    '\x00\x00\x00\x00\x09\x00\x00\x00'
    '\x31\x03\x00\x00\xb0\xaf\xba\xb1'
    '\x73\x68\x65\x6c\x6c\x3a\x6c\x73'
    '\x00'
)

# Captured OKAY message; it carries no payload.
okay_sample = (
    '\x4f\x4b\x41\x59\x02\x00\x00\x00'
    '\x09\x00\x00\x00\x00\x00\x00\x00'
    '\x00\x00\x00\x00\xb0\xb4\xbe\xa6'
)

for data in (open_sample, okay_sample):
    message, _ = protocol.AdbMessage.decode(data)
    # Single-argument, %-formatted print calls produce the same text on
    # Python 2 and Python 3.  The header is a tuple subclass, so it has to be
    # wrapped in a one-element tuple for the % operator.
    print("HEADER: %s" % (message.header,))
    print("MESSAGE: %s" % (message,))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
@author: Geir Sporsheim | |
@see: git repo https://android.googlesource.com/platform/system/core/ | |
@see: source file adb/adb.h | |
""" | |
from twisted.internet import protocol | |
import struct | |
# Payload size limit; sent as arg1 of the connect (CNXN) message.
MAX_PAYLOAD = 4096

# Command identifiers.  Each value is the four-character ASCII tag shown in
# the trailing comment, read as a little-endian unsigned 32-bit integer.
A_SYNC = 0x434E5953  # 'SYNC'
A_CNXN = 0x4E584E43  # 'CNXN'
A_OPEN = 0x4E45504F  # 'OPEN'
A_OKAY = 0x59414B4F  # 'OKAY'
A_CLSE = 0x45534C43  # 'CLSE'
A_WRTE = 0x45545257  # 'WRTE'

# ADB protocol version; sent as arg0 of the connect (CNXN) message.
A_VERSION = 0x01000000
def getCommandString(message):
    """Return the four-character tag of a command code.

    @param message: command identifier constant (unsigned 32-bit integer)
    @return: the four bytes of C{message} in little-endian order,
        e.g. C{'OKAY'} for C{0x59414b4f}
    """
    # '<I' is always exactly four bytes, little-endian.  The native 'L' code
    # is eight bytes wide on LP64 platforms, which padded the tag with NULs
    # and broke every lookup that is based on it.
    return struct.pack('<I', message)
class AdbMessageHeader(tuple):
    """Immutable header of an ADB message.

    The header is a 6-tuple C{(command, arg0, arg1, data_length, data_check,
    magic)} that encodes to six little-endian unsigned 32-bit words
    (24 bytes).
    """

    # Explicit byte order and standard sizes.  The native 'L' code is eight
    # bytes wide on LP64 platforms, which produced a 48-byte header there.
    _fmt = '<6I'

    def __new__(cls, command, arg0, arg1, data_length, data_check, magic):
        """
        @param command: command identifier constant
        @param arg0: first argument
        @param arg1: second argument
        @param data_length: length of payload (0 is allowed)
        @param data_check: checksum of data payload
        @param magic: command ^ 0xffffffff
        """
        return tuple.__new__(cls, (command,
                                   arg0,
                                   arg1,
                                   data_length,
                                   data_check,
                                   magic))

    # Named, read-only access to the tuple fields.
    command = property(lambda self: self[0])
    arg0 = property(lambda self: self[1])
    arg1 = property(lambda self: self[2])
    data_length = property(lambda self: self[3])
    data_check = property(lambda self: self[4])
    magic = property(lambda self: self[5])

    def encode(self):
        """Return the packed, fixed-size binary form of this header."""
        return struct.pack(self._fmt, *self)

    @classmethod
    def decode(cls, data):
        """Parse a header from the start of C{data}.

        @param data: received bytes, beginning with a header
        @return: C{(header, remaining_data)}, or C{None} when C{data} is
            shorter than one complete header
        """
        length = struct.calcsize(cls._fmt)
        if len(data) < length:
            return None
        args = struct.unpack(cls._fmt, data[:length])
        return cls(*args), data[length:]

    def __str__(self, *args, **kwargs):
        """Format like the plain tuple, with the command shown as its tag."""
        return str((getCommandString(self.command),
                    self.arg0, self.arg1, self.data_length,
                    self.data_check, self.magic))
class AdbMessage(object):
    """A complete ADB message: command, two arguments and a data payload.

    The header (payload length, checksum and magic) is always derived from
    the current field values.
    """

    def __init__(self, command, arg0, arg1, data=b''):
        """
        @param command: command identifier constant
        @param arg0: first argument
        @param arg1: second argument
        @param data: payload bytes (C{b''} is a plain C{str} on Python 2)
        """
        self.command = command
        self.arg0 = arg0
        self.arg1 = arg1
        self.data = data

    @property
    def header(self):
        """The L{AdbMessageHeader} describing this message."""
        # bytearray() yields integer byte values on both Python 2 and 3.
        data_check = sum(bytearray(self.data))
        magic = self.command ^ 0xffffffff
        return AdbMessageHeader(self.command,
                                self.arg0,
                                self.arg1,
                                len(self.data),
                                data_check,
                                magic)

    @classmethod
    def decode(cls, data):
        """Parse one message from the start of C{data}.

        @param data: received bytes, beginning with a message header
        @return: C{(message, remaining_data)}, or C{None} when C{data} does
            not yet hold a complete header and payload
        @raise ValueError: if the received header does not match the payload
        """
        decoded = AdbMessageHeader.decode(data)
        if decoded is None:
            # Not even a full header yet.
            return None
        header, remainder = decoded
        if len(remainder) < header.data_length:
            return None
        # Take only this message's payload.  Bytes that follow it belong to
        # the next message and must not be included in this one.
        payload = remainder[:header.data_length]
        message = cls(header.command, header.arg0, header.arg1, payload)
        message.validate(header)
        return message, remainder[header.data_length:]

    def encode(self):
        """Return the wire form of the message: packed header plus payload."""
        return self.header.encode() + self.data

    def validate(self, header):
        """Check a received header against the one derived from this message.

        @param header: the L{AdbMessageHeader} that arrived with the payload
        @raise ValueError: if length, checksum or magic do not match
        """
        # An explicit raise is used because assert statements are removed
        # when Python runs with -O.
        if self.header != header:
            raise ValueError("ADB header %s does not match payload"
                             % (header,))

    def __eq__(self, other):
        if not isinstance(other, AdbMessage):
            return NotImplemented
        return self.header == other.header and self.data == other.data

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __repr__(self):
        return "%s(%r)" % (self.header, self.data)
class AdbProtocolBase(protocol.Protocol):
    """Twisted protocol that splits a byte stream into ADB messages.

    Received bytes are accumulated in C{buff}.  Every complete message is
    passed to the method named C{adb_<TAG>} (for example C{adb_OKAY}), or to
    L{unhandeledMessage} when no such method exists.
    """

    deferred = None
    # Bytes received but not yet decoded (b'' is a plain str on Python 2).
    buff = b''

    def dataReceived(self, data):
        """Buffer C{data} and dispatch every message that is now complete."""
        self.buff += data
        message = self.getMessage()
        while message is not None:
            self.dispatchMessage(message)
            message = self.getMessage()

    def getMessage(self):
        """Remove and return the next complete message from the buffer.

        @return: an L{AdbMessage}, or C{None} when the buffer does not hold
            a complete, valid message
        """
        try:
            decoded = AdbMessage.decode(self.buff)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must still
            # propagate.
            #TODO: correctly handle corrupt messages
            return None
        if decoded is None:
            # Incomplete message; wait for more data.
            return None
        message, self.buff = decoded
        return message

    def dispatchMessage(self, message):
        """Call the C{adb_<TAG>} handler for C{message}."""
        tag = getCommandString(message.command)
        if not isinstance(tag, str):
            # On Python 3 the tag is bytes, but attribute names are text.
            tag = tag.decode('ascii', 'replace')
        handler = getattr(self, 'adb_' + tag, self.unhandeledMessage)
        handler(message)

    def unhandeledMessage(self, message):
        """Fallback handler: print the message."""
        print("Unhandeled message: %s" % (message,))

    def sendConnect(self):
        """Send the connect (CNXN) message that identifies us as a host."""
        data = b'host::'
        self.sendCommand(A_CNXN, A_VERSION, MAX_PAYLOAD, data)

    def sendCommand(self, command, arg0, arg1, data):
        """Send a message whose payload is C{data} plus a terminating NUL.

        @param command: command identifier constant
        @param arg0: first argument
        @param arg1: second argument
        @param data: payload without the trailing NUL; one is appended here
        """
        message = AdbMessage(command, arg0, arg1, data + b'\x00')
        self.sendMessage(message)

    def sendMessage(self, message):
        """Send a complete ADB message to the peer
        """
        #TODO: split message into chunks of MAX_PAYLOAD
        self.transport.write(message.encode())
# Ready-made connect (CNXN) message carrying the NUL-terminated "host::" payload.
connectMessage = AdbMessage(A_CNXN, A_VERSION, MAX_PAYLOAD,
                            data='host::\x00')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from adb import protocol | |
class AdbProtocolTest(unittest.TestCase):
    """Tests for ADB message framing and for encoding/decoding."""

    def setUp(self):
        self.protocol = protocol.AdbProtocolBase()

    def test_get_message(self):
        """A message delivered in two pieces is dispatched exactly once."""
        messages = []
        self.protocol.adb_OKAY = messages.append
        data = "hello adb\x00"
        message = protocol.AdbMessage(protocol.A_OKAY, 0, 1, data)
        # Encode the message and send it in two pieces
        encoded_message = message.encode()
        self.protocol.dataReceived(encoded_message[:10])
        self.protocol.dataReceived(encoded_message[10:])
        self.assertEqual(messages, [message])

    def test_encode_decode_message(self):
        """The connect message round-trips through encode() and decode()."""
        # This is the connect message grabbed from adb server
        data = ('\x43\x4e\x58\x4e\x00\x00\x00\x01'
                '\x00\x10\x00\x00\x07\x00\x00\x00'
                '\x32\x02\x00\x00\xbc\xb1\xa7\xb1'
                '\x68\x6f\x73\x74\x3a\x3a\x00')
        message = protocol.AdbMessage(protocol.A_CNXN,
                                      protocol.A_VERSION,
                                      protocol.MAX_PAYLOAD,
                                      'host::\x00')
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(message.encode(), data,
                         "Message did not encode to the expected data")
        decoded_message, _ = protocol.AdbMessage.decode(data)
        self.assertEqual(decoded_message, message,
                         "Data did not decode to the expected message")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""System test | |
""" | |
from twisted.internet import reactor | |
from twisted.internet.endpoints import clientFromString | |
from twisted.internet.protocol import ClientFactory | |
from adb import protocol | |
def main():
    """Connect to an ADB peer over TCP, open C{shell:ls} and print the output.

    The payload of every WRTE message is collected; when the peer closes the
    stream the reactor is stopped and the collected lines are printed.
    """
    endpoint = clientFromString(reactor, "tcp:localhost:5555:timeout=5")
#    endpoint = clientFromString(reactor, "unix:/dev/bus/usb/001/012")

    output = []

    # The handlers live on a local subclass, so the shared AdbProtocolBase
    # class is not modified.
    class ShellLsClient(protocol.AdbProtocolBase):

        def adb_CNXN(self, message):
            print("GOT MESSAGE %s" % (message,))
            # sendCommand() appends the terminating NUL itself, so the
            # payload is passed without one.
            self.sendCommand(protocol.A_OPEN, 2, message.arg0, 'shell:ls')

        def adb_WRTE(self, message):
            print("GOT MESSAGE %s" % (message,))
            output.append(message.data)
            # NOTE(review): sendCommand() appends a NUL, so this OKAY carries
            # a one-byte payload -- confirm that the peer accepts that.
            self.sendCommand(protocol.A_OKAY, 2, message.arg0, '')

        def adb_CLSE(self, message):
            print("GOT MESSAGE %s" % (message,))
            self.sendCommand(protocol.A_CLSE, 2, message.arg0, '')
            reactor.stop()

    factory = ClientFactory()
    factory.protocol = ShellLsClient

    def send_connect(client):
        print("TCP CONNECTED %s" % (client,))
        # Sends 'host::' with exactly one terminating NUL.
        client.sendConnect()

    def connection_failed(reason):
        print(reason.getErrorMessage())
        reactor.stop()

    client_d = endpoint.connect(factory)
    client_d.addCallback(send_connect)
    # Added after the callback, so it also sees failures raised by
    # send_connect.
    client_d.addErrback(connection_failed)

    reactor.run()
    print("DONE %s" % (''.join(output).splitlines(),))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment