Created
April 18, 2016 00:52
-
-
Save icedraco/43daef49241fb3530edae10891039da5 to your computer and use it in GitHub Desktop.
Protocol Obfuscator (aka: University Firewall Countermeasure)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Protocol Obfuscator (aka: University Firewall Countermeasure) | |
# | |
# This little script was made to punch a hole through the anti-SSH firewall | |
# rule my university seems to have added for the server I use. The rule resets | |
# connections that seem to exhibit OpenSSH banner/signature in them, which | |
# kinda sucks, since I can't just pick another port: I have to hide that SSH | |
# banner somehow. | |
# | |
# This script comes in pairs: | |
# * station: sits on the server itself, on an accessible (FW-unaffected) port | |
# and forwards connections to the actual SSH port | |
# | |
# * node: sits on the client side and forwards connections to the station | |
# | |
# Both use the same key to mangle and unmangle data over the wire so that the | |
# SSH banners and negotiation signatures at the beginning aren't as obvious as | |
# they normally are to a dumb packet inspection. | |
# | |
# Current implementation works in a Thread-Per-Client manner. It might benefit | |
# of a better approach if it is to be used on a more massive scale... | |
# | |
# Author: IceDragon <[email protected]> | |
import socketserver | |
import socket | |
import threading | |
from time import sleep | |
from sys import argv | |
# ----------------------------------------------------------------------------- | |
# CONFIGURATION | |
# ----------------------------------------------------------------------------- | |
KEY = "Random keystring of arbitrary size goes here" | |
DEFAULT_PROFILE = "node" | |
PROFILES = { | |
"node": { | |
'local': ("127.0.0.1", 31337), | |
'remote': ("78.47.13.37", 21) | |
}, | |
"station": { | |
'local': ("0.0.0.0", 21), | |
'remote': ("127.0.0.1", 22) | |
} | |
} | |
BUFFER_SIZE = 2048 | |
LOCAL_ADDR = None | |
REMOTE_ADDR = None | |
# ----------------------------------------------------------------------------- | |
# CLASSES: TCP Server | |
# ----------------------------------------------------------------------------- | |
class ThreadedTcpServer (socketserver.ThreadingMixIn, socketserver.TCPServer): | |
pass | |
class Handler (socketserver.BaseRequestHandler): | |
def log(self, message): | |
prefix = "[%s:%d]" % self.client_address | |
print("%s[%s] %s" % (prefix, threading.current_thread(), message)) | |
def handle(self): | |
obfuscator_out = Obfuscator(KEY) | |
obfuscator_in = Obfuscator(KEY) | |
self.log("Connecting...") | |
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
remote_socket.connect(REMOTE_ADDR) | |
self.log("Connection established!") | |
remote_socket.setblocking(False) | |
self.request.setblocking(False) | |
initial_delay = 0 | |
delay = initial_delay | |
while True: | |
# Read C2S | |
try: | |
buffer = self.request.recv(BUFFER_SIZE) | |
if buffer == b'': | |
self.log("Connection closed") | |
self.request.close() | |
remote_socket.close() | |
break | |
else: | |
processed_buffer = obfuscator_out.obfuscate(buffer) | |
# self.log("C2S>> %s" % processed_buffer) | |
remote_socket.sendall(processed_buffer) | |
delay = initial_delay | |
except socket.error as e: | |
# self.log("socket.error => <" + str(e) + ">") | |
if delay < 0.3: | |
delay += 0.005 | |
# Read S2C | |
try: | |
buffer = remote_socket.recv(BUFFER_SIZE) | |
if buffer == b'': | |
self.log("Remote conneciton closed") | |
self.request.close() | |
remote_socket.close() | |
break | |
else: | |
processed_buffer = obfuscator_in.obfuscate(buffer) | |
# self.log("S2C>> %s" % processed_buffer) | |
self.request.sendall(processed_buffer) | |
delay = initial_delay | |
except socket.error as e: | |
# self.log("socket.error => <" + str(e) +">") | |
if delay < 0.3: | |
delay += 0.005 | |
sleep(delay) | |
# ----------------------------------------------------------------------------- | |
# CLASSES: Obfuscator | |
# ----------------------------------------------------------------------------- | |
class Obfuscator(object): | |
def __init__(self, key): | |
self.__key = bytes(key, 'ascii') | |
self.__index = 0 | |
self.__len = len(key) | |
def reset(self): | |
self.__index = 0 | |
def obfuscate(self, data): | |
output = [] | |
for ch in data: | |
output += [ch ^ self.__key[self.__index]] | |
# advance index and roll over as necessary | |
self.__index += 1 | |
if self.__index >= self.__len: | |
self.__index = 0 | |
return bytes(output) | |
# ----------------------------------------------------------------------------- | |
# ENTRY POINT | |
# ----------------------------------------------------------------------------- | |
def main(argv): | |
global DEFAULT_PROFILE, LOCAL_ADDR, REMOTE_ADDR | |
profile = DEFAULT_PROFILE if argv == [] or argv[0] not in PROFILES else argv[0] | |
print("PROFILE:", profile) | |
REMOTE_ADDR = PROFILES[profile]['remote'] | |
LOCAL_ADDR = PROFILES[profile]['local'] | |
print("Starting server...") | |
server = ThreadedTcpServer(LOCAL_ADDR, Handler) | |
server_thread = threading.Thread(target=server.serve_forever) | |
server_thread.daemon = True | |
server_thread.start() | |
print("<<>> Server started on " + server_thread.name) | |
server_thread.join() | |
if __name__ == "__main__": | |
main(argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment