Below is a hardened, improved "Color Red" agentic demo with the following key enhancements, as requested:

1. Support for both SQLi and XSS fuzzing (auto-detected by the LLM)
2. Full persistent state and log-to-file for all results and cycles
3. OpenAI GPT-4 support if available, with automatic fallback to GPT-3.5
4. Automatic detection of all input parameters (not just `name`) via the LLM
5. Cleaner loop logic and a human-readable Markdown report
6. Containerization (Dockerfile) for the agent (optional but included)
7. Clear separation of workflow nodes and agent orchestration
## Repo Layout

```
color-red-demo/
├── target/
│   ├── app.py
│   └── Dockerfile
├── agent/
│   ├── agent.py
│   ├── Dockerfile
│   └── .env
└── reports/
    └── demo_report.md
```
## 1. Target: Expanded Vulnerable Web App

For brevity, `app.py` is the same as above, but you can add an extra XSS-vulnerable endpoint:

```python
@app.route("/comment", methods=["POST"])
def comment():
    # XSS: does not sanitize input, echoes it back to the client
    content = request.form.get("content", "")
    return f"Comment received: {content}", 200
```

(Insert the above into your `app.py` before the `if __name__ == "__main__":` block. No database is needed for this endpoint.)
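The original `app.py` is not reproduced in this write-up. For completeness, here is a hedged sketch of what a matching target could look like; the SQLite schema and seed rows (a `patients` table holding Alice/Bob with Flu/Cold) are assumptions chosen only to line up with the agent's `SUCCESS_SQLI` markers below:

```python
# Hypothetical reconstruction of the vulnerable target app; the schema and
# seed data are assumptions matching the SUCCESS_SQLI markers used by the agent.
import sqlite3

from flask import Flask, request

app = Flask(__name__)
DB = "demo.db"

def init_db():
    conn = sqlite3.connect(DB)
    conn.execute("CREATE TABLE IF NOT EXISTS patients (name TEXT, condition TEXT)")
    conn.execute("DELETE FROM patients")
    conn.executemany(
        "INSERT INTO patients VALUES (?, ?)",
        [("Alice", "Flu"), ("Bob", "Cold")],
    )
    conn.commit()
    conn.close()

@app.route("/search")
def search():
    # SQLi: user input is interpolated straight into the SQL string
    name = request.args.get("name", "")
    conn = sqlite3.connect(DB)
    try:
        rows = conn.execute(
            f"SELECT name, condition FROM patients WHERE name = '{name}'"
        ).fetchall()
    except sqlite3.Error as e:
        return f"DB error: {e}", 500
    finally:
        conn.close()
    return str(rows), 200

@app.route("/comment", methods=["POST"])
def comment():
    # XSS: the /comment route from the snippet above
    content = request.form.get("content", "")
    return f"Comment received: {content}", 200

if __name__ == "__main__":
    init_db()
    app.run(host="0.0.0.0", port=5000)
```

The repo layout also lists a `target/Dockerfile` that never appears in the text; a minimal version consistent with the usage commands below might be:

```dockerfile
# Hypothetical target/Dockerfile, inferred from the usage commands below
FROM python:3.11-slim
WORKDIR /app
COPY app.py /app/
RUN pip install flask
EXPOSE 5000
CMD ["python", "app.py"]
```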
## 2. Agent: Complete Multi-Vuln Agent (`agent/agent.py`)
```python
import json
import os
from datetime import datetime
from typing import Any, Dict, List, TypedDict

import requests
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import END, StateGraph
from rich import print

# Load keys, initialize LLM (GPT-4 by default, falling back to GPT-3.5)
load_dotenv()
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4")
try:
    llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, temperature=0.2, model=MODEL)
    llm.invoke("ping")  # cheap availability check; raises if the model is unusable
except Exception:
    MODEL = "gpt-3.5-turbo"
    llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, temperature=0.2, model=MODEL)
TARGET_URLS = {
    "search": "http://localhost:5000/search",
    "comment": "http://localhost:5000/comment",
}
# Heuristic success markers: strings that should only appear in a response when a payload worked
SUCCESS_SQLI = ["Alice", "Bob", "Flu", "Cold"]
SUCCESS_XSS = ["<script>", "alert(", "XSS"]
REPORT_PATH = "../reports/demo_report.md"  # relative to the agent/ working directory

class AgentState(TypedDict, total=False):
    """Shared graph state; each key is a LangGraph state channel."""
    analysis: Dict[str, Any]
    analyze_log: str
    exploit_log: List[str]
    successful: List[Dict[str, str]]
    cycle: int
    should_continue: bool
    last_report: str
def analyzer_node(state: AgentState) -> AgentState:
    endpoints = [f"{url} ({'GET' if ep == 'search' else 'POST'})" for ep, url in TARGET_URLS.items()]
    # Literal braces in the system message are doubled so ChatPromptTemplate
    # does not treat them as template variables
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert pentester. Given the following API endpoints, identify all input parameters and the likely vulnerability types (e.g. SQLi, XSS) for each. Reply in JSON: {{endpoint: {{'params': [], 'vulns': []}}}} ONLY."),
        ("human", f"Endpoints: {endpoints}"),
    ])
    try:
        result = llm.invoke(prompt.format_messages()).content
        state["analysis"] = json.loads(result)
        state["analyze_log"] = result
    except Exception:
        # Fallback: hardcoded analysis in case the LLM reply isn't valid JSON
        state["analysis"] = {
            "search": {"params": ["name"], "vulns": ["SQLi"]},
            "comment": {"params": ["content"], "vulns": ["XSS"]},
        }
        state["analyze_log"] = f"LLM failed, used fallback: {state['analysis']}"
    return state
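
# For reference, the analyzer is expected to return JSON shaped like the
# fallback above, e.g.:
#   {"search": {"params": ["name"], "vulns": ["SQLi"]},
#    "comment": {"params": ["content"], "vulns": ["XSS"]}}
# (illustrative shape only; real LLM output varies, which is what the fallback is for)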
def exploit_node(state: AgentState) -> AgentState:
    state["exploit_log"] = []
    state["successful"] = []
    for endpoint, data in state["analysis"].items():
        url = TARGET_URLS[endpoint]
        params = data.get("params", [])
        vulns = data.get("vulns", [])
        for param in params:
            for vuln in vulns:
                if vuln == "SQLi":
                    prompt = ChatPromptTemplate.from_messages([
                        ("system", f"Craft 3 SQLi payloads for parameter '{param}' (SQLite backend, GET). Reply with one payload per line, nothing else."),
                        ("human", f"Example endpoint: {url}?{param}=foo"),
                    ])
                elif vuln == "XSS":
                    prompt = ChatPromptTemplate.from_messages([
                        ("system", f"Craft 3 XSS payloads for parameter '{param}' (reflected POST). Reply with one payload per line, nothing else."),
                        ("human", f"Example endpoint: {url} with form param {param}"),
                    ])
                else:
                    continue
                # Split on newlines only: payloads may legitimately contain commas
                raw = llm.invoke(prompt.format_messages()).content
                payloads = [p.strip() for p in raw.splitlines() if p.strip()]
                # Test each payload against the live endpoint
                for payload in payloads:
                    try:
                        if vuln == "SQLi":
                            resp = requests.get(url, params={param: payload}, timeout=3)
                            result = any(s in resp.text for s in SUCCESS_SQLI)
                        else:  # XSS
                            resp = requests.post(url, data={param: payload}, timeout=3)
                            result = any(x in resp.text for x in SUCCESS_XSS) or payload in resp.text
                        state["exploit_log"].append(
                            f"Tested {vuln} on {endpoint}:{param}: {payload} -> {resp.text[:120]}"
                        )
                        if result:
                            state["successful"].append(
                                {"endpoint": endpoint, "param": param, "payload": payload, "vuln": vuln}
                            )
                    except Exception as ex:
                        state["exploit_log"].append(
                            f"Error {vuln} on {endpoint}:{param}: {payload} -> {ex}"
                        )
    return state
def reporter_node(state: AgentState) -> AgentState:
    print("\n[bold blue]=== DEMO REPORT ===[/bold blue]")
    print(f"[cyan]Analysis:[/cyan] {state.get('analyze_log')}")
    print("[yellow]Exploit Attempts:[/yellow]")
    for line in state.get("exploit_log", []):
        print(line)
    print(f"[green]Successes:[/green] {state.get('successful', [])}")
    print("[bold blue]=== END REPORT ===[/bold blue]\n")
    # Append a Markdown report to the persistent report file
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    md = f"# Color Red Demo Report\n\n**Run:** {now}\n\n"
    md += f"## Analysis\n\n```\n{state.get('analyze_log')}\n```\n"
    md += "## Exploit Attempts\n\n"
    md += "\n".join([f"- {line}" for line in state.get("exploit_log", [])])
    md += "\n\n## Successes\n\n"
    md += "\n".join([f"- {x}" for x in state.get("successful", [])])
    os.makedirs(os.path.dirname(REPORT_PATH), exist_ok=True)
    with open(REPORT_PATH, "a") as f:
        f.write(md + "\n\n---\n\n")
    state["last_report"] = md
    return state
def refine_node(state: AgentState) -> AgentState:
    # Keep cycling until something succeeds or the cycle cap is reached
    max_cycles = 2
    state["cycle"] = state.get("cycle", 0) + 1
    state["should_continue"] = (len(state["successful"]) == 0) and (state["cycle"] < max_cycles)
    return state

def route_after_refine(state: AgentState) -> str:
    # Conditional-edge router: loop back to analysis or terminate the graph
    return "analyze" if state["should_continue"] else END
graph = StateGraph(AgentState)
graph.add_node("analyze", analyzer_node)
graph.add_node("exploit", exploit_node)
graph.add_node("report", reporter_node)
graph.add_node("refine", refine_node)
graph.set_entry_point("analyze")
graph.add_edge("analyze", "exploit")
graph.add_edge("exploit", "report")
graph.add_edge("report", "refine")
# LangGraph expresses conditional transitions via add_conditional_edges,
# not a condition= kwarg on add_edge
graph.add_conditional_edges("refine", route_after_refine)
compiled_graph = graph.compile()

if __name__ == "__main__":
    compiled_graph.invoke(AgentState(cycle=0))
```
## 3. Optional: Dockerfile for Agent

`agent/Dockerfile`:
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY agent.py /app/
COPY .env /app/
# langchain-openai is a separate package from langchain and provides ChatOpenAI
RUN pip install langchain langchain-openai langgraph openai requests rich python-dotenv
CMD ["python", "agent.py"]
```
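
Note: `REPORT_PATH` resolves to `/reports/demo_report.md` inside this container (`../reports` relative to `/app`), so add a bind mount such as `-v "$PWD/../reports:/reports"` to the `docker run` command if you want the Markdown reports to persist on the host.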
## 4. Usage Summary

1. Start/build the target (with the XSS endpoint):

   ```bash
   cd target
   docker build -t color-red-target .
   docker run --rm -p 5000:5000 color-red-target
   ```

2. Prepare the agent:
   - Set your OpenAI key in `agent/.env` or export it as an env var.
   - (Optional) Run containerized:

     ```bash
     cd agent
     docker build -t color-red-agent .
     docker run --rm --network host color-red-agent
     ```

   Or run locally:

   ```bash
   pip install langchain langchain-openai langgraph openai requests rich python-dotenv
   cd agent
   python agent.py
   ```

3. Find the Markdown reports in `reports/demo_report.md` after every run.
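
If a run produces nothing, it is worth confirming the target is actually reachable first. Here is a quick manual smoke test of both endpoints (assuming the target container from step 1 is listening on `localhost:5000`):

```python
# Manual smoke test of the two target endpoints before running the agent
import requests

r = requests.get("http://localhost:5000/search", params={"name": "Alice"}, timeout=3)
print("search:", r.status_code, r.text[:120])

r = requests.post("http://localhost:5000/comment", data={"content": "hello"}, timeout=3)
print("comment:", r.status_code, r.text[:120])
```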
---
## Improvements Included

- Multi-vuln support (SQLi, XSS; auto-detected and extensible)
- Markdown reporting (full cycle, persistent)
- Real LLM input for both analysis and exploit generation
- Clean, modular node design
- Supports GPT-4 or GPT-3.5
- Containerization