Skip to content

Instantly share code, notes, and snippets.

@aq1
Last active February 13, 2017 22:27
Show Gist options
  • Save aq1/4517f002b3de0dca7558fc8eaa2fe724 to your computer and use it in GitHub Desktop.
Save aq1/4517f002b3de0dca7558fc8eaa2fe724 to your computer and use it in GitHub Desktop.
import os
from datetime import datetime
from twisted.names.client import lookupPointer
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor, utils
class View(Resource):
    """Base resource that wraps an HTML fragment in the shared page skeleton."""

    # Page skeleton; the single ``{}`` placeholder receives the body fragment.
    HTML = '''
<!DOCTYPE html>
<html>
<head>
<title>WhoIs</title>
</head>
<body>
{}
</body>
</html>
'''

    def get_page(self, body):
        """Return the complete page for *body*, ready to return from render_*.

        :param body: HTML fragment placed inside ``<body>``.
        :return: the page as a byte string.  Twisted on Python 3 only accepts
            bytes from render methods, so text is encoded as UTF-8.  On
            Python 2 the formatted page already is a byte string and is
            returned unchanged.
        """
        page = self.HTML.format(body)
        if not isinstance(page, bytes):
            page = page.encode('utf-8')
        return page
class HistoryResource(View):
    """Page listing every submitted IP with its PTR and whois results.

    GET renders the history table, newest submission first.  POST cancels
    the lookups that are still running and empties the history.
    """

    # Table skeleton; ``{}`` receives the concatenated rows.
    TABLE = '''
<table>
<tr>
<th>IP</th>
<th>PTR</th>
<th>WhoIs</th>
<th>DateTime</th>
</tr>
{}
</table>
<form method="post" action="/history">
<input type="submit" value="Clear">
</form>
'''
    # One table row; filled from a record dict (keys ip/ptr/whois/datetime).
    TR = '''
<tr>
<td>{ip}</td>
<td>{ptr}</td>
<td>{whois}</td>
<td>{datetime}</td>
</tr>
'''
    # Confirmation shown after the history has been cleared.
    CLEAR = '''
<p>History's been cleared</p>
<a href="/">Back</a>
'''

    def __init__(self):
        # Explicit base call instead of super() so this also works where the
        # Twisted base class is an old-style class.
        View.__init__(self)
        # Per-instance state; lists defined on the class would be shared by
        # every instance.
        self.records = []  # one dict per submitted IP, oldest first
        self.defers = []   # Deferreds of lookups that have not fired yet

    @staticmethod
    def _escape(value):
        """Return *value* as text that is safe to embed in HTML.

        Record values come from the client (ip) and from external services
        (PTR, whois), so they must never reach the page unescaped.
        """
        try:
            from html import escape  # Python 3
        except ImportError:
            from cgi import escape  # Python 2
        if not isinstance(value, str):
            if isinstance(value, bytes):
                # Python 3: byte output (e.g. from a child process).
                value = value.decode('utf-8', 'replace')
            else:
                value = str(value)
        return escape(value, quote=True)

    def _track(self, d):
        """Remember *d* so render_POST can cancel it; forget it once it fires."""
        def _forget(result):
            try:
                self.defers.remove(d)
            except ValueError:
                # render_POST already swapped in a fresh list.
                pass
            return result

        # Append first: if *d* has already fired, _forget runs immediately.
        self.defers.append(d)
        d.addBoth(_forget)

    def render_GET(self, request):
        """Render the history table, most recent submission first."""
        rows = ''.join(
            self.TR.format(**{
                key: self._escape(value) for key, value in record.items()
            })
            for record in reversed(self.records)
        )
        return self.get_page(self.TABLE.format(rows))

    def render_POST(self, request):
        """Cancel every pending lookup and empty the history."""
        # Swap the lists out before cancelling: cancel() fires the errbacks
        # synchronously and they must not touch the fresh state.
        pending, self.defers = self.defers, []
        self.records = []
        for d in pending:
            d.cancel()
        return self.get_page(self.CLEAR)

    @staticmethod
    def _reverse_name_from_ip(ip):
        """Return the ``in-addr.arpa`` name used for the PTR lookup of *ip*."""
        return '.'.join(reversed(ip.split('.'))) + '.in-addr.arpa'

    def _ptr_tasks(self, _id):
        """Start the PTR lookup for the record at index *_id*."""
        # Capture the record itself: once the history is cleared the index
        # may be out of range or refer to a different submission.
        record = self.records[_id]

        def _success(result):
            answers, _authority, _additional = result
            if answers:
                record['ptr'] = answers[0].payload.name
            else:
                record['ptr'] = 'No result for no reason'

        def _error(failure):
            # Keep only the message; the Failure repr includes a traceback.
            record['ptr'] = failure.getErrorMessage()

        d = lookupPointer(name=self._reverse_name_from_ip(record['ip']))
        d.addCallback(_success)
        d.addErrback(_error)
        self._track(d)

    def _whois_task(self, _id):
        """Run ``/usr/bin/whois`` for the record at index *_id*."""
        record = self.records[_id]

        def _store(output):
            record['whois'] = output

        def _fail(failure):
            record['whois'] = failure.getErrorMessage()

        d = utils.getProcessOutput(r'/usr/bin/whois', [record['ip']])
        d.addCallbacks(_store, _fail)
        self._track(d)

    def _add_task(self, _id):
        """Start both lookups for the record at index *_id*."""
        self._ptr_tasks(_id)
        self._whois_task(_id)

    def add(self, ip):
        """Append a record for *ip* and start its PTR and whois lookups.

        :param ip: dotted IPv4 address as text.
        """
        self.records.append({
            'ip': ip,
            'datetime': datetime.now().isoformat(),
            'ptr': 'Not ready yet',
            'whois': 'Not ready yet',
        })
        self._add_task(len(self.records) - 1)
class WhoIsResource(View):
    """Front page: shows the submission form and queues submitted addresses."""

    # History page that receives every accepted address; also mounted at
    # /history by the script below.
    history_res = HistoryResource()

    # Submission form shown on GET.
    GETS = '''
<form method="post" action="/">
<label>
IPV4
<input type="text" name="ip" required>
<input type="submit" value="Add">
</label>
</form>
'''
    # Confirmation shown after an address has been accepted.
    POST = '''
<p>IP added</p>
<a href="/history">Check It</a>
'''
    # Shown (with status 400) when the submitted value is not an IPv4 address.
    INVALID = '''
<p>Please enter a valid IPv4 address</p>
<a href="/">Back</a>
'''

    @staticmethod
    def _is_ipv4(ip):
        """Return True if *ip* is four dot-separated decimal octets (0-255)."""
        octets = ip.split('.')
        if len(octets) != 4:
            return False
        for octet in octets:
            # strip() leaves '' only when every character is an ASCII digit.
            if not octet or len(octet) > 3 or octet.strip('0123456789'):
                return False
            if int(octet) > 255:
                return False
        return True

    def render_GET(self, request):
        """Render the submission form."""
        return self.get_page(self.GETS)

    def render_POST(self, request):
        """Validate the submitted ``ip`` field and queue its lookups.

        Responds with 400 and an error page when the field is missing or is
        not a dotted IPv4 address.  The value later becomes a command-line
        argument of ``whois``, so anything else (e.g. a value starting with
        ``-``) must be rejected here.
        """
        # Byte key: Twisted on Python 3 stores form arguments under bytes
        # keys; on Python 2 b'ip' is the same object type as 'ip'.
        values = request.args.get(b'ip', [])
        raw = values[0] if values else ''
        if not isinstance(raw, str):
            # Python 3 delivers the value as bytes.
            raw = raw.decode('ascii', 'replace')
        ip = raw.strip()
        if not self._is_ipv4(ip):
            request.setResponseCode(400)
            return self.get_page(self.INVALID)
        self.history_res.add(ip)
        return self.get_page(self.POST)
# Wire the resource tree: "/" serves the submission form and "/history" the
# results table.  Path segments are byte strings because Twisted matches
# children against the bytes parsed from the request URL; on Python 2 these
# literals are identical to the plain strings used before.
root = Resource()
w = WhoIsResource()
root.putChild(b'', w)
root.putChild(b'history', w.history_res)
# NOTE: binding port 80 usually requires elevated privileges.
reactor.listenTCP(80, Site(root))
print('Go')
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment