Last active
January 1, 2016 10:48
-
-
Save dimiro1/8133516 to your computer and use it in GitHub Desktop.
Este script ajuda a proteger seus servidores de ataques DOS. Uso netstat para descobrir a quantidade de conexões que determinado IP tem aberto, e uso o iptables como firewall. O script deve rodar como ROOT, já que precisa adicionar regras no firewall. USO: Rode o script com o supervisor, upstart ou systemd, ele tem várias opções de configuração …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Copyright (C) 2013 by Claudemiro Alves Feitosa Neto | |
# <[email protected]> | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licences> | |
# This script must be executed as root. | |
# Run with Supervisor. | |
import time | |
import argparse | |
import subprocess | |
from functools import wraps | |
import logging | |
NETSTAT = "/usr/bin/env netstat" | |
IPTABLES = "/usr/bin/env iptables" | |
# When an ip reach this limit, it will be droppped | |
MAX_CONNECTIONS = 100 | |
# Check interval in seconds | |
INTERVAL = 3 | |
# Ips in this list will not be dropped | |
WHITELIST = ['127.0.0.1'] | |
def add_to_whitelist(ip): | |
"""Append an IP to whitelist | |
""" | |
WHITELIST.append(ip) | |
def run_in_a_shell(f): | |
"""A Helper decorator to run a shell command. | |
""" | |
@wraps(f) | |
def wrapper(*args): | |
return subprocess.check_output(f(*args), shell=True) | |
return wrapper | |
@run_in_a_shell | |
def iptables(args): | |
return "%s %s" % (IPTABLES, args) | |
@run_in_a_shell | |
def netstat(args): | |
return "%s %s" % (NETSTAT, args) | |
def drop(ip): | |
"""Run the iptables DROP command | |
""" | |
if not ip in WHITELIST: | |
command = "-A INPUT -s %s -j DROP" % ip | |
logging.debug(command) | |
iptables(command) | |
else: | |
logging.debug("IP in WHITELIST") | |
def free(ip): | |
"""Run the iptables DROP command with a -D flag | |
""" | |
command = '-D INPUT -s %s -j DROP' % ip | |
logging.debug(command) | |
iptables(command) | |
def check_dropped(ip): | |
logging.debug('Chcking Droppped') | |
return list_dropped().find(ip) >= 0 | |
def list_dropped(): | |
logging.debug('Listing Dropped') | |
return iptables("-L INPUT -v -n | awk '{print $8}' | tail -n +3") | |
def step(): | |
netstat_ips = netstat( | |
"-ntu | tail -n +3 | awk '{print $5}' | cut -d: -f1 | sort | uniq -c | sort -nr").strip() | |
for line in netstat_ips.split("\n"): | |
conections, ip = line.strip().split(" ") | |
if int(conections) > MAX_CONNECTIONS: | |
if not check_dropped(ip): | |
logging.info("Dropping %s" % ip) | |
drop(ip) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'-r', '--run', action='store_true', help='Run') | |
parser.add_argument( | |
'-l', '--list', action='store_true', help='List dropped ips') | |
parser.add_argument( | |
'-d', '--drop', nargs='+', help='Drop ip', metavar='ip') | |
parser.add_argument( | |
'-f', '--free', nargs='+', help='Free ip', metavar='ip') | |
parser.add_argument( | |
'-w', '--whitelist', nargs='+', help='Whitelist', metavar='ip') | |
parser.add_argument( | |
'-i', '--interval', help='Interval, in seconds', type=int) | |
parser.add_argument( | |
'-m', '--max-connections', help='Max Connections', type=int) | |
parser.add_argument( | |
'-c', '--check', help='Check ip', metavar='ip') | |
parser.add_argument( | |
'--netstat-bin', help='Netstat Binary', metavar='location') | |
parser.add_argument( | |
'--iptables-bin', help='Iptables Binary', metavar='location') | |
args = parser.parse_args() | |
logging.debug(args) | |
# NETSTAT BINARY | |
if args.netstat_bin is not None: | |
NETSTAT = args.netstat_bin | |
# IPTABLES BINARY | |
if args.iptables_bin is not None: | |
IPTABLES = args.iptables_bin | |
# INTERVAL | |
if args.interval is not None: | |
INTERVAL = args.interval | |
# MAX CONNECTIONS | |
if args.max_connections is not None: | |
MAX_CONNECTIONS = args.max_connections | |
# DROP | |
if args.drop is not None: | |
for ip in args.drop: | |
drop(ip) | |
# FREE | |
if args.free is not None: | |
for ip in args.free: | |
free(ip) | |
# WHITELIST | |
if args.whitelist is not None: | |
for ip in args.whitelist: | |
add_to_whitelist(ip) | |
# LIST DROPPED | |
if args.list: | |
print list_dropped() | |
# CHECK IP | |
if args.check: | |
print check_dropped(args.check) | |
# RUN | |
if args.run: | |
while True: | |
step() | |
time.sleep(INTERVAL) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
3 80.81.82.83 | |
101 20.21.22.23 | |
290 90.91.92.93 | |
1 50.51.52.53 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from mock import patch | |
from dosblocker import drop, check_dropped, free, list_dropped, add_to_whitelist, netstat, step | |
class DosBlockerTest(unittest.TestCase): | |
def test_drop(self): | |
drop('10.11.12.13') | |
self.assertEqual(True, check_dropped('10.11.12.13')) | |
free('10.11.12.13') | |
def test_drop_in_whitelist(self): | |
drop("127.0.0.1") | |
self.assertEqual(False, check_dropped('127.0.0.1')) | |
def test_add_to_whitelist(self): | |
add_to_whitelist('12.13.14.15') | |
drop('12.13.14.15') | |
self.assertEqual(False, check_dropped('12.13.14.15')) | |
def test_free(self): | |
drop('10.11.12.13') | |
self.assertEqual(True, check_dropped('10.11.12.13')) | |
free('10.11.12.13') | |
self.assertEqual(False, check_dropped('10.11.12.13')) | |
def test_list_droppped(self): | |
drop('10.11.12.13') | |
drop('11.12.13.14') | |
self.assertEqual("10.11.12.13\n11.12.13.14\n", list_dropped()) | |
free('10.11.12.13') | |
free('11.12.13.14') | |
@patch('dosblocker.netstat', return_value=open('netstat.txt').read()) | |
def test_step(self, *args): | |
step() | |
self.assertEqual(True, check_dropped("20.21.22.23")) | |
self.assertEqual(True, check_dropped("90.91.92.93")) | |
self.assertEqual(False, check_dropped("80.81.82.83")) | |
self.assertEqual(False, check_dropped("50.51.52.53")) | |
free("20.21.22.23") | |
free("90.91.92.93") | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment