Skip to content

Instantly share code, notes, and snippets.

@brusangues
Forked from darkwave/proxy.py
Last active February 12, 2022 11:00
Show Gist options
  • Save brusangues/f297e71fb0d36ec3148f424644ff046e to your computer and use it in GitHub Desktop.
Save brusangues/f297e71fb0d36ec3148f424644ff046e to your computer and use it in GitHub Desktop.
Python 2.7 proxy server for HTTP GET
# Python Web Proxy for HTTP GET
# UFABC 2021 Computer Networks Project
# Inspired by: https://gist.github.com/darkwave/52842722c0c451807df4
# Imports
if 1:
import sys
import socket
import time # for delay
import select # selection of available sockets
import json # for usage database
import datetime # for usage time
# Socket options
delay = 0.0001
buffer_size = 4096 # or 8192
# Proxy options
proxy_port = 80
proxy_binding = '0.0.0.0' #'localhost'
proxy_setting = 'normal' # or 'append_path'
proxy_base = ('localhost', 80)
# Usage options
usage_limit = 100000000 # 100Mb
usage_days = 1
def headerChange(data="", proxy_setting='normal'):
print "---DATA BEFORE HEADER CHANGE:", data
try:
# Searching
path = data[5 : data.find(' HTTP/')]
print "---PATH:", path
hostidx = data.find('Host: ')+6
host = data[hostidx : hostidx+data[hostidx:].find('\n')-1]
print "---HOST:", host
cutidx = path.find('/') if path.find('/') != -1 else len(path)
realhost = path[0:cutidx]
realpath = path[cutidx:]
print "---REALHOST:", realhost
print "---REALPATH:", realpath
if proxy_setting == 'append_path':
# Checking
if realhost == '' or realhost == None:
raise "realhost not found"
# Substitutions
data = data.replace('/'+path, realpath)
#data = data.replace(host, realhost)
return data, realhost
else:
# Checking
if host == '' or host == None:
raise "host not found"
return data, host
except Exception as e:
print '---EXCEPT:', e
return data, proxy_base[0]
class Forward:
def __init__(self):
self.forward = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def start(self, host, port):
try:
self.forward.connect((host, port))
print "Forward", [host, port], "connected"
return self.forward
except Exception as e:
print e
return False
class Proxy:
# Sockets list
input_list = []
# Socket relationships (channel[user_socket] = server_socket; channel[server_socket] = user_socket)
channel = {}
def __init__(self, host, port, usage_limit, usage_days):
'''
Loads or creates usage database and initialize proxy server socket
'''
# Usage
try:
with open('usage.json') as json_file:
self.usage = json.load(json_file)
print " * JSON file has been loaded"
except:
print " * JSON file has been created"
self.usage = {}
self.usage_limit = usage_limit
self.usage_days = usage_days
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((host, port))
self.server.listen(200)
def main_loop(self):
'''
This function will delay for a short time and select a subset of available sockets to process.
'''
self.input_list.append(self.server)
while 1:
time.sleep(delay)
ss = select.select
inputready, outputready, exceptready = ss(self.input_list, [], [])
for self.s in inputready:
print '---INPUTREADY'
if self.s == self.server:
self.on_accept()
break
if self.usage_limit_reached():
print "---USAGELIMIT:", self.s.getpeername()[0]
self.on_limit()
self.on_close()
break
try:
self.data = self.s.recv(buffer_size)
if len(self.data) == 0:
self.on_close()
break
else:
self.on_recv()
except Exception as e:
print e
self.on_close()
break
def on_accept(self):
'''Creates client-proxy socket'''
print "---ONACCEPT"
clientsock, clientaddr = self.server.accept()
print "Client", clientaddr, "connected"
self.input_list.append(clientsock)
def on_close(self):
'''Removes client-proxy and proxy-destination sockets from input_list and channel'''
print "---ONCLOSE"
try:
print self.s.getpeername(), "disconnected"
except Exception as e:
print e
print "Client closed"
self.input_list.remove(self.s)
try:
self.input_list.remove(self.channel[self.s])
out = self.channel[self.s]
self.channel[out].close() # equivalent to do self.s.close()
self.channel[self.s].close()
del self.channel[out]
del self.channel[self.s]
except Exception as e:
print e
print "Client closed"
def on_recv(self):
'''Function called whenever data is recovered from socket buffer.
If client socket has no matching destination socket, reads GET request and creates destination socket.
In any case, send data to matching socket.
If the data contains an error, closes connection.
'''
data = self.data
# If client socket has no destination socket yet
if not ( self.s in self.channel.keys() ):
print "---HEADERCHANGE"
data, realhost = headerChange(data, proxy_setting)
print "---FORWARD CREATION:", realhost
forward = Forward().start(realhost, 80)
if forward:
print "Server", forward.getpeername(), "connected"
self.input_list.append(forward)
self.channel[self.s] = forward
self.channel[forward] = self.s
else:
print "Can't establish connection with remote server"
print "Closing connection with client", self.s.getpeername()
self.s.close()
# Else if data is a get request from client
elif data.find("GET") == 0:
# If request is get
data, realhost = headerChange(data)
# Usage code
else:
# If is recieving data
self.usage_update()
print "---USAGE: ", self.usage
# Printing and sending data
print "---DATA TO", self.channel[self.s].getpeername(), ":"
print data
self.channel[self.s].send(data)
# If data is error
if data.find("HTTP/1.1 404") == 0:
self.on_close()
def usage_update(self):
'''Function to update usage data on database.'''
data = self.data
clientaddr = self.channel[self.s].getpeername()[0]
# Update dict
if clientaddr in self.usage.keys():
self.usage[clientaddr][0] += len(data)
else:
self.usage[clientaddr] = [len(data), time.time()]
# Update file
with open('usage.json', 'w') as outfile:
json.dump(self.usage, outfile)
def usage_limit_reached(self):
'''Function to check if limit has been reached.
Returns true if the limit has been reached.'''
addr = self.s.getpeername()[0]
if addr in self.usage.keys():
# Checks if the database time + 1 day is smaller than today's sate
one_day_old = datetime.datetime.fromtimestamp(self.usage[addr][1]) + \
datetime.timedelta(days=self.usage_days) < datetime.datetime.utcnow()
if not one_day_old:
if self.usage[addr][0] >= self.usage_limit:
return True
else:
self.usage[addr] = [0, time.time()]
return False
def on_limit(self):
'''Function to send http page informing that the usage limit has been reached.'''
addr = self.s.getpeername()[0]
data = "HTTP/1.1 200 OK \n"+\
"Content-Length: 73 \n"+\
"Content-Type: text/html; charset=utf-8 \n"+\
"\n"+\
"Usage Limit Reached! <br>\n"+\
"Host:{} <br>\n".format(addr)+\
"Bytes Used:{:0.0f} \n".format(self.usage[addr][0])+\
"\n"
print "---DATA:", data
self.s.send(data)
if __name__ == '__main__':
print ' * Proxy GET Server'
print ' * {} Setting '.format(proxy_setting.upper())
proxy = Proxy(proxy_binding, proxy_port, usage_limit, usage_days)
print ' * Listening on: ' + str(proxy_binding) + ' : ' + str(proxy_port)
try:
proxy.main_loop()
except KeyboardInterrupt:
print "Ctrl C - Stopping server"
sys.exit(1)
@juliofalbo
Copy link

Thanks for that.

Helped me to validate a POC easily!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment