Last active
February 26, 2025 13:03
-
-
Save alonsoir/c660e33681257ff95cce6d499372609f to your computer and use it in GitHub Desktop.
The generic Dockerfile represents a hypothetical app that may have problems. The GitHub Actions pipeline reviews it with a series of tools and a custom script that asks an Ollama model to generate patches, tests, and iterative pull requests. The script tries to save each iteration to show Ollama where it could…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dockerfile
# Interpreter version matches the one the CI workflow tests with (3.9).
FROM python:3.9-slim

# Create an unprivileged user; --gecos "" keeps adduser fully non-interactive.
RUN adduser --disabled-password --gecos "" appuser

# Configure the working directory. This runs while still root, so the
# directory is created before privileges are dropped.
WORKDIR /app

# Copy only the files needed to install dependencies, so this layer stays
# cached until requirements.txt changes.
COPY requirements.txt .

# Install dependencies system-wide (as root) without keeping pip's cache.
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the code, owned by the unprivileged user.
COPY --chown=appuser:appuser . .

# Hand /app itself to appuser so the application can create files in it,
# then drop root privileges for everything that runs afterwards.
RUN chown appuser:appuser /app
USER appuser

# Expose the application port (adjust to the application).
EXPOSE 8000

# Set the run command.
CMD ["python", "your_app.py"]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# .github/workflows/ci-cd-security.yml
name: CI/CD Security Workflow

on:
  push:
    branches:
      - main

jobs:
  security-check:
    runs-on: ubuntu-latest
    # The automation step creates branches and pull requests with GITHUB_TOKEN,
    # so the token needs write access to contents and pull requests.
    permissions:
      contents: write
      pull-requests: write
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt

      - name: Run Static Code Analysis
        run: |
          flake8 .
          bandit -r .

      - name: Run Dependency Check
        run: |
          safety check

      - name: Run Tests
        run: |
          pytest

      - name: Execute Secure CICD Automation
        # Credentials come from repository secrets, never from values
        # committed to the repository.
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SLACK_TOKEN: ${{ secrets.SLACK_TOKEN }}
          SLACK_CHANNEL: ${{ secrets.SLACK_CHANNEL }}
          OLLAMA_URL: ${{ secrets.OLLAMA_URL }}
        run: |
          python secure_cicd.py  # Runs the SecureCICD script

      - name: Build Docker Image
        run: |
          docker build -t my-app:latest .

      - name: Run Security Scan on Docker Image
        # `image` is the Trivy subcommand for scanning a container image;
        # --exit-code 1 makes fixable HIGH/CRITICAL findings fail the job.
        run: |
          docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
            aquasec/trivy image --exit-code 1 --severity HIGH,CRITICAL \
            --ignore-unfixed my-app:latest

      - name: Log in to Docker Registry
        if: success() && github.ref == 'refs/heads/main'
        env:
          REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
          REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
        run: |
          echo "$REGISTRY_PASSWORD" | docker login -u "$REGISTRY_USERNAME" --password-stdin

      - name: Publish Image to Docker Registry
        if: success() && github.ref == 'refs/heads/main'
        run: |
          docker tag my-app:latest myregistry/my-app:latest
          docker push myregistry/my-app:latest

      - name: Notify Vulnerabilities
        if: failure()
        run: |
          echo "Vulnerabilities detected. Notifying team..."
          # Send a notification to Slack or another channel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requests
websocket-client
colorlog
PyGithub
slack-sdk
pytest
flake8
bandit
safety
# asyncio and sqlite3 are intentionally NOT listed: both ship with the Python
# standard library. `sqlite3` has no installable distribution (pip fails on
# it), and the PyPI `asyncio` package is an obsolete backport that would
# shadow the standard-library module.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import logging
import os
import re
import sqlite3
import time

import colorlog
import requests
import websocket
from github import Github
from github import UnknownObjectException
from slack_sdk import WebClient
class SecureCICD:
    """Ask Ollama for a patch, record it, open a pull request, notify Slack.

    For a textual vulnerability description the class requests a patch and a
    unit test over a WebSocket, stores every attempt in a local SQLite
    database (so earlier attempts can be included in later prompts), commits
    the result to a new branch, opens a pull request and reports the outcome
    to a Slack channel.
    """

    # Upper bound, in seconds, for connecting to and waiting on the WebSocket,
    # so a stalled peer cannot hang the pipeline indefinitely.
    WS_TIMEOUT_SECONDS = 300

    def __init__(self, github_token, repo_name, slack_token, slack_channel, ollama_url,
                 db_path="history.db"):
        """Create the GitHub/Slack clients and open the history database.

        Args:
            github_token: Token passed to the PyGithub client.
            repo_name: Repository name passed to ``Github.get_repo``.
            slack_token: Token passed to the Slack ``WebClient``.
            slack_channel: Channel that receives notifications.
            ollama_url: WebSocket URL used to request patches.
            db_path: SQLite database file holding the patch history.
        """
        # Logging is configured first so every later step is able to log.
        self.setup_logging()
        self.github = Github(github_token)
        self.repo = self.github.get_repo(repo_name)
        self.slack = WebClient(token=slack_token)
        self.slack_channel = slack_channel
        self.ollama_url = ollama_url
        self.ws = None
        self.db_init(db_path)

    def setup_logging(self):
        """Attach a colored stream handler to the root logger (only once)."""
        self.logger = colorlog.getLogger()
        self.logger.setLevel(logging.INFO)
        # The root logger is shared by the whole process: adding a handler for
        # every instance would print each message once per instance.
        if any(getattr(h, "_secure_cicd_handler", False) for h in self.logger.handlers):
            return
        handler = colorlog.StreamHandler()
        handler.setFormatter(colorlog.ColoredFormatter(
            '%(log_color)s%(asctime)s - %(levelname)s - %(message)s',
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
            },
        ))
        handler._secure_cicd_handler = True
        self.logger.addHandler(handler)

    def db_init(self, db_path="history.db"):
        """Open the SQLite database and create the ``patches`` table if missing.

        Args:
            db_path: Database file to open; defaults to ``history.db``.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS patches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            issue TEXT,
            patch TEXT,
            test TEXT,
            status TEXT
        )''')
        self.conn.commit()

    def get_patch_history(self, issue):
        """Return ``(patch, test, status)`` rows stored for *issue*, oldest first."""
        self.cursor.execute(
            "SELECT patch, test, status FROM patches WHERE issue = ? ORDER BY id",
            (issue,),
        )
        return self.cursor.fetchall()

    def notify_slack(self, message):
        """Post *message* to the configured channel; failures are logged, not raised."""
        try:
            self.slack.chat_postMessage(channel=self.slack_channel, text=message)
            self.logger.info("Slack notification sent: %s", message)
        except Exception as e:
            # Best effort: a failed notification must not abort the pipeline.
            self.logger.error("Failed to send Slack notification: %s", e)

    async def connect_websocket(self):
        """Open the WebSocket to Ollama; ``self.ws`` is ``None`` on failure."""
        loop = asyncio.get_running_loop()
        try:
            # create_connection() blocks, so it runs in a worker thread to
            # keep the event loop responsive.
            self.ws = await loop.run_in_executor(
                None,
                lambda: websocket.create_connection(
                    self.ollama_url, timeout=self.WS_TIMEOUT_SECONDS
                ),
            )
            self.logger.info("WebSocket connection established with Ollama")
        except Exception as e:
            self.logger.error("Error connecting to WebSocket: %s", e)
            self.ws = None

    async def close_websocket(self):
        """Close the WebSocket if one is open and forget it."""
        if self.ws:
            try:
                self.ws.close()
            finally:
                # Never keep a reference to a socket that has been closed.
                self.ws = None
            self.logger.info("WebSocket connection closed")

    def _exchange(self, payload):
        """Send *payload* over the WebSocket and return the raw reply (blocking)."""
        self.ws.send(payload)
        return self.ws.recv()

    async def _reconnect(self):
        """Drop the current WebSocket and try to open a fresh one."""
        stale, self.ws = self.ws, None
        if stale is not None:
            try:
                stale.close()
            except Exception as e:
                self.logger.debug("Ignoring error while closing stale WebSocket: %s", e)
        await self.connect_websocket()

    async def request_patch(self, issue):
        """Ask Ollama for a patch and a unit test for *issue*.

        The prompt includes every attempt previously stored for the same
        issue. Up to three attempts are made.

        Returns:
            ``(patch, test)`` on success, ``(None, None)`` when no connection
            can be established or no complete answer is received.
        """
        if not self.ws:
            await self.connect_websocket()
        if not self.ws:
            return None, None
        retries = 3
        history = self.get_patch_history(issue)
        history_str = "\n".join(
            f"Patch: {patch}, Test: {test}, Status: {status}"
            for patch, test, status in history
        )
        # The prompt does not change between attempts, so it is built once.
        payload = json.dumps({
            "prompt": f"Genera un parche y un test unitario para la siguiente vulnerabilidad: {issue}.\n\nHistorial de intentos previos:\n{history_str}\n\nAsegúrate de que el parche soluciona el problema y que el test verifica su corrección. La solución debe ser clara, mantenible y probada antes de enviarse."
        })
        loop = asyncio.get_running_loop()
        for attempt in range(retries):
            last_attempt = attempt + 1 >= retries
            try:
                # send()/recv() block, so the exchange runs in a worker thread.
                raw = await loop.run_in_executor(None, self._exchange, payload)
                response = json.loads(raw)
                patch, test = response.get("patch"), response.get("test")
                if patch and test:
                    return patch, test
                self.logger.warning(
                    "Received incomplete response from Ollama, retrying (%d/%d)",
                    attempt + 1, retries,
                )
                if not last_attempt:
                    await asyncio.sleep(2)
            except Exception as e:
                self.logger.error("Error communicating with Ollama: %s", e)
                if last_attempt:
                    break
                # The socket may be unusable after an error; retrying on it
                # would fail the same way, so open a new connection first.
                await self._reconnect()
                if not self.ws:
                    break
        return None, None

    def store_patch(self, issue, patch, test, status="pending"):
        """Insert one attempt into the ``patches`` table; errors are logged."""
        try:
            self.cursor.execute(
                "INSERT INTO patches (issue, patch, test, status) VALUES (?, ?, ?, ?)",
                (issue, patch, test, status),
            )
            self.conn.commit()
            self.logger.info("Patch stored for issue: %s with status %s", issue, status)
        except sqlite3.Error as e:
            self.logger.error("Error storing patch in database: %s", e)

    def _write_file(self, path, message, content, branch_name):
        """Create *path* on *branch_name*, or update it when it already exists."""
        try:
            existing = self.repo.get_contents(path, ref=branch_name)
        except UnknownObjectException:
            # The file is not on the branch yet, so there is nothing to update.
            self.repo.create_file(path, message, content, branch=branch_name)
        else:
            self.repo.update_file(existing.path, message, content, existing.sha,
                                  branch=branch_name)

    def create_pull_request(self, patch, test, branch_name):
        """Commit *patch* and *test* to a new branch off ``main`` and open a PR.

        Returns:
            The pull request URL, or ``None`` when any GitHub call fails.
        """
        try:
            main_branch = self.repo.get_branch("main")
            self.repo.create_git_ref(ref=f'refs/heads/{branch_name}',
                                     sha=main_branch.commit.sha)
            self._write_file("patch.py", "Applying patch", patch, branch_name)
            self._write_file("test_patch.py", "Adding test", test, branch_name)
            pr = self.repo.create_pull(title=f"Patch for {branch_name}",
                                       body="Automated patch and test submission",
                                       head=branch_name,
                                       base="main")
            self.logger.info("Pull request created: %s", pr.html_url)
            return pr.html_url
        except Exception as e:
            self.logger.error("Error creating PR: %s", e)
            return None

    @staticmethod
    def _branch_name_for(issue):
        """Derive a branch name that is a valid git ref from free-form text.

        Every run of characters other than ASCII letters, digits, ``_`` and
        ``-`` becomes a single ``-``; the result is capped at 80 characters.
        """
        slug = re.sub(r"[^A-Za-z0-9_-]+", "-", issue).strip("-")[:80].strip("-")
        return f"patch-{slug}" if slug else "patch-issue"

    async def process_vulnerability(self, issue):
        """Run the full flow for *issue*: generate, store, open PR, notify."""
        patch, test = await self.request_patch(issue)
        if patch and test:
            self.store_patch(issue, patch, test, "generated")
            branch_name = self._branch_name_for(issue)
            pr_url = self.create_pull_request(patch, test, branch_name)
            if pr_url:
                self.store_patch(issue, patch, test, "PR_created")
                self.notify_slack(f"PR created: {pr_url}")
            else:
                self.notify_slack(f"Failed to create PR for {issue}")
                self.store_patch(issue, patch, test, "failed_PR_creation")
        else:
            self.notify_slack(f"Failed to generate patch for {issue}")
            self.store_patch(issue, None, None, "generation_failed")

    def close(self):
        """Close the database and the WebSocket.

        Uses ``asyncio.run``, so it must be called from synchronous code.
        """
        try:
            self.conn.close()
        finally:
            # The WebSocket is closed even if closing the database fails.
            asyncio.run(self.close_websocket())
        self.logger.info("Database and WebSocket connections closed")
def main():
    """Run the example flow using configuration from environment variables.

    Required: GITHUB_TOKEN, GITHUB_REPOSITORY (set automatically by GitHub
    Actions), SLACK_TOKEN and SLACK_CHANNEL. OLLAMA_URL is optional and falls
    back to ``ws://ollama.local`` when unset or empty.
    """
    required = ("GITHUB_TOKEN", "GITHUB_REPOSITORY", "SLACK_TOKEN", "SLACK_CHANNEL")
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        # Fail early with a clear message rather than with an opaque
        # authentication error from one of the clients.
        raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")
    cicd = SecureCICD(
        os.environ["GITHUB_TOKEN"],
        os.environ["GITHUB_REPOSITORY"],
        os.environ["SLACK_TOKEN"],
        os.environ["SLACK_CHANNEL"],
        os.environ.get("OLLAMA_URL") or "ws://ollama.local",
    )
    try:
        asyncio.run(cicd.process_vulnerability("Critical security issue in auth module"))
    finally:
        # Release the database and WebSocket even when processing raises.
        cicd.close()


# Example usage
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment