Skip to content

Instantly share code, notes, and snippets.

@dkw72n
Created June 28, 2018 07:51
Show Gist options
  • Select an option

  • Save dkw72n/727ee2c3b94fbf92771b314b91694e4b to your computer and use it in GitHub Desktop.

Select an option

Save dkw72n/727ee2c3b94fbf92771b314b91694e4b to your computer and use it in GitHub Desktop.
Control ipset with a RESTful API
# Works with the following iptables rule:
#   iptables -A FORWARD -p tcp -m set --match-set weaknet_tcp_rst src -j REJECT --reject-with tcp-reset
#
# PUT /weaknet_tcp_rst
#   adds the caller's IP address to the weaknet_tcp_rst set
# DELETE /weaknet_tcp_rst
#   removes the caller's IP address from the weaknet_tcp_rst set
import sys
import logging
import os
import traceback
import argparse
import BaseHTTPServer
import signal
import subprocess
import json
from logging.handlers import RotatingFileHandler
# Locations derived from this script's own path; they supply the default
# pid-file and log-file names used by the command-line options below.
_my_full_path_ = os.path.abspath(__file__)
_my_dir_ = os.path.dirname(_my_full_path_)
_my_file_ = os.path.basename(__file__)
_default_stem = os.path.join(_my_dir_, _my_file_)

# Command-line interface.
parser = argparse.ArgumentParser()
parser.add_argument('--notdebugging', action='store_true')
parser.add_argument('--daemon', action='store_true')
parser.add_argument('--pidfile', default=_default_stem + ".pid")
parser.add_argument('--logfile', default=_default_stem + ".log")

# Module-level debug flag and the log line layout shared by every handler.
DEBUG = True
FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT, level=logging.DEBUG)
def set_daemon(opts):
    """Detach the current process so it keeps running in the background.

    Forks once. The parent writes the child's PID to ``opts.pidfile`` and
    exits immediately with status 0. The child starts a new session, resets
    the umask to 0, points stdout and stderr at ``os.devnull`` and returns to
    the caller.

    Args:
        opts: parsed command-line options; only ``opts.pidfile`` is read.

    If ``os.fork`` fails the error is logged and the process exits with
    status 1.
    """
    # Only the fork itself is guarded, so a pid-file error is never
    # misreported as a fork failure.
    try:
        pid = os.fork()
    except OSError as error:
        logging.error('fork failed: %d (%s)', error.errno, error.strerror)
        os._exit(1)

    if pid > 0:
        # Parent: record the child's PID, then leave without running cleanup
        # handlers that belong to the child.
        with open(opts.pidfile, 'w') as fp:
            fp.write('%d' % pid)
        os._exit(0)

    os.setsid()  # detach process group
    os.umask(0)

    devnull_fd = os.open(os.devnull, os.O_WRONLY)
    try:
        os.dup2(devnull_fd, 1)  # redirect stdout to /dev/null
        os.dup2(devnull_fd, 2)  # redirect stderr to /dev/null
    finally:
        # The duplicates keep /dev/null open; drop the extra descriptor
        # unless it is itself one of the standard streams.
        if devnull_fd > 2:
            os.close(devnull_fd)
def __sigTERMhandler(signum, frame):
    """Signal handler: log the received signal number, then exit.

    Raises ``SystemExit`` with ``signum`` as the exit status. ``frame`` is
    accepted to match the signal-handler signature but is not used.
    """
    message = "Caught signal {}. Exiting".format(signum)
    logging.info(message)
    sys.exit(signum)
def get_ipset_name(path):
    """Map a request path to the ipset it controls.

    Returns 'weaknet_tcp_rst' for the exact path '/weaknet_tcp_rst' and
    ``None`` for every other path.
    """
    return 'weaknet_tcp_rst' if path == '/weaknet_tcp_rst' else None
def run_ipset_cmd(op, name, addr):
    """Add or delete ``addr`` in the ipset ``name`` by running ``ipset``.

    Args:
        op: 'A' to add the address, 'D' to delete it.
        name: name of the ipset to modify.
        addr: address passed to ipset as the entry.

    Returns:
        ``False`` when ``op`` is neither 'A' nor 'D' (no command is run).
        Otherwise a dict:
          * ``{"ok": True}`` when the command exits with status 0;
          * ``{"ok": False, "err": (returncode, output)}`` when it exits
            with a non-zero status;
          * ``{"ok": False, "err": <traceback text>}`` for any other failure,
            for example when the ``ipset`` executable cannot be started.
    """
    if op not in ('A', 'D'):
        return False

    cmd = ['ipset', '-' + op, name, addr, '-exist']
    logging.info("run cmd: %s", cmd)
    try:
        subprocess.check_output(cmd, stderr=subprocess.STDOUT,
                                universal_newlines=True)
    except subprocess.CalledProcessError as exc:
        return {"ok": False, "err": (exc.returncode, exc.output)}
    except Exception:
        # The original attached this branch as a try/else, which could never
        # run; launch failures (e.g. OSError) are reported here instead of
        # escaping to the caller.
        return {"ok": False, "err": traceback.format_exc()}
    return {"ok": True}
class IpSetHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP handler that adds or removes the caller's IP in an ipset.

    PUT adds and DELETE removes the client's address
    (``self.client_address[0]``) in the set named by the request path.
    GET and POST are answered with 405. Unknown paths get 404.

    A PUT/DELETE on a known path always answers 200; the JSON body carries
    the result of ``run_ipset_cmd`` and tells the caller whether the ipset
    command succeeded.
    """

    def log_message(self, fmt, *args):
        """Route the base class's request/error log lines to ``logging``."""
        logging.warning(fmt, *args)

    def _reject_method(self, verb):
        """Log the request and answer 405 for an unsupported method."""
        logging.info("%s -- %s %s", self.client_address[0], verb, self.path)
        self.send_error(405)
        # Ask the framework to close the connection after this response
        # rather than closing the socket by hand; the framework performs its
        # own finish() once the handler returns.
        self.close_connection = True

    def _update_set(self, verb, op):
        """Apply ipset operation ``op`` ('A' or 'D') for the calling client.

        ``verb`` is only used as the label in the log line.
        """
        client_ip = self.client_address[0]
        logging.info("%s -- %s %s", client_ip, verb, self.path)
        setname = get_ipset_name(self.path)
        if not setname:
            self.send_error(404)
            return

        result = run_ipset_cmd(op, setname, client_ip)
        # json.dumps output is ASCII by default, so encoding is safe and
        # yields the bytes the socket file expects.
        body = json.dumps(result).encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Connection', 'close')
        self.end_headers()
        self.wfile.write(body)
        self.close_connection = True

    def do_GET(self):
        """GET is not supported: respond 405."""
        self._reject_method('GET')

    def do_POST(self):
        """POST is not supported: respond 405."""
        self._reject_method('POST')

    def do_PUT(self):
        """Add the caller's IP to the set named by the request path."""
        self._update_set('PUT', 'A')

    def do_DELETE(self):
        """Remove the caller's IP from the set named by the request path."""
        self._update_set('DEL', 'D')
def run(server_class=BaseHTTPServer.HTTPServer,
        handler_class=BaseHTTPServer.BaseHTTPRequestHandler,
        server_address=('', 8001)):
    """Create an HTTP server and serve requests until interrupted.

    Args:
        server_class: class instantiated as
            ``server_class(server_address, handler_class)``.
        handler_class: request handler class handed to the server.
        server_address: ``(host, port)`` pair to bind; the default listens
            on every interface on port 8001.

    The server's listening socket is closed when ``serve_forever`` returns
    or is unwound by an exception (for example ``SystemExit`` raised from a
    signal handler).
    """
    httpd = server_class(server_address, handler_class)
    logging.info("server running at %s", server_address)
    try:
        httpd.serve_forever()
    finally:
        httpd.server_close()
def main(opts):
    """Start the ipset HTTP server with ``IpSetHandler``.

    ``opts`` is accepted for the caller's convenience but is not read here.
    """
    run(handler_class=IpSetHandler)
# Script entry point: parse the command-line options, attach a rotating
# file log handler, install the SIGTERM/SIGINT handlers, optionally
# daemonize, then run the server.
# NOTE(review): indentation was lost in this copy of the file, so it cannot
# be told from here whether the statements after `DEBUG = False` are nested
# under `if opts.notdebugging:` -- confirm against the original before
# re-indenting.
if __name__ == '__main__':
opts = parser.parse_args()
try:
if opts.notdebugging:
DEBUG = False
# Log file rotates at 5 MiB (1024 * 1024 * 5 bytes) and keeps 5 backups.
rotater = RotatingFileHandler(filename = opts.logfile,
maxBytes = 1024 * 1024 * 5,
backupCount = 5)
rotater.setFormatter(logging.Formatter(FORMAT))
rotater.setLevel(logging.DEBUG)
logging.getLogger().addHandler(rotater)
# Both termination signals share one handler, which logs and calls sys.exit.
signal.signal(signal.SIGTERM, __sigTERMhandler)
signal.signal(signal.SIGINT, __sigTERMhandler)
if opts.daemon:
set_daemon(opts)
main(opts)
# NOTE(review): a bare `except:` also catches the SystemExit raised by the
# signal handler's sys.exit(signum), so a signal-triggered shutdown is logged
# as an error and the process exits with status 1 -- confirm this is intended.
except:
logging.error(traceback.format_exc())
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment