Last active
May 20, 2019 19:13
-
-
Save ptsneves/f0ae013c0f7ba551e8a79f6661e8e747 to your computer and use it in GitHub Desktop.
Serial data sniffer with byte time stamp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import os | |
import pty | |
import time | |
def writeToPTY(fd, replay_time, byte_hex): | |
if not writeToPTY.next_send: | |
writeToPTY.next_send = replay_time + writeToPTY.send_period | |
writeToPTY.real_time_offset = time.time() - replay_time | |
if replay_time > writeToPTY.next_send: | |
os.write(fd, bytearray.fromhex(writeToPTY.bytestream)) | |
writeToPTY.bytestream = "" | |
#We need to invert because the following formula gives the delay. | |
#The problem is that we are not interested in doing anything when | |
#we are delayed. In that situation just pump the fastest possible. | |
#On the other hand when we are ahead of time and reading faster than | |
#the timestamps indicate we should sleep for some time. This is why | |
#we invert the signal. | |
ahead_of_schedule = -1.0 * (time.time() - writeToPTY.real_time_offset - replay_time) | |
if ahead_of_schedule > 0.5 * writeToPTY.send_period: | |
time.sleep(ahead_of_schedule) | |
writeToPTY.next_send = replay_time + writeToPTY.send_period | |
else: | |
writeToPTY.bytestream += byte_hex | |
writeToPTY.next_send = None | |
writeToPTY.send_period = 20E-3 | |
writeToPTY.bytestream = "" | |
writeToPTY.real_time_offset = 0 | |
def parseLine(line): | |
if len(line.split()) != 2: | |
raise Exception("Invalid line format. Cannot play this file. Example line\n" + line) | |
replay_time, byte_hex = line.split() | |
replay_time = float(replay_time) | |
return replay_time, byte_hex | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description="A program to play timestamped data to a virtual serial.") | |
parser.add_argument('-i', '--input-file', help="The input file path for playback.") | |
parser.add_argument('-l', '--symlink', help="Make") | |
parser.add_argument('--delay', help="Start playing after a given delay") | |
args = parser.parse_args() | |
if not args.input_file or not os.path.exists(args.input_file): | |
raise Exception("Please provide a valid input file from which to replay from.") | |
master, slave = pty.openpty() | |
pts_device = os.ttyname(slave) | |
if args.symlink: | |
os.symlink(pts_device, args.symlink) | |
try: | |
if args.delay: | |
time.sleep(int(args.delay)) | |
starting_time = time.time() | |
with open(args.input_file, "r") as f: | |
for line in f: | |
replay_time, byte_hex = parseLine(line) | |
writeToPTY(master, replay_time, byte_hex) | |
print("Took " + str(time.time() - starting_time)) | |
finally: | |
if args.symlink and os.path.exists(args.symlink): | |
os.unlink(args.symlink) | |
print("The end") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import serial | |
import time | |
import os | |
import pty | |
import fcntl | |
def setFDNonBlocking(fd): | |
flag = fcntl.fcntl(fd, fcntl.F_GETFL) | |
fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) | |
def writeToSerialFromVirtualSerial(serial_obj, master_fd): | |
try: | |
master_read = os.read(master_fd, 1) | |
serial_obj.write(master_read) | |
except BlockingIOError: | |
pass | |
def writeRawDataToLog(log_file, raw_data): | |
byte_hex = bytes(raw_data).hex() | |
log_line = str(time.time()) + ' ' + byte_hex + '\n' | |
log_file.write(log_line) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description="A program to record and timestamp data received in a given serial stream.") | |
parser.add_argument('-d', '--device', help="The device file path.") | |
parser.add_argument('-b', '--baud', help="The baud rate.") | |
parser.add_argument('-o', '--output-file', help="The output file path for the recording.") | |
parser.add_argument('-f', '--force', action='store_true', help="Force and overwrite on a previous existing capture file") | |
parser.add_argument('-v', '--virtual-serial-file', help="A path where to create a symlink to the virtual serial.") | |
args = parser.parse_args() | |
if not args.device or not args.baud or not args.output_file: | |
raise Exception("Please pass a valid device, baud rate and output file. They are mandatory arguments.") | |
if os.path.exists(args.output_file) and not args.force: | |
raise Exception("File already exist and we do not overwrite files. Run other utilities for that effect.") | |
master, slave = pty.openpty() | |
setFDNonBlocking(master) | |
print(os.ttyname(slave)) | |
try: | |
serial_object = serial.Serial(args.device, args.baud) | |
print("Recording. Press Ctrl-c to stop the recording.") | |
with open(args.output_file, "w") as log_file: | |
while 1: | |
raw_data = serial_object.read(1) | |
writeRawDataToLog(log_file, raw_data) | |
os.write(master, raw_data) | |
writeToSerialFromVirtualSerial(serial_object, master) | |
except KeyboardInterrupt: | |
print("Finished recording to {}".format(args.output_file)) | |
finally: | |
serial_object.close() # close port |
Updated gist to make sure we are controlling the delay when playing and do not suffer undue performance degradation when playing byte by byte
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Very simple programs set that allows for man in the middle of a serial port. For example, the recorder will tap the desired serial port and also provide a virtual PTY where you can let another program "talk" to your serial part through the sniffer. We do not record the data sent, only received. The data is recorded byte by byte with a timestamp for each byte.
The serial-player only plays back the recorded captured files through a virtual serial port. You can tell where the virtual serial port should be, for example, the /dev/ directory/. If you do not specify the virtual serial location it will just print the path where it is created by default.
The data is played back at the rate it was recorded.