-
-
Save reisraff/421b20d665ec2c0d2c4697baa3f302bd to your computer and use it in GitHub Desktop.
Simple pwntool::process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import subprocess | |
import select | |
import fcntl | |
import os | |
import pty | |
import tty | |
import sys | |
import time | |
class P:
    """Minimal pwntools-style wrapper around a child process.

    The child's stdin is a pipe. Its stdout and stderr are both attached to
    a pseudo-terminal put into raw mode; the master side of that terminal is
    read in non-blocking mode. Everything sent and received is handled as
    bytes, so the class behaves the same on Python 2 and Python 3.
    """

    def __init__(self, command, args=None, debug=True):
        """Spawn ``command`` with ``args``.

        Args:
            command: Executable to run.
            args: Optional list of command-line arguments (default: none).
            debug: When true, log the start-up and every send/receive to
                ``sys.stdout``.
        """
        # Copy the caller's list so a shared default is never aliased.
        args = list(args) if args is not None else []
        self.stdout = []
        self.stderr = []
        self.debug = debug
        self.is_in_interactive = False

        master, slave = pty.openpty()
        try:
            tty.setraw(master)
            tty.setraw(slave)
            if self.debug:
                sys.stdout.write('Starting process "{}"\n'.format(' '.join([command] + args)))
                sys.stdout.flush()
            self.process = subprocess.Popen(
                [command] + args,
                stdin=subprocess.PIPE,
                stdout=slave,
                stderr=subprocess.STDOUT,
                # Line buffering (1) is not available for binary pipes; every
                # write is flushed explicitly in send_raw(), so the default
                # buffer size behaves identically.
                bufsize=-1,
                close_fds=True
            )
            # Read the child's output through an unbuffered duplicate of the
            # master side of the pty.
            self.process.stdout = os.fdopen(os.dup(master), 'rb', 0)
        finally:
            # The parent needs neither original descriptor any more; closing
            # them here also avoids a leak when Popen raises.
            os.close(master)
            os.close(slave)

        # Non-blocking reads let recvuntil() stop when no data is pending.
        fd = self.process.stdout.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    @staticmethod
    def _to_bytes(data):
        """Return ``data`` as bytes, UTF-8 encoding text input."""
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        return data.encode('utf-8')

    def poll(self):
        """Check whether the child has exited.

        Returns:
            ``None`` while the child is still running.

        Raises:
            SystemExit: once the child has terminated (after reporting its
                exit code on ``sys.stdout``).
        """
        self.process.poll()
        returncode = self.process.returncode
        if returncode is not None:
            self._stop_noticed = time.time()
            sys.stdout.write("Process stopped with exit code {}\n".format(returncode))
            sys.stdout.flush()
            # sys.exit() instead of the `site`-provided exit() builtin, which
            # is not guaranteed to exist (e.g. under ``python -S``).
            sys.exit()
        return returncode

    def send_raw(self, data):
        """Write ``data`` (bytes, or text encoded as UTF-8) to the child's stdin."""
        self.poll()
        payload = self._to_bytes(data)
        self.process.stdin.write(payload)
        self.process.stdin.flush()
        if self.debug:
            self.__print_debug('Sent: ', payload)

    def sendline(self, line):
        """Send ``line`` followed by a newline byte."""
        self.send_raw(self._to_bytes(line) + b'\n')

    def can_read(self):
        """Return True if output becomes readable within 50 ms."""
        readable, _, _ = select.select([self.process.stdout], [], [], 0.05)
        return bool(readable)

    def recvuntil(self, delimiter):
        """Read child output until ``delimiter`` is seen or the stream ends.

        Args:
            delimiter: Bytes (or text, encoded as UTF-8) that terminates the
                read; may be longer than one byte. ``None`` or an empty
                value means "read until no more data can be obtained".

        Returns:
            The bytes received, including the delimiter when one was matched.
        """
        if delimiter is not None:
            delimiter = self._to_bytes(delimiter)

        # Always wait for the first chunk of output to show up.
        while not self.can_read():
            time.sleep(0.05)

        received = bytearray()
        while True:
            # Outside interactive mode keep waiting for every further byte;
            # in interactive mode stop at the first gap in the output.
            if not self.is_in_interactive:
                while not self.can_read():
                    time.sleep(0.05)
            try:
                chunk = self.process.stdout.read(1)
            except (IOError, OSError):
                # Raised when the pty is hung up, or (Python 2) when a
                # non-blocking read finds no data.
                break
            if not chunk:
                # None: nothing pending on the non-blocking fd; b'': EOF.
                break
            received += chunk
            if delimiter and received.endswith(delimiter):
                break

        result = bytes(received)
        if self.debug:
            self.__print_debug('Received: ', result)
        return result

    def recvline(self):
        """Receive up to and including the next newline."""
        self.poll()
        return self.recvuntil('\n')

    def recv(self):
        """Receive output with no delimiter (see ``recvuntil(None)``)."""
        self.poll()
        return self.recvuntil(None)

    def set_debug(self, val):
        """Enable or disable debug logging."""
        self.debug = val

    def interactive(self, debug=None):
        """Run a simple prompt loop that forwards typed lines to the child.

        Args:
            debug: If not ``None``, replaces the current debug setting once
                the initial pending output has been consumed.
        """
        self.is_in_interactive = True
        self.poll()
        self.recv()
        if debug is not None:
            self.debug = debug
        read_line = input if sys.version_info[0] >= 3 else raw_input
        while True:
            self.poll()
            data = read_line('$ ')
            if data:
                self.sendline(data)
                output = self.recv()
                # sys.stdout is a text stream on Python 3; decode for display.
                if not isinstance(output, str):
                    output = output.decode('utf-8', 'replace')
                sys.stdout.write(output)
                sys.stdout.flush()

    def __print_debug(self, tag, data):
        """Print ``data`` both as text and as a hex dump, prefixed by ``tag``."""
        raw = self._to_bytes(data)
        # On Python 2 bytes is str already; on Python 3 decode for display.
        text = raw if isinstance(raw, str) else raw.decode('utf-8', 'replace')
        terminator = '' if text[-1:] == '\n' else '\n'
        sys.stdout.write('[\033[31mDEBUG\033[0m] {} {} bytes\n'.format(tag, hex(len(raw))))
        sys.stdout.write(' \033[34mRAW\033[0m: {}{}'.format(text, terminator))
        sys.stdout.write(' \033[34mHEX\033[0m: ')
        # bytearray iteration yields ints on both Python 2 and Python 3.
        sys.stdout.write(''.join('{:02X} '.format(byte) for byte in bytearray(raw)))
        sys.stdout.write('\n\n')
        sys.stdout.flush()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment