import collections
import re
import socket
import sys
from urllib.parse import parse_qs

import requests
import yaml

# A single result from a site lookup: the extracted value(s) and a
# human-readable label for display.
Result = collections.namedtuple("Result", ("value", "pretty_name"))

class Site:
    """Base class for a configured lookup site; lazily creates one
    requests.Session shared by all requests to the site."""

    _session = None

    def __init__(self, conf):
        self.conf = conf

    @property
    def session(self):
        if self._session is None:
            self._session = requests.Session()
            self._session.headers.update({"User-Agent": "Machinae/1.0"})
        return self._session

    def _get(self, url, data, **kwargs):
        return self.session.get(url, params=data, **kwargs)

    def _post(self, url, data, **kwargs):
        return self.session.post(url, data=data, **kwargs)

class JsonApi(Site):
    """Queries a JSON API and picks configured keys out of the response."""

    def run(self, target):
        run_conf = self.conf.get("request")
        method = run_conf.get("method", "get").upper()
        url = run_conf.get("url", "").format(target=target)
        if method == "GET":
            r = self.session.get(url)
        elif method == "POST":
            # Request bodies are configured as query strings; parse_qs turns
            # them into a dict for requests to form-encode.
            data = parse_qs(run_conf.get("data", "").format(target=target))
            r = self.session.post(url, data=data)
        else:
            raise ValueError("Unsupported HTTP method: {0}".format(method))
        data = r.json()
        results = list()
        for parser in self.conf["results"]:
            key = parser["key"]
            if key in data:
                result = Result({key: data[key]}, parser["pretty_name"])
                if result not in results:
                    results.append(result)
        return results

class Webscraper(Site):
    """Fetches a page and extracts results with configured regexes."""

    def setup(self, target):
        # Optional priming request, e.g. to acquire session cookies before
        # the real request runs.
        setup_conf = self.conf["setup"]
        method = setup_conf.get("method", "get").upper()
        url = setup_conf.get("url", "").format(target=target)
        if method == "GET":
            self.session.get(url)
        elif method == "POST":
            data = parse_qs(setup_conf.get("data", "").format(target=target))
            self.session.post(url, data=data)

    def run(self, target):
        if "setup" in self.conf:
            self.setup(target)
        run_conf = self.conf.get("request")
        method = run_conf.get("method", "get").upper()
        url = run_conf.get("url", "").format(target=target)
        headers = run_conf.get("headers", {})
        if method == "GET":
            r = self.session.get(url, headers=headers)
        elif method == "POST":
            data = parse_qs(run_conf.get("data", "").format(target=target))
            r = self.session.post(url, data=data, headers=headers)
        else:
            raise ValueError("Unsupported HTTP method: {0}".format(method))
        body = r.text
        results = list()
        for parser in self.conf["results"]:
            rex = re.compile(parser["regex"], flags=re.I)
            for match in rex.finditer(body):
                # Pair each configured value name with its captured group.
                result_dict = dict(zip(parser["values"], match.groups()))
                result = Result(result_dict, parser["pretty_name"])
                if result not in results:
                    results.append(result)
        return results

with open("default.yml", "r") as f:
    conf = yaml.safe_load(f)
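
# A minimal sketch of what this script expects default.yml to look like,
# inferred from the parsing code above; the site names, URLs, and field
# values below are hypothetical, not machinae's shipped configuration:
#
#   example_api:
#     name: Example JSON API
#     otypes: [ip, fqdn]
#     json:
#       request:
#         url: https://api.example.com/lookup/{target}
#       results:
#         - key: reputation
#           pretty_name: Reputation score
#
#   example_page:
#     name: Example Webscraper
#     otypes: [ip]
#     webscraper:
#       request:
#         url: https://example.com/search?q={target}
#       results:
#         - regex: 'Score: (\d+)'
#           values: [score]
#           pretty_name: Score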

def target_type(target):
    """Classify the lookup target as ip, ip6, hash, url, or fqdn."""
    # IPv4
    try:
        socket.inet_aton(target)
    except OSError:
        pass
    else:
        return "ip"
    # IPv6
    try:
        socket.inet_pton(socket.AF_INET6, target)
    except OSError:
        pass
    else:
        return "ip6"
    # Hex digests: MD5, SHA-1, SHA-256, SHA-512
    if re.match(r"^(?:[a-f0-9]{32}|[a-f0-9]{40}|[a-f0-9]{64}|[a-f0-9]{128})$", target, re.I):
        return "hash"
    # URL
    if re.match(r"^https?://([\da-z.-]+)\.([a-z.]{2,6})([/\w .-]*)/?$", target, re.I):
        return "url"
    # TODO: Add email checking
    # TODO: Add SSL checking
    return "fqdn"

target = sys.argv[1]
otype = target_type(target)

for site in conf.values():
    # Skip sites that do not handle this observable type.
    site_otypes = [x.lower() for x in site["otypes"]]
    if otype.lower() not in site_otypes:
        continue
    if "webscraper" in site:
        scraper = Webscraper(site["webscraper"])
    elif "json" in site:
        scraper = JsonApi(site["json"])
    else:
        continue
    results = scraper.run(target)
    if not results:
        print("[-] No {1} results for {0}".format(target, site["name"]))
        continue
    print("[+] {1} results for {0}".format(target, site["name"]))
    for result in results:
        if len(result.value) > 1:
            # Multiple captured values: print them as a tuple.
            output = "({0})".format(", ".join(map(repr, result.value.values())))
        else:
            output = list(result.value.values())[0]
        print("    [-] {1}: {0}".format(output, result.pretty_name))
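
# Example invocation, assuming the script is saved as machinae.py and
# default.yml defines a site like the hypothetical "Example JSON API" above:
#
#   $ python machinae.py 8.8.8.8
#   [+] Example JSON API results for 8.8.8.8
#       [-] Reputation score: 0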