Last active
September 26, 2022 14:19
-
-
Save bitonic/e0977aaa674523a9ffe0e77a96cd6cc7 to your computer and use it in GitHub Desktop.
race in `realpath`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# If TEST_RACE = False, no exception is thrown. If it's True, it is. The only difference is | |
# a sleep in _joinrealpath. | |
TEST_RACE = False | |
import os | |
import time | |
import threading | |
import pathlib | |
def realpath(filename): | |
"""Return the canonical path of the specified filename, eliminating any | |
symbolic links encountered in the path.""" | |
filename = os.fspath(filename) | |
path, ok = _joinrealpath(filename[:0], filename, {}) | |
return os.path.abspath(path) | |
# Join two paths, normalizing and eliminating any symbolic links | |
# encountered in the second path. | |
def _joinrealpath(path, rest, seen): | |
if isinstance(path, bytes): | |
sep = b'/' | |
curdir = b'.' | |
pardir = b'..' | |
else: | |
sep = '/' | |
curdir = '.' | |
pardir = '..' | |
if os.path.isabs(rest): | |
rest = rest[1:] | |
path = sep | |
while rest: | |
name, _, rest = rest.partition(sep) | |
if not name or name == curdir: | |
# current dir | |
continue | |
if name == pardir: | |
# parent dir | |
if path: | |
path, name = os.path.split(path) | |
if name == pardir: | |
path = os.path.join(path, pardir, pardir) | |
else: | |
path = pardir | |
continue | |
newpath = os.path.join(path, name) | |
if not os.path.islink(newpath): | |
path = newpath | |
continue | |
if TEST_RACE: | |
time.sleep(5) | |
# Resolve the symbolic link | |
if newpath in seen: | |
# Already seen this path | |
path = seen[newpath] | |
if path is not None: | |
# use cached value | |
continue | |
# The symlink is not resolved, so we must have a symlink loop. | |
# Return already resolved part + rest of the path unchanged. | |
return os.path.join(newpath, rest), False | |
seen[newpath] = None # not resolved symlink | |
path, ok = _joinrealpath(path, os.readlink(newpath), seen) | |
if not ok: | |
return os.path.join(path, rest), False | |
seen[newpath] = path # resolved symlink | |
return path, True | |
link_dst = 'test-link-dst' | |
link_src = 'test-link-src' | |
try: | |
os.remove(link_dst) | |
os.remove(link_src) | |
except OSError: | |
pass | |
pathlib.Path(link_src).touch() | |
os.symlink(link_src, link_dst) | |
t = threading.Thread(target=realpath, args=(link_dst,)) | |
t.start() | |
time.sleep(1) | |
os.remove(link_dst) | |
t.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment