Created
February 24, 2024 18:48
-
-
Save BlackthornYugen/064771731f8c75deb78e6d3da4df82c6 to your computer and use it in GitHub Desktop.
Python modify haproxy acls
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Backend in front of the allowlist UI. A request passes without a password
# prompt when its source address is listed in ip_pass.lst or when it carries
# valid HTTP basic credentials.
backend be_ipwhitelist
    # Pass the client address on to the application in X-Forwarded-For.
    option forwardfor
    # Tell the application which scheme the client used towards the proxy.
    http-request set-header x-forwarded-proto %[ssl_fc,iif(https,http)]

    # Two declarations sharing one name: either match makes the ACL true.
    acl is_authorized src -f /etc/haproxy/ip_pass.lst
    acl is_authorized http_auth(basic-auth-list)
    # Paths ending in "logout" always get a fresh credential challenge.
    acl is_logout_path path_end logout

    # Challenge unless the client is authorized AND is not on a logout path.
    http-request auth realm myrealm.example.org unless is_authorized !is_logout_path
    http-request add-header Cache-Control no-cache

    # The Flask application, listening on loopback only.
    server python1 127.0.0.1:5333 check
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!doctype html>
{#
  Allowlist management page.

  Context:
    user_ip              - address pre-filled in the "allow" form.
    whitelisted_ips_info - list of {'ip': ..., 'from_disk': ...} dicts; entries
                           flagged from_disk are shown without a Delete button.
  Flashed messages in the "ip_status" category are rendered under the list.
#}
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>IP Allowlist</title>
</head>
<body>
    <h2>Allow an IP Address</h2>
    {# Form targets come from url_for so they keep working if routes move. #}
    <form action="{{ url_for('whitelist') }}" method="post">
        <label for="ip">IP Address:</label>
        <input type="text" id="ip" name="ip" value="{{ user_ip }}">
        <input type="submit" value="Whitelist">
    </form>
    <h3>Allowed IPs</h3>
    <ul>
        {% for item in whitelisted_ips_info %}
        <li>{{ item.ip }}
            {% if not item.from_disk %}
            <form action="{{ url_for('delete_ip') }}" method="post" style="display:inline;">
                <input type="hidden" name="ip" value="{{ item.ip }}">
                <input type="submit" value="Delete">
            </form>
            {% endif %}
        </li>
        {% endfor %}
    </ul>
    {% with messages = get_flashed_messages(category_filter=["ip_status"], with_categories=true) %}
    {% if messages %}
    {% for _, last_ip_status in messages %}
    <p>{{ last_ip_status }}</p>
    {% endfor %}
    {% endif %}
    {% endwith %}
</body>
</html>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from werkzeug.middleware.proxy_fix import ProxyFix | |
from flask import Flask, flash, request, render_template, redirect, url_for | |
import logging | |
import os | |
import socket | |
# Flask application object; the route decorators further down register on it.
app = Flask(__name__)

# Wrap the WSGI app so X-Forwarded-Proto from one proxy hop is honoured.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)

# Root logger emits everything from DEBUG upwards.
logging.basicConfig(level=logging.DEBUG)

# Most recent ACL listing; rebound by fetch_whitelisted_ips().
whitelisted_ips = []
def read_whitelist_from_disk(whitelist_path='/etc/haproxy/ip_pass.lst'):
    """Return the entries stored in the on-disk allowlist file.

    Blank lines and lines starting with ``#`` are skipped and surrounding
    whitespace is stripped, so the result can be compared entry-for-entry
    with the values HAProxy reports for the same file.

    Args:
        whitelist_path: File to read; defaults to the list referenced by the
            ``is_authorized`` ACL.

    Returns:
        A list of entry strings. Empty when the file cannot be read; the
        failure is logged rather than raised.
    """
    try:
        with open(whitelist_path, 'r', encoding='utf-8') as file:
            raw_lines = file.read().splitlines()
    except (OSError, UnicodeDecodeError) as e:
        # Best effort: a missing or unreadable file means "nothing on disk".
        app.logger.error(f'Failed to read whitelist from disk: {e}')
        return []

    stripped = (line.strip() for line in raw_lines)
    return [entry for entry in stripped if entry and not entry.startswith('#')]
def get_client_ip():
    """Return the address to treat as the requesting client.

    When the request carries ``X-Forwarded-For`` the text before the first
    comma is returned; otherwise the peer address of the connection is used.
    """
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for is None:
        # No proxy header at all: use the direct connection address.
        return request.remote_addr
    # NOTE(review): the left-most entry can be supplied by the client itself;
    # confirm it is only used as a convenience default.
    first_hop, _, _ = forwarded_for.partition(",")
    return first_hop
def send_command_to_socket(command, timeout=5.0):
    """Send one command to the HAProxy stats socket and return its reply.

    HAProxy must expose a socket this process may read and write, e.g. in
    /etc/haproxy/haproxy.cfg:
        stats socket /var/lib/haproxy/stats mode 660 group wheel
    See https://docs.haproxy.org/2.4/management.html#9.3

    Args:
        command: Command text; a trailing newline is appended when missing.
        timeout: Seconds to wait on each socket operation before giving up.

    Returns:
        The decoded reply. On a socket error the failure is logged and
        whatever was received up to that point (possibly ``""``) is returned.
    """
    socket_path = os.getenv("TARGET_SOCK", "/var/lib/haproxy/stats")
    if not command.endswith('\n'):
        command += '\n'

    chunks = []
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client_socket:
            # Guard the read loop below against a peer that never closes.
            client_socket.settimeout(timeout)
            client_socket.connect(socket_path)
            client_socket.sendall(command.encode('utf-8'))
            # Read until the peer closes the connection: one recv() would cut
            # off any reply larger than the buffer (e.g. a long ACL listing).
            while True:
                chunk = client_socket.recv(4096)
                if not chunk:
                    break
                chunks.append(chunk)
    except OSError as e:
        app.logger.error(f"Failed to send command to socket: {e}")
    return b"".join(chunks).decode('utf-8', errors='replace')
def fetch_whitelisted_ips():
    """Ask HAProxy for the current contents of the allowlist ACL.

    Sends ``show acl`` for /etc/haproxy/ip_pass.lst and keeps the second
    whitespace-separated field of every reply line. Lines with fewer than two
    fields (an empty reply after a socket failure, an empty ACL, blank lines)
    are skipped instead of raising ``IndexError``.

    Returns:
        The list of entries, which is also stored in the module-level
        ``whitelisted_ips``.
    """
    global whitelisted_ips
    data = send_command_to_socket("show acl /etc/haproxy/ip_pass.lst\n")
    rows = (line.split() for line in data.splitlines())
    whitelisted_ips = [fields[1] for fields in rows if len(fields) >= 2]
    return whitelisted_ips
def get_dynamic_whitelisted_ips():
    """Return the ACL entries HAProxy reports that are not in the disk file.

    The live listing is fetched first, then the on-disk list is read; the
    result keeps the order of the live listing.
    """
    live_entries = fetch_whitelisted_ips()
    persisted_entries = read_whitelist_from_disk()
    return [entry for entry in live_entries if entry not in persisted_entries]
@app.route('/')
def home():
    """Render the allowlist page.

    The template receives the caller's address (used to pre-fill the form)
    and one ``{'ip', 'from_disk'}`` dict per entry HAProxy currently reports;
    ``from_disk`` is true when the entry also appears in the on-disk list.
    """
    user_ip = get_client_ip()
    persisted_entries = read_whitelist_from_disk()

    rows = []
    for entry in fetch_whitelisted_ips():
        rows.append({'ip': entry, 'from_disk': entry in persisted_entries})

    return render_template(
        "whitelist-ips.html",
        user_ip=user_ip,
        whitelisted_ips_info=rows,
    )
def _normalize_ip_entry(value):
    """Return *value* as a canonical IP address or CIDR string, or None if invalid."""
    import ipaddress  # local import: keeps this block self-contained

    candidate = value.strip()
    # Scoped IPv6 ("addr%zone") lets arbitrary text through the parser, so it
    # is refused outright.
    if '%' in candidate:
        return None
    try:
        if '/' in candidate:
            return str(ipaddress.ip_network(candidate, strict=False))
        return str(ipaddress.ip_address(candidate))
    except ValueError:
        return None


@app.route('/whitelist', methods=['POST'])
def whitelist():
    """Add the submitted address to the runtime ACL, then redirect home.

    The ``ip`` form field is validated before it is placed into the HAProxy
    command: the stats socket treats ``;`` and newlines as command
    separators, so unchecked input could run arbitrary socket commands.
    The outcome is reported through an ``ip_status`` flash message.
    """
    submitted = request.form['ip']
    ip_address = _normalize_ip_entry(submitted)
    if ip_address is None:
        app.logger.warning('Rejected invalid IP address: %r', submitted)
        flash(f"{submitted} is not a valid IP address.", 'ip_status')
        return redirect(url_for('home'))

    if ip_address in fetch_whitelisted_ips():
        app.logger.info(f'IP {ip_address} is already whitelisted.')
        flash(f"{ip_address} can already connect.", 'ip_status')
    else:
        # Construct the command to add the IP to the ACL
        command = f"add acl /etc/haproxy/ip_pass.lst {ip_address}\n"
        send_command_to_socket(command)
        app.logger.debug(f'Whitelisted {ip_address}')
        flash(f"{ip_address} can now connect.", 'ip_status')
    return redirect(url_for('home'))
def _is_plain_ip_entry(value):
    """Return True when *value* is a bare IP address or CIDR network."""
    import ipaddress  # local import: keeps this block self-contained

    # Scoped IPv6 ("addr%zone") lets arbitrary text through the parser, so it
    # is refused outright.
    if '%' in value:
        return False
    try:
        if '/' in value:
            ipaddress.ip_network(value, strict=False)
        else:
            ipaddress.ip_address(value)
    except ValueError:
        return False
    return True


@app.route('/delete', methods=['POST'])
def delete_ip():
    """Remove the submitted address from the runtime ACL, then redirect home.

    Entries present in the on-disk list are refused. The ``ip`` form field is
    validated before it is placed into the HAProxy command: the stats socket
    treats ``;`` and newlines as command separators, so unchecked input could
    run arbitrary socket commands. The value is sent as submitted (not
    re-formatted) so it matches the entry exactly as listed.
    """
    ip_to_delete = request.form['ip'].strip()
    if not _is_plain_ip_entry(ip_to_delete):
        app.logger.warning('Rejected invalid IP address: %r', ip_to_delete)
        flash(f'{ip_to_delete} is not a valid IP address.', 'ip_status')
        return redirect(url_for('home'))

    if ip_to_delete in read_whitelist_from_disk():
        app.logger.debug(f'IP cannot be removed from whitelist: {ip_to_delete}')
        flash(f'IP cannot be removed from whitelist: {ip_to_delete}', 'ip_status')
        return redirect(url_for('home'))

    # Construct the command to delete the IP from the ACL
    command = f"del acl /etc/haproxy/ip_pass.lst {ip_to_delete}"
    send_command_to_socket(command)
    app.logger.debug(f'Deleted {ip_to_delete}')
    message = f'{ip_to_delete} can no longer connect.'
    flash(message, 'ip_status')
    app.logger.debug(message)
    return redirect(url_for('home'))
if __name__ == '__main__':
    import secrets

    # flash() stores messages in the session, which needs a secret key.
    # Prefer one from the environment; otherwise generate a per-process key
    # (flashed messages then do not survive a restart).
    if not app.config.get('SECRET_KEY'):
        app.config['SECRET_KEY'] = os.getenv('SECRET_KEY') or secrets.token_hex(32)

    # The interactive debugger allows code execution from the browser, so it
    # is opt-in (FLASK_DEBUG=1) rather than always on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run("127.0.0.1", port=5333, debug=debug_enabled)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment