Skip to content

Instantly share code, notes, and snippets.

@alonsoir
Last active February 26, 2025 13:03
Show Gist options
  • Save alonsoir/c660e33681257ff95cce6d499372609f to your computer and use it in GitHub Desktop.
Save alonsoir/c660e33681257ff95cce6d499372609f to your computer and use it in GitHub Desktop.
The generic Dockerfile represents a hypothetical app that may have problems. The GitHub Actions pipeline reviews it with a series of tools and a custom script that, with the help of an Ollama model, tries to generate patches, tests, and iterative pull requests. The script will try to save each iteration to show Ollama where it could…
# Dockerfile
# NOTE(review): the CI workflow in this gist tests on Python 3.9 while this
# image is based on 3.8 - consider aligning the two versions.
FROM python:3.8-slim
# Create a non-root user; --gecos "" keeps adduser from prompting for details.
RUN adduser --disabled-password --gecos "" appuser
# Set the working directory and hand it to the app user so the application
# can create files there at runtime (WORKDIR alone creates it root-owned).
WORKDIR /app
RUN chown appuser:appuser /app
# Copy only the files needed to install dependencies (better layer caching).
COPY requirements.txt .
# Install dependencies system-wide, while still root, without keeping a pip
# cache. Installing after `USER appuser` would fall back to a per-user site
# whose script directory is not on PATH.
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the code, owned by the app user.
COPY --chown=appuser:appuser . .
# Drop privileges for everything that runs in the final container.
USER appuser
# Expose the application port (adjust to the application).
EXPOSE 8000
# Set the startup command.
CMD ["python", "your_app.py"]
# .github/workflows/ci-cd-security.yml
# Lints, tests and security-scans the project on every push to main, builds
# the Docker image, scans it, and publishes it only if every step succeeded.
name: CI/CD Security Workflow

on:
  push:
    branches:
      - main

jobs:
  security-check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt

      - name: Run Static Code Analysis
        run: |
          flake8 .
          bandit -r .

      - name: Run Dependency Check
        run: |
          safety check

      - name: Run Tests
        run: |
          pytest

      - name: Execute Secure CICD Automation
        run: |
          python secure_cicd.py  # Runs the SecureCICD script

      - name: Build Docker Image
        run: |
          docker build -t my-app:latest .

      # Trivy needs the `image` subcommand to scan a container image, and it
      # exits 0 even when it finds vulnerabilities unless --exit-code is set.
      # Failing here is what gates the publish step and triggers the
      # notification step below.
      - name: Run Security Scan on Docker Image
        run: |
          docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
            aquasec/trivy image --exit-code 1 --severity HIGH,CRITICAL my-app:latest

      # NOTE(review): pushing requires an authenticated Docker client; add a
      # `docker login` step (credentials from repository secrets) before this
      # one when targeting a real registry.
      - name: Publish Image to Docker Registry
        if: success() && github.ref == 'refs/heads/main'
        run: |
          docker tag my-app:latest myregistry/my-app:latest
          docker push myregistry/my-app:latest

      - name: Notify Vulnerabilities
        if: failure()
        run: |
          echo "Vulnerabilities detected. Notifying team..."
          # Send a notification to Slack or another channel
# requirements.txt
requests
websocket-client
colorlog
PyGithub
slack-sdk
pytest
flake8
bandit
safety
# asyncio and sqlite3 are deliberately NOT listed: both are part of the Python
# standard library. `sqlite3` is not an installable distribution, so listing it
# makes `pip install -r requirements.txt` fail, and the `asyncio` package on
# PyPI is an obsolete backport that would shadow the standard-library module.
import sqlite3
import json
import requests
import websocket
import logging
import time
import asyncio
import colorlog
from github import Github
from slack_sdk import WebClient
class SecureCICD:
    """Ask Ollama for a patch to a vulnerability, record it, and open a pull request.

    The workflow for one vulnerability (see ``process_vulnerability``) is:
    request a patch and a unit test over a WebSocket, store every attempt in
    a local SQLite database so later prompts can include the history, push
    the result to a new branch, open a pull request, and report the outcome
    to Slack.

    The WebSocket calls are blocking even though the methods that make them
    are coroutines; that is fine for the single-task usage in this script
    but would stall other tasks sharing the same event loop.
    """

    # Number of request/response round trips attempted per vulnerability.
    MAX_RETRIES = 3
    # Pause between two attempts, in seconds.
    RETRY_DELAY_SECONDS = 2

    def __init__(self, github_token, repo_name, slack_token, slack_channel, ollama_url):
        """Create the GitHub, Slack and database handles.

        Args:
            github_token: Token passed to ``Github``.
            repo_name: Repository name passed to ``Github.get_repo``.
            slack_token: Token passed to the Slack ``WebClient``.
            slack_channel: Channel that receives the notifications.
            ollama_url: WebSocket URL of the Ollama endpoint.
        """
        # Logging comes first so that every later step is able to report.
        self.setup_logging()
        self.github = Github(github_token)
        self.repo = self.github.get_repo(repo_name)
        self.slack = WebClient(token=slack_token)
        self.slack_channel = slack_channel
        self.ollama_url = ollama_url
        self.ws = None
        # Opened last so a failure above does not leave a connection behind.
        self.db_init()

    def setup_logging(self):
        """Attach a colored stream handler to the root logger at INFO level."""
        self.logger = colorlog.getLogger()
        self.logger.setLevel(logging.INFO)
        # The root logger is shared: without this guard every new instance
        # would add another handler and each message would be printed twice.
        if any(isinstance(h.formatter, colorlog.ColoredFormatter)
               for h in self.logger.handlers):
            return
        handler = colorlog.StreamHandler()
        handler.setFormatter(colorlog.ColoredFormatter(
            '%(log_color)s%(asctime)s - %(levelname)s - %(message)s',
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
            }))
        self.logger.addHandler(handler)

    def db_init(self, db_path="history.db"):
        """Open the SQLite database and create the ``patches`` table if missing.

        Args:
            db_path: Database location; ``":memory:"`` gives a throwaway
                database. Defaults to ``history.db`` in the working directory.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS patches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            issue TEXT,
            patch TEXT,
            test TEXT,
            status TEXT
        )''')
        self.conn.commit()

    def get_patch_history(self, issue):
        """Return the stored ``(patch, test, status)`` rows for ``issue``."""
        self.cursor.execute(
            "SELECT patch, test, status FROM patches WHERE issue = ?", (issue,))
        return self.cursor.fetchall()

    def notify_slack(self, message):
        """Post ``message`` to the configured channel; failures are only logged."""
        try:
            self.slack.chat_postMessage(channel=self.slack_channel, text=message)
            self.logger.info("Slack notification sent: %s", message)
        except Exception as e:  # best effort: a notification must not abort the run
            self.logger.error("Failed to send Slack notification: %s", e)

    async def connect_websocket(self):
        """Open the WebSocket to Ollama; ``self.ws`` stays ``None`` on failure."""
        try:
            self.ws = websocket.create_connection(self.ollama_url)
            self.logger.info("WebSocket connection established with Ollama")
        except Exception as e:
            self.logger.error("Error connecting to WebSocket: %s", e)
            self.ws = None

    def _close_ws(self):
        """Close the WebSocket if one is open and forget it (never raises)."""
        if not self.ws:
            return
        try:
            self.ws.close()
            self.logger.info("WebSocket connection closed")
        except Exception as e:
            self.logger.warning("Error closing WebSocket: %s", e)
        finally:
            # Forget the socket so the next request opens a fresh one.
            self.ws = None

    async def close_websocket(self):
        """Close the WebSocket connection if one is open."""
        self._close_ws()

    async def request_patch(self, issue):
        """Ask Ollama for a patch and a unit test for ``issue``.

        The prompt includes every earlier attempt stored for the same issue.
        The reply is expected to be a JSON object with ``patch`` and ``test``
        keys. Up to ``MAX_RETRIES`` attempts are made; after a communication
        error the connection is dropped and reopened for the next attempt.

        Returns:
            A ``(patch, test)`` tuple, or ``(None, None)`` when no connection
            could be made or no attempt produced both values.
        """
        if not self.ws:
            await self.connect_websocket()
        if not self.ws:
            return None, None

        history = self.get_patch_history(issue)
        history_str = "\n".join(
            [f"Patch: {h[0]}, Test: {h[1]}, Status: {h[2]}" for h in history])
        prompt = {
            "prompt": f"Genera un parche y un test unitario para la siguiente vulnerabilidad: {issue}.\n\nHistorial de intentos previos:\n{history_str}\n\nAsegúrate de que el parche soluciona el problema y que el test verifica su corrección. La solución debe ser clara, mantenible y probada antes de enviarse."
        }
        payload = json.dumps(prompt)

        retries = self.MAX_RETRIES
        for attempt in range(1, retries + 1):
            if not self.ws:
                # A previous attempt dropped the connection; try to reopen it.
                await self.connect_websocket()
                if not self.ws:
                    return None, None
            try:
                self.ws.send(payload)
                response = json.loads(self.ws.recv())
            except Exception as e:  # socket errors and malformed JSON alike
                self.logger.error("Error communicating with Ollama: %s", e)
                self._close_ws()
            else:
                if isinstance(response, dict):
                    patch, test = response.get("patch"), response.get("test")
                    if patch and test:
                        return patch, test
                self.logger.warning(
                    "Received incomplete response from Ollama, retrying (%d/%d)",
                    attempt, retries)
            if attempt < retries:
                await asyncio.sleep(self.RETRY_DELAY_SECONDS)
        # Always hand back a pair so callers can unpack the result.
        return None, None

    def store_patch(self, issue, patch, test, status="pending"):
        """Insert one attempt into the history table; failures are only logged."""
        try:
            self.cursor.execute(
                "INSERT INTO patches (issue, patch, test, status) VALUES (?, ?, ?, ?)",
                (issue, patch, test, status))
            self.conn.commit()
            self.logger.info("Patch stored for issue: %s with status %s", issue, status)
        except sqlite3.Error as e:
            self.logger.error("Error storing patch in database: %s", e)

    def _upsert_file(self, path, message, content, branch):
        """Update ``path`` on ``branch``, creating the file when it does not exist."""
        # Imported here so the file-level import block does not have to change.
        from github import UnknownObjectException
        try:
            existing = self.repo.get_contents(path, ref=branch)
        except UnknownObjectException:
            self.repo.create_file(path, message, content, branch=branch)
        else:
            self.repo.update_file(existing.path, message, content, existing.sha,
                                  branch=branch)

    def create_pull_request(self, patch, test, branch_name):
        """Commit the patch and its test to a new branch and open a PR against main.

        Args:
            patch: Content written to ``patch.py``.
            test: Content written to ``test_patch.py``.
            branch_name: Name of the branch to create from ``main``.

        Returns:
            The pull request URL, or ``None`` when any step failed (the error
            is logged).
        """
        try:
            main_branch = self.repo.get_branch("main")
            self.repo.create_git_ref(ref=f'refs/heads/{branch_name}',
                                     sha=main_branch.commit.sha)
            # The new branch mirrors main, so the two files may not exist yet.
            self._upsert_file("patch.py", "Applying patch", patch, branch_name)
            self._upsert_file("test_patch.py", "Adding test", test, branch_name)
            pr = self.repo.create_pull(title=f"Patch for {branch_name}",
                                       body="Automated patch and test submission",
                                       head=branch_name,
                                       base="main")
            self.logger.info("Pull request created: %s", pr.html_url)
            return pr.html_url
        except Exception as e:
            self.logger.error("Error creating PR: %s", e)
            return None

    @staticmethod
    def _branch_name_for(issue):
        """Derive a git-safe branch name from a free-text issue description."""
        # Anything that is not alphanumeric, '-' or '_' becomes '-' because
        # git refs reject spaces and characters such as ':', '?', '~' and '^'.
        slug = "".join(ch if ch.isalnum() or ch in "-_" else "-" for ch in issue)
        slug = slug.strip("-")[:100].rstrip("-")
        return f"patch-{slug or 'issue'}"

    async def process_vulnerability(self, issue):
        """Run the whole flow for one issue: generate, store, open a PR, notify."""
        patch, test = await self.request_patch(issue)
        if not (patch and test):
            self.notify_slack(f"Failed to generate patch for {issue}")
            self.store_patch(issue, None, None, "generation_failed")
            return
        self.store_patch(issue, patch, test, "generated")
        pr_url = self.create_pull_request(patch, test, self._branch_name_for(issue))
        if pr_url:
            self.store_patch(issue, patch, test, "PR_created")
            self.notify_slack(f"PR created: {pr_url}")
        else:
            self.notify_slack(f"Failed to create PR for {issue}")
            self.store_patch(issue, patch, test, "failed_PR_creation")

    def close(self):
        """Close the database and the WebSocket connection."""
        self.conn.close()
        # Closed synchronously: asyncio.run() would raise if this method were
        # called while an event loop is already running.
        self._close_ws()
        self.logger.info("Database and WebSocket connections closed")
# Example usage
if __name__ == "__main__":
    # Imported here because only this entry point needs it.
    import os

    # Settings come from the environment so real credentials never have to be
    # written into the file; the fallbacks are the original placeholders.
    cicd = SecureCICD(
        os.environ.get("GITHUB_TOKEN", "github_token"),
        os.environ.get("GITHUB_REPOSITORY", "repo_name"),
        os.environ.get("SLACK_TOKEN", "slack_token"),
        os.environ.get("SLACK_CHANNEL", "slack_channel"),
        os.environ.get("OLLAMA_URL", "ws://ollama.local"),
    )
    try:
        asyncio.run(cicd.process_vulnerability("Critical security issue in auth module"))
    finally:
        # Release the database and the socket even when processing fails.
        cicd.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment