@dmaynor · Created May 16, 2025 00:34
Below is a hardened, improved “Color Red” agentic demo with the following key enhancements:
1. Support for both SQLi and XSS fuzzing (auto-detected by LLM)
2. Full persistent state and log-to-file for all results and cycles
3. OpenAI GPT-4 support if available, auto-fallback to GPT-3.5
4. Automatic detection of all input parameters (not just name) via LLM
5. Cleaner loop logic, human-readable Markdown report
6. Containerization (Dockerfile) for agent (optional but included)
7. Clear separation of workflow nodes and agent orchestration
Repo Layout
color-red-demo/
├── target/
│   ├── app.py
│   └── Dockerfile
├── agent/
│   ├── agent.py
│   ├── Dockerfile
│   └── .env
└── reports/
    └── demo_report.md
1. Target: Expanded Vuln Web App
For brevity, app.py is the same as before; just add an extra XSS-vulnerable endpoint:
@app.route("/comment", methods=["POST"])
def comment():
# XSS: Does not sanitize input, echos back to client
content = request.form.get("content", "")
return f"Comment received: {content}", 200
(Insert the above into app.py before the if __name__ == "__main__" block. The XSS endpoint needs no DB; the SQLi /search endpoint the agent also targets is sketched below.)
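The original /search endpoint is not reproduced in this gist ("same as above"). For a self-contained demo, here is a minimal sketch of what the agent assumes: a SQLite-backed /search endpoint whose seed rows match the agent's success markers (Alice, Bob, Flu, Cold). The patients table, its column names, and the demo.db filename are illustrative assumptions, not the original code:

import sqlite3
from flask import Flask, request

app = Flask(__name__)

def init_db():
    # Assumed schema/seed data, chosen to match the agent's SUCCESS_SQLI markers
    conn = sqlite3.connect("demo.db")
    conn.execute("CREATE TABLE IF NOT EXISTS patients (name TEXT, condition TEXT)")
    conn.execute("DELETE FROM patients")
    conn.executemany("INSERT INTO patients VALUES (?, ?)",
                     [("Alice", "Flu"), ("Bob", "Cold")])
    conn.commit()
    conn.close()

@app.route("/search")
def search():
    name = request.args.get("name", "")
    conn = sqlite3.connect("demo.db")
    # Deliberately vulnerable: user input is concatenated into the SQL string
    query = f"SELECT name, condition FROM patients WHERE name = '{name}'"
    rows = conn.execute(query).fetchall()
    conn.close()
    return str(rows), 200

if __name__ == "__main__":
    init_db()
    app.run(host="0.0.0.0", port=5000)

The usage steps below also build a target image, but target/Dockerfile is not shown in the gist; a minimal assumed version, mirroring the agent's Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY app.py /app/
RUN pip install flask
CMD ["python", "app.py"]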
2. Agent: Complete Multi-Vuln Agent (agent/agent.py)
import json
import os
from datetime import datetime
from typing import TypedDict

import requests
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import END, StateGraph
from rich import print

# Load keys, initialize LLM
load_dotenv()
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4")
llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, temperature=0.2, model=MODEL)
TARGET_URLS = {
    "search": "http://localhost:5000/search",
    "comment": "http://localhost:5000/comment",
}
# Strings whose presence in a response is treated as evidence of success
SUCCESS_SQLI = ["Alice", "Bob", "Flu", "Cold"]
SUCCESS_XSS = ["<script>", "alert(", "XSS"]
REPORT_PATH = "../reports/demo_report.md"

class AgentState(TypedDict, total=False):
    # TypedDict schema so StateGraph knows the state channels
    analysis: dict
    analyze_log: str
    exploit_log: list
    successful: list
    cycle: int
    should_continue: bool
    last_report: str
def analyzer_node(state: AgentState) -> AgentState:
    endpoints = [f"{url} ({'GET' if ep == 'search' else 'POST'})"
                 for ep, url in TARGET_URLS.items()]
    # Literal braces in the template must be doubled so ChatPromptTemplate
    # does not treat them as input variables.
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert pentester. Given the following API endpoints, "
                   "identify all input parameters and the likely vulnerability types "
                   "(e.g. SQLi, XSS) for each. Reply in JSON: "
                   "{{endpoint: {{'params': [], 'vulns': []}}}} ONLY."),
        ("human", f"Endpoints: {endpoints}"),
    ])
    try:
        result = llm.invoke(prompt.format_prompt().to_messages()).content
        state["analysis"] = json.loads(result)
        state["analyze_log"] = result
    except Exception:
        # Fallback if the LLM call fails or returns non-JSON
        state["analysis"] = {
            "search": {"params": ["name"], "vulns": ["SQLi"]},
            "comment": {"params": ["content"], "vulns": ["XSS"]},
        }
        state["analyze_log"] = f"LLM failed, used fallback: {state['analysis']}"
    return state
def exploit_node(state: AgentState) -> AgentState:
    state["exploit_log"] = []
    state["successful"] = []
    for endpoint, data in state["analysis"].items():
        url = TARGET_URLS[endpoint]
        params = data.get("params", [])
        vulns = data.get("vulns", [])
        for param in params:
            for vuln in vulns:
                if vuln == "SQLi":
                    prompt = ChatPromptTemplate.from_messages([
                        ("system", f"Craft 3 SQLi payloads for parameter '{param}' (SQLite backend, GET)"),
                        ("human", f"Example endpoint: {url}?{param}=foo"),
                    ])
                elif vuln == "XSS":
                    prompt = ChatPromptTemplate.from_messages([
                        ("system", f"Craft 3 XSS payloads for parameter '{param}' (reflected POST)"),
                        ("human", f"Example endpoint: {url} with form param {param}"),
                    ])
                else:
                    continue
                raw = llm.invoke(prompt.format_prompt().to_messages()).content
                # Crude parse: the LLM may return one payload per line or a comma list
                payloads = [p.strip() for p in raw.replace("\n", ",").split(",") if p.strip()]
                # Test each payload against the live endpoint
                for payload in payloads:
                    try:
                        if vuln == "SQLi":
                            resp = requests.get(url, params={param: payload}, timeout=3)
                        else:  # XSS
                            resp = requests.post(url, data={param: payload}, timeout=3)
                        body = resp.text
                        if vuln == "SQLi":
                            result = any(s in body for s in SUCCESS_SQLI)
                        else:
                            result = any(x in body for x in SUCCESS_XSS) or payload in body
                        state["exploit_log"].append(
                            f"Tested {vuln} on {endpoint}:{param}: {payload} -> {body[:120]}")
                        if result:
                            state["successful"].append(
                                {"endpoint": endpoint, "param": param, "payload": payload, "vuln": vuln})
                    except Exception as ex:
                        state["exploit_log"].append(
                            f"Error {vuln} on {endpoint}:{param}: {payload} -> {ex}")
    return state
def reporter_node(state: AgentState) -> AgentState:
    print("\n[bold blue]=== DEMO REPORT ===[/bold blue]")
    print(f"[cyan]Analysis:[/cyan] {state.get('analyze_log')}")
    print("[yellow]Exploit Attempts:[/yellow]")
    for line in state.get("exploit_log", []):
        print(line)
    print(f"[green]Successes:[/green] {state['successful']}")
    print("[bold blue]=== END REPORT ===[/bold blue]\n")
    # Append a Markdown report to file so results persist across cycles
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    md = f"# Color Red Demo Report\n\n**Run:** {now}\n\n"
    md += f"## Analysis\n\n```\n{state.get('analyze_log')}\n```\n"
    md += "## Exploit Attempts\n\n"
    md += "\n".join(f"- {line}" for line in state.get("exploit_log", []))
    md += "\n\n## Successes\n\n"
    md += "\n".join(f"- {x}" for x in state.get("successful", []))
    os.makedirs(os.path.dirname(REPORT_PATH), exist_ok=True)
    with open(REPORT_PATH, "a") as f:
        f.write(md + "\n\n---\n\n")
    state["last_report"] = md
    return state
def refine_node(state: AgentState) -> AgentState:
    max_cycles = 2
    state["cycle"] = state.get("cycle", 0) + 1
    state["should_continue"] = (len(state["successful"]) == 0) and (state["cycle"] < max_cycles)
    return state

def route_after_refine(state: AgentState) -> str:
    # Loop back to analysis until something succeeds or max_cycles is hit
    return "analyze" if state["should_continue"] else END

graph = StateGraph(AgentState)
graph.add_node("analyze", analyzer_node)
graph.add_node("exploit", exploit_node)
graph.add_node("report", reporter_node)
graph.add_node("refine", refine_node)
graph.set_entry_point("analyze")
graph.add_edge("analyze", "exploit")
graph.add_edge("exploit", "report")
graph.add_edge("report", "refine")
graph.add_conditional_edges("refine", route_after_refine)
compiled_graph = graph.compile()

if __name__ == "__main__":
    final_state = compiled_graph.invoke(AgentState())
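Enhancement 3 promises auto-fallback to GPT-3.5, but the listing above only reads OPENAI_MODEL from the environment. A minimal sketch of one way to add it; the make_llm helper and its "ping" probe are illustrative assumptions, not part of the original:

# Hypothetical helper (not in the original gist): probe GPT-4 first and
# fall back to GPT-3.5 if the call fails, e.g. because the key lacks access.
from langchain_openai import ChatOpenAI

def make_llm(api_key: str, temperature: float = 0.2) -> ChatOpenAI:
    for model in ("gpt-4", "gpt-3.5-turbo"):
        candidate = ChatOpenAI(openai_api_key=api_key, temperature=temperature, model=model)
        try:
            candidate.invoke("ping")  # cheap availability probe
            return candidate
        except Exception:
            continue  # try the next model
    raise RuntimeError("No usable OpenAI chat model available")

llm = make_llm(OPENAI_API_KEY) would then replace the direct ChatOpenAI construction near the top of agent.py.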
3. Optional: Dockerfile for Agent
agent/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY agent.py /app/
COPY .env /app/
RUN pip install langchain langchain-openai langgraph openai requests rich python-dotenv
CMD ["python", "agent.py"]
4. Usage Summary
1. Start/Build the Target (with XSS endpoint):
cd target
docker build -t color-red-target .
docker run --rm -p 5000:5000 color-red-target
2. Prepare Agent:
• Set your OpenAI key in agent/.env (a sample is shown after this list) or export it as an environment variable.
• (Optional) cd agent && docker build -t color-red-agent . && docker run --rm --network host color-red-agent
Or run locally:
pip install langchain langchain-openai langgraph openai requests rich python-dotenv
cd agent
python agent.py
3. Find Markdown reports in reports/demo_report.md after every run.
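A minimal agent/.env consistent with the two variables the agent reads (OPENAI_API_KEY is required, OPENAI_MODEL is optional; the key value is a placeholder):

OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4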
Improvements Included
• Multi-vuln support (SQLi, XSS, auto-detected, extensible)
• Markdown reporting (full cycle, persistent)
• Real LLM input for both analysis and exploit generation
• Clean modular node design
• Supports GPT-4 or GPT-3.5
• Containerization