Skip to content

Instantly share code, notes, and snippets.

@icedraco
Created April 18, 2016 00:52
Show Gist options
  • Save icedraco/43daef49241fb3530edae10891039da5 to your computer and use it in GitHub Desktop.
Save icedraco/43daef49241fb3530edae10891039da5 to your computer and use it in GitHub Desktop.
Protocol Obfuscator (aka: University Firewall Countermeasure)
# Protocol Obfuscator (aka: University Firewall Countermeasure)
#
# This little script was made to punch a hole through the anti-SSH firewall
# rule my university seems to have added for the server I use. The rule resets
# connections that seem to exhibit OpenSSH banner/signature in them, which
# kinda sucks, since I can't just pick another port: I have to hide that SSH
# banner somehow.
#
# This script comes in pairs:
# * station: sits on the server itself, on an accessible (FW-unaffected) port
# and forwards connections to the actual SSH port
#
# * node: sits on the client side and forwards connections to the station
#
# Both use the same key to mangle and unmangle data over the wire so that the
# SSH banners and negotiation signatures at the beginning aren't as obvious as
# they normally are to a dumb packet inspection.
#
# Current implementation works in a Thread-Per-Client manner. It might benefit
# of a better approach if it is to be used on a more massive scale...
#
# Author: IceDragon <[email protected]>
import socketserver
import socket
import threading
from time import sleep
from sys import argv
# -----------------------------------------------------------------------------
# CONFIGURATION
# -----------------------------------------------------------------------------
KEY = "Random keystring of arbitrary size goes here"
DEFAULT_PROFILE = "node"
PROFILES = {
"node": {
'local': ("127.0.0.1", 31337),
'remote': ("78.47.13.37", 21)
},
"station": {
'local': ("0.0.0.0", 21),
'remote': ("127.0.0.1", 22)
}
}
BUFFER_SIZE = 2048
LOCAL_ADDR = None
REMOTE_ADDR = None
# -----------------------------------------------------------------------------
# CLASSES: TCP Server
# -----------------------------------------------------------------------------
class ThreadedTcpServer (socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
class Handler (socketserver.BaseRequestHandler):
def log(self, message):
prefix = "[%s:%d]" % self.client_address
print("%s[%s] %s" % (prefix, threading.current_thread(), message))
def handle(self):
obfuscator_out = Obfuscator(KEY)
obfuscator_in = Obfuscator(KEY)
self.log("Connecting...")
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_socket.connect(REMOTE_ADDR)
self.log("Connection established!")
remote_socket.setblocking(False)
self.request.setblocking(False)
initial_delay = 0
delay = initial_delay
while True:
# Read C2S
try:
buffer = self.request.recv(BUFFER_SIZE)
if buffer == b'':
self.log("Connection closed")
self.request.close()
remote_socket.close()
break
else:
processed_buffer = obfuscator_out.obfuscate(buffer)
# self.log("C2S>> %s" % processed_buffer)
remote_socket.sendall(processed_buffer)
delay = initial_delay
except socket.error as e:
# self.log("socket.error => <" + str(e) + ">")
if delay < 0.3:
delay += 0.005
# Read S2C
try:
buffer = remote_socket.recv(BUFFER_SIZE)
if buffer == b'':
self.log("Remote conneciton closed")
self.request.close()
remote_socket.close()
break
else:
processed_buffer = obfuscator_in.obfuscate(buffer)
# self.log("S2C>> %s" % processed_buffer)
self.request.sendall(processed_buffer)
delay = initial_delay
except socket.error as e:
# self.log("socket.error => <" + str(e) +">")
if delay < 0.3:
delay += 0.005
sleep(delay)
# -----------------------------------------------------------------------------
# CLASSES: Obfuscator
# -----------------------------------------------------------------------------
class Obfuscator(object):
def __init__(self, key):
self.__key = bytes(key, 'ascii')
self.__index = 0
self.__len = len(key)
def reset(self):
self.__index = 0
def obfuscate(self, data):
output = []
for ch in data:
output += [ch ^ self.__key[self.__index]]
# advance index and roll over as necessary
self.__index += 1
if self.__index >= self.__len:
self.__index = 0
return bytes(output)
# -----------------------------------------------------------------------------
# ENTRY POINT
# -----------------------------------------------------------------------------
def main(argv):
global DEFAULT_PROFILE, LOCAL_ADDR, REMOTE_ADDR
profile = DEFAULT_PROFILE if argv == [] or argv[0] not in PROFILES else argv[0]
print("PROFILE:", profile)
REMOTE_ADDR = PROFILES[profile]['remote']
LOCAL_ADDR = PROFILES[profile]['local']
print("Starting server...")
server = ThreadedTcpServer(LOCAL_ADDR, Handler)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
print("<<>> Server started on " + server_thread.name)
server_thread.join()
if __name__ == "__main__":
main(argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment