Last active
October 21, 2019 21:13
-
-
Save LoadLow/32911addf2ac5e5f6a9a723cebd1d0fe to your computer and use it in GitHub Desktop.
ECW Prequals - Challenge "Puzzle"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Modbus Message Parser | |
-------------------------------------------------------------------------- | |
""" | |
from __future__ import print_function | |
import codecs as c | |
import types | |
from pymodbus.compat import IS_PYTHON3 | |
from pymodbus.factory import ClientDecoder, ServerDecoder | |
from pymodbus.transaction import ModbusSocketFramer | |
class Decoder(object): | |
def __init__(self, framer, encode=False): | |
""" Initialize a new instance of the decoder | |
:param framer: The framer to use | |
:param encode: If the message needs to be encoded | |
""" | |
self.framer = framer | |
self.encode = encode | |
self.registers = bytearray(0xffff) | |
def decode(self, message): | |
""" Attempt to decode the supplied message | |
:param message: The messge to decode | |
""" | |
if IS_PYTHON3: | |
value = message if self.encode else c.encode(message, 'hex_codec') | |
else: | |
value = message if self.encode else message.encode('hex') | |
decoders = [ | |
self.framer(ServerDecoder(), client=None), | |
self.framer(ClientDecoder(), client=None) | |
] | |
for decoder in decoders: | |
try: | |
decoder.addToFrame(message) | |
if decoder.checkFrame(): | |
unit = decoder._header.get("uid", 0x00) | |
decoder.advanceFrame() | |
decoder.processIncomingPacket(message, self.report, unit) | |
except: | |
pass | |
def check_errors(self, decoder, message): | |
""" Attempt to find message errors | |
:param message: The message to find errors in | |
""" | |
def report(self, message): | |
""" The callback to print the message information | |
:param message: The message to print | |
""" | |
if message.__class__.__name__ == "WriteSingleRegisterRequest": | |
self.registers[message.address] = message.value | |
def get_messages(option): | |
""" A helper method to generate the messages to parse | |
:param options: The option manager | |
:returns: The message iterator to parse | |
""" | |
if option.message: | |
if option.transaction: | |
msg = "" | |
for segment in option.message.split(): | |
segment = segment.replace("0x", "") | |
segment = "0" + segment if len(segment) == 1 else segment | |
msg = msg + segment | |
option.message = msg | |
if not option.ascii: | |
if not IS_PYTHON3: | |
option.message = option.message.decode('hex') | |
else: | |
option.message = c.decode(option.message.encode(), 'hex_codec') | |
yield option.message | |
elif option.file: | |
with open(option.file, "r") as handle: | |
for line in handle: | |
if line.startswith('#'): continue | |
if not option.ascii: | |
line = line.strip() | |
line = c.decode(line.encode(), 'hex_codec') | |
yield line | |
def main(): | |
""" The main runner function | |
""" | |
infile = "samples/modbus_messages" | |
outfile = "samples/modbus_memdump" | |
option = types.SimpleNamespace() | |
option.ascii = False | |
option.debug = False | |
option.message = "" | |
option.file = infile | |
decoder = Decoder(ModbusSocketFramer, False) | |
for message in get_messages(option): | |
decoder.decode(message) | |
newfile = open(outfile, 'wb') | |
newfile.write(decoder.registers) | |
newfile.close() | |
print("Done ! Go check mem dump now : < " + outfile+" >") | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ strings samples/result_modbus | |
<FLAG> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment