Last active
January 10, 2022 07:36
-
-
Save jn0/2c906549788ff94eef0360eb38367cdc to your computer and use it in GitHub Desktop.
Updated controller for OpenVPN with TOTP static challenge support (it runs openvpn on its own)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Configure sudo(8) to run openvpn(8) without password prompt by adding | |
you ALL = NOPASSWD: /usr/sbin/openvpn | |
to your sudoers file. The script will try to verify your sudo(8) config. | |
Note: the command to run is | |
sudo openvpn --config myconfig.ovpn | |
where `myconfig.ovpn` is your actual config. | |
Start your OpenVPN config with | |
client | |
management myconfig.socket unix | |
management-query-passwords | |
lines or specify `--management` flag and leave the `client` alone. | |
Make sure the config has | |
static-challenge "Enter PIN: " 1 | |
and | |
auth-user-pass | |
entries (`static-challenge` prompt and echo may differ, of course). | |
OpenVPN is run with `--config` as its only argument if no `--management`
flag is specified!
The script will sorta validate your config and fail if it looks wrong. | |
Put your name, password, and TOTP secret to a file (`myconfig.ovpn.pw` here, | |
just next to the config file for autopickup) and make it user-only readable | |
(`chmod 0600 myconfig.ovpn.pw`)! The script will check for extra perms... | |
Make sure you have NO `myconfig.socket` file! Any leftover is a bug. | |
Run this script as `ovpnctl.py myconfig.ovpn`. The config is locked to prevent
multiple instances from running at the same time.
You may find OpenVPN's log in `myconfig.ovpn.log` (rotated sorta daily). | |
''' | |
import logging; log = logging.getLogger('OpenVPNctl' if __name__ == '__main__' else __name__) # noqa:E702,E501 | |
import sys | |
import socket | |
from time import strftime, localtime, sleep, perf_counter | |
from datetime import timedelta | |
from os import chdir, rename, stat, access, X_OK, unlink | |
from os.path import exists, realpath, dirname, basename, isabs, getctime | |
from base64 import b64encode | |
from subprocess import Popen, DEVNULL, PIPE | |
from argparse import ArgumentParser | |
# these have to be installed with pip first
from inotify.adapters import Inotify | |
from pyotp import TOTP | |
from filelock import FileLock, Timeout as LockTimeout | |
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s' | |
def pin(secret):
    """Return the current time-based one-time password for *secret*."""
    generator = TOTP(secret)
    return generator.now()
def b64(s):
    """Return the Base64 encoding of the text *s* as a ``str``."""
    raw = s.encode()
    encoded = b64encode(raw)
    return encoded.decode()
def stamp(xtime=None, format='%F %T', kill=''):
    """Format a timestamp in local time.

    xtime  -- seconds since the epoch; ``None`` means "now"
    format -- strftime(3) format string
    kill   -- substring removed from the formatted result
    """
    moment = localtime(xtime)
    text = strftime(format, moment)
    return text.replace(kill, '')
def set_logging(level=logging.INFO):
    """Set the threshold of the root logger to *level*."""
    root = logging.getLogger()  # no name -> the root logger
    root.setLevel(level)
def wait_for_file(path):
    """Block until the file *path* exists.

    The directory holding *path* is watched with inotify and the function
    returns as soon as an ``IN_CREATE`` event for the file name shows up.
    If the file is already there once the watch is armed, it returns
    immediately.
    """
    dname = dirname(path) or '.'  # a bare file name lives in the cwd
    fname = basename(path)
    log.info('Waiting for %r in %r...', fname, dname)
    i = Inotify()
    i.add_watch(dname)
    try:
        # The file may have been created between the caller's own check and
        # the moment the watch was armed; no event would ever arrive then.
        if exists(path):
            log.info('File %r already exists', fname)
            return
        for e in i.event_gen(yield_nones=False):
            # e is (header, [type names], directory, file name), e.g.
            # (_INOTIFY_EVENT(...), ['IN_CREATE'], '/tmp', 'qwe')
            if fname == e[-1] and 'IN_CREATE' in e[1]:
                log.info('File %r has been created', fname)
                break
    finally:
        i.remove_watch(dname)
class Control:
    '''Line-oriented client for an AF_UNIX control (management) socket.

    Incoming data is split on LF; a CR before it is stripped by `line()`.
    `buffer` holds the lines received but not yet handed out; every entry
    but possibly the last one is a complete line (ends with a newline).
    '''

    def __init__(self, socket):
        self.socket = socket  # path of the unix domain socket
        self._s = None        # the connected socket object, if any
        self.buffer = []

    def connect(self):
        'Connect to the unix domain socket given to the constructor.'
        log.debug('Connecting socket')
        self._s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._s.connect(self.socket)

    def _recv(self, n=8192):
        'Read up to `n` bytes and return them as text ("" at end of stream).'
        log.debug('accessing socket')
        r = self._s.recv(n)
        log.debug('socket read: %r', r)
        # Undecodable bytes are replaced rather than aborting the session.
        return (r or b'').decode(errors='replace')

    def _feed(self, chunk):
        'Append a non-empty chunk of text to the line buffer.'
        pieces = chunk.split('\n')
        lines = [p + '\n' for p in pieces[:-1]]
        if pieces[-1]:
            lines.append(pieces[-1])  # trailing incomplete line
        if self.buffer and not self.buffer[-1].endswith('\n'):
            self.buffer[-1] += lines.pop(0)  # fill incomplete line
        self.buffer += lines

    def recv(self):
        '''Return the next line, line ending included.

        Blocks until a complete line is available. At end of stream the
        buffered partial line (if any) is returned as is, and after that
        `None` is returned.
        '''
        log.debug('reading socket')
        while True:
            if len(self.buffer) > 1 or (
                    self.buffer and self.buffer[0].endswith('\n')):
                r = self.buffer.pop(0)
                break
            chunk = self._recv()
            if not chunk:  # end of stream: flush whatever is left
                r = self.buffer.pop(0) if self.buffer else None
                break
            self._feed(chunk)
        log.debug('socket return: %r', r)
        return r

    def line(self, quiet=False):
        '''Return the next line stripped of trailing whitespace.

        The line is logged unless `quiet` is set. Returns `None` at end of
        stream.
        '''
        r = self.recv()
        if r:
            r = r.rstrip()
            if not quiet:
                log.info('> %s', r)
        return r

    def read_to_end(self, end='END'):
        '''Collect lines up to and including the terminator line `end`.

        Returns the list of lines without their line endings. If the stream
        ends before `end` is seen, the lines read so far are returned.
        '''
        r = []
        while True:
            s = self.recv()
            if s is None:
                log.warning('Connection closed before %r was received', end)
                break
            s = s.rstrip('\r\n')
            log.info('+ %s', s)
            r.append(s)
            if s == end:
                break
        return r

    def send(self, s, eol=b'\r\n'):
        'Send the text `s` followed by `eol`; return the byte count sent.'
        log.debug('writing socket %r', s)
        data = s.encode() + eol
        self._s.sendall(data)  # plain send() may write only part of it
        return len(data)

    def close(self):
        'Close the socket if it is open; safe to call more than once.'
        log.debug('Closing socket')
        if self._s:
            self._s.close()
        self._s = None

    def input(self, quiet=True):
        'Yield stripped lines until end of stream, then close the socket.'
        while True:
            line = self.line(quiet=quiet)
            if line is None:
                break
            yield line
        self.close()
class OvpnMgmtRealTimeMessagesMixin:
    '''\
    Protocol primitives: "real time messages".
    Syntax is: '>' <tag> ':' <arg> <EOL>

    Each upper-case method handles the message whose tag has the same name
    (with '-' replaced by '_') and receives the <arg> part as `text`.
    Handlers that reply use `self.ctl`, which the host class must provide.
    '''

    # Meaning of the single-letter flags of a >LOG: message.
    _LOG_FLAGS = {'I': 'informational',
                  'F': 'fatal error',
                  'N': 'non-fatal error',
                  'W': 'warning',
                  'D': 'debug'}

    @staticmethod
    def is_rtm(line):
        'Tell whether `line` looks like a real time message.'
        return line.startswith('>') and ':' in line

    @staticmethod
    def get_tag_and_arg(line):
        'Split a real time message into (handler name, stripped argument).'
        tag, arg = line[1:].split(':', 1)
        return tag.replace('-', '_'), arg.strip()

    @staticmethod
    def _quoted_tag(need):
        'Return the first quoted word of `need`, or None if there is none.'
        quote = "'" if "'" in need else '"'
        parts = need.split(quote, 2)
        return parts[1] if len(parts) > 1 else None

    def BYTECOUNT(self, text):
        '''>BYTECOUNT:{BYTES_IN},{BYTES_OUT}'''
        bytes_in, bytes_out = text.split(',')
        log.info('BYTECOUNT: in=%s out=%s', bytes_in, bytes_out)

    def BYTECOUNT_CLI(self, text):
        '''>BYTECOUNT_CLI:{CID},{BYTES_IN},{BYTES_OUT}'''
        cid, bytes_in, bytes_out = text.split(',')
        log.info('BYTECOUNT_CLI: cid=%s in=%s out=%s',
                 cid, bytes_in, bytes_out)

    def CLIENT(self, text):
        'Log the message; all but ADDRESS ones are followed by an ENV list.'
        log.info('CLIENT: %s', text)
        if not text.startswith('ADDRESS,'):
            self.ctl.read_to_end('>CLIENT:ENV,END')

    def ECHO(self, text):
        '''>ECHO:{xz1},{xz2}'''
        log.info('ECHO: %s', text)

    def FATAL(self, text):
        'Log a fatal error reported by OpenVPN.'
        log.critical('FATAL: %s', text)

    def HOLD(self, text):
        '''>HOLD:Waiting for hold release:10'''
        log.info('HOLD: %s', text)

    def INFO(self, text):
        'Log an informational message.'
        log.info('INFO: %s', text)

    def LOG(self, text):
        '''>LOG:{time},{flags},{text}'''
        utime, flags, text = text.split(',', 2)
        timev = stamp(int(utime))
        fdecoded = [self._LOG_FLAGS.get(c, 'unknown %r' % c) for c in flags]
        log.info('LOG: time=%r flags=%r: %s', timev, fdecoded, text)

    def NEED_OK(self, text):
        '''>NEED-OK:Need 'token-insertion-request' confirmation MSG:Please insert your cryptographic token'''  # noqa
        need, _, msg = text.partition('MSG:')
        tag = self._quoted_tag(need)
        log.info('NEED-OK: [%s] %s // %s', tag, need, msg)
        if tag is None:  # nothing to refer to in the reply
            log.warning('Unhandled NEED-OK entry %r', text)
            return
        self.ctl.send('needok %s %s' % (tag, 'cancel'))  # cancel every request

    def NEED_STR(self, text):
        '''>NEED-STR:Need 'name' input MSG:Please specify your name'''
        need, _, msg = text.partition('MSG:')
        tag = self._quoted_tag(need)
        log.info('NEED-STR: [%s] %s // %s', tag, need, msg)
        if tag is None:  # nothing to refer to in the reply
            log.warning('Unhandled NEED-STR entry %r', text)
            return
        self.ctl.send('needstr %s %s' % (tag, tag))  # the tag is the answer

    def PASSWORD(self, text):
        '''Handle credential requests and authentication results.

        A static-challenge request starts the login, an auth token is kept
        in `self.auth_token`, a verification failure is only reported.
        '''
        log.debug('PASSWORD(%r)', text)
        if text.startswith("Need 'Auth' username/password SC:"):
            text = text.split(':', 1)[-1].strip()
            self.run_static_challenge(text)
        elif text.startswith('Auth-Token:'):
            text = text.split(':', 1)[-1].strip()
            self.auth_token = text
            log.info('Auth token: %s', text)
        elif text.startswith("Verification Failed:"):
            text = text.split(':', 1)[-1].strip()
            log.info('Verification failed: %s', text)
            if text in ("'Auth'", "'Private Key'"):
                log.critical('Oops.')
            else:
                log.error('Unhandled %r', text)
        else:
            log.warning('Unhandled PASSWORD entry %r', text)

    def STATE(self, text):
        'Log a connection state change.'
        log.info('STATE: %s', text)

    def INFOMSG(self, text):
        'Log an INFOMSG message.'
        log.info('INFOMSG: %s', text)
class OvpnMgmt(OvpnMgmtRealTimeMessagesMixin):
    '''Client of the OpenVPN management interface.

    The protocol is described in doc/management-notes.txt of the OpenVPN
    source tree (commit 42f6063f611b75a07c266061dc6a07f6dcfe372c).

    socket -- path of the unix domain management socket
    upfile -- file with user name, password and TOTP secret, separated by
              whitespace
    '''

    def __init__(self, socket, upfile):
        self.socket = socket
        self.upfile = upfile
        self.ctl = Control(socket)

    def load_up(self):
        'Return the whitespace-separated fields of the credential file.'
        with open(self.upfile, 'rt') as f:
            return f.read().strip().split()

    def _command(self, command):
        'Send one command, log and return the single reply line.'
        self.ctl.send(command)
        repl = self.ctl.line(quiet=True)
        log.info('+ %s', repl)
        return repl

    def send_user(self, user):
        'Answer the "Auth" user name query.'
        return self._command('username "Auth" %s' % (user,))

    def send_pass(self, pasw, otp=None):
        'Answer the "Auth" password query in static challenge (SCRV1) form.'
        return self._command('password "Auth" "SCRV1:%s:%s"' %
                             (b64(pasw), b64(otp) if otp else ''))

    def send_cr_response(self, otp):
        'Send a Base64-encoded challenge response.'
        return self._command('cr-response %s' % (b64(otp),))

    def send_status(self):
        'Request the status report and return its lines.'
        self.ctl.send('status')
        return self.ctl.read_to_end()

    def send_state(self):
        'Request the state report and return its lines.'
        self.ctl.send('state')
        return self.ctl.read_to_end()

    def send_user_pass(self, otp=None, echo=None):
        '''Send user name and password from the credential file.

        otp  -- `True` to compute the one-time password from the stored
                secret, or a ready-made value, or nothing
        echo -- if set, it is printed to stderr together with the OTP
        '''
        user, pasw, secret = self.load_up()
        self.send_user(user)
        if otp is True:
            otp = pin(secret)
        if echo:
            print(echo, otp, file=sys.stderr)
        self.send_pass(pasw, otp)

    def run_static_challenge(self, line):  # "1,Please enter token PIN"
        'Log in on a static challenge of the form "<echo flag>,<prompt>".'
        e, t = line.split(',', 1)
        self.send_user_pass(otp=True, echo=t if e == '1' else None)

    def run(self):  # the only "public" method
        '''Serve real time messages until OpenVPN closes the connection.

        Returns 0 when the connection ends normally and 1 on an I/O error.
        The socket is closed in either case.
        '''
        try:
            self.ctl.connect()
            for line in self.ctl.input():
                if self.is_rtm(line):
                    tag, arg = self.get_tag_and_arg(line)
                    log.debug('tag=%r', tag)
                    # The tag comes from the peer: dispatch to upper-case
                    # message handlers only, never to `run`, `ctl` & Co.
                    handler = getattr(self, tag, None) \
                        if tag.isupper() else None
                    if callable(handler):
                        handler(arg)
                    else:
                        log.warning('Unsupported %r in %r', tag, line)
                else:
                    log.warning('Unknown %r', line)
            log.info('OpenVPN has shut down - exiting')
            return 0
        except IOError as e:
            log.error('Error %s on %r', e, self.socket, exc_info=1)
            return 1
        finally:
            self.ctl.close()
def get_socket(config, default):
    """Sanity-check an OpenVPN client config and pick the management socket.

    config  -- path of the OpenVPN config file
    default -- socket path forced by the caller, or a false value to take
               the one from the config's `management <path> unix` entry

    Returns the socket path to use. Every problem found is logged and the
    process exits with status 1 if the config is not usable.
    """
    wanted = ('management-query-passwords', 'auth-user-pass',
              'static-challenge')
    seen = set()
    unix_socket = None
    client_mode = False
    total = 0
    with open(config, 'rt') as cfg:
        for total, raw in enumerate(cfg, 1):
            # quite simplified parser: cut the comment, split on blanks
            words = raw.split('#', 1)[0].split()
            if not words:
                continue
            if words == ['client']:
                client_mode = True
                continue
            keyword, params = words[0], words[1:]
            if keyword == 'management':
                if len(params) == 2 and params[1] == 'unix':
                    unix_socket = params[0]
            elif keyword in wanted:
                seen.add(keyword)
    has_mqp, has_aup, has_scr = (name in seen for name in wanted)

    if not client_mode:
        log.error('Config %r is not for client', config)

    if default:
        # The caller supplies the socket on the command line, together
        # with the password query option.
        if has_mqp:
            log.warning('Config %r has management-query-passwords entry'
                        ' and it is enforced', config)
        else:
            has_mqp = True
            log.info('Config %r has no management-query-passwords entry'
                     ' but it is implied', config)
        if unix_socket:
            log.warning('Config %r has management entry for'
                        ' unix domain socket %r'
                        ' but it will be overridden with %r',
                        config, unix_socket, default)
        else:
            log.info('Config %r has no management entry for'
                     ' unix domain socket, using %r', config, default)
        unix_socket = default
    else:
        if not has_mqp:
            log.error('Config %r has no management-query-passwords entry',
                      config)
        if not unix_socket:
            log.error('Config %r has no management entry for'
                      ' unix domain socket', config)

    if not has_aup:
        log.error('Config %r has no auth-user-pass entry'
                  " -- I ain't sure I can authorize you!", config)
    if not has_scr:
        log.error('Config %r has no static-challenge entry'
                  ' -- you do not have to use this tool!', config)

    usable = (client_mode and has_mqp and has_aup and has_scr
              and unix_socket)
    if not usable:
        log.fatal('Bad config %r', config)
        sys.exit(1)
    log.info('Using socket file %r, config of %d lines looks good to me.',
             unix_socket, total)
    return unix_socket
def logrotate(logname):
    """Rotate the log file *logname* if it is from an earlier day.

    The day of the file is taken from its ctime; a file older than today is
    renamed to `<logname>.<YYYYMMDD>` of that day. Returns *logname*.
    """
    if not exists(logname):
        log.warning('No log file %r found', logname)
        return logname
    today = stamp(format='%F', kill='-')
    ctime = stamp(getctime(logname), format='%F', kill='-')
    log.info('Log file %r of %r while now is %r', logname, ctime, today)
    if today > ctime:  # YYYYMMDD strings compare like dates
        rotated = logname + '.' + ctime
        log.info('Rotating log to %r', rotated)
        rename(logname, rotated)
    return logname
def sudo(*av, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL):
    """Start `sudo` with the arguments *av* and return the Popen object.

    By default the command gets no input, its output is captured through a
    pipe and its error stream is discarded.
    """
    argv = ['sudo', *av]
    return Popen(argv, stdin=stdin, stdout=stdout, stderr=stderr)
def start_openvpn(config, logname, mgmt, more):
    """Start OpenVPN through sudo and return its Popen object.

    config  -- path of the OpenVPN config file
    logname -- file that the output of OpenVPN is appended to
    mgmt    -- management socket path to force, or a false value
    more    -- extra options, each a string like "verb 4" (no leading
               dashes); empty entries are ignored
    """
    log.info('Starting OpenVPN...')
    cmd = ['openvpn', '--config', config]
    if mgmt:
        cmd += ['--management', mgmt, 'unix',
                '--management-query-passwords']
    for option in more or ():
        words = option.split()
        if not words:  # e.g. --add ''
            continue
        words[0] = '--' + words[0]
        cmd += words
    log.debug('start_openvpn: %r', cmd)
    # The child gets its own copy of the descriptor, so ours can be closed
    # as soon as the process has been started.
    with open(logname, 'a') as f:
        return sudo(*cmd, stdout=f, stderr=f)
def check_sudo_openvpn():
    """Check that sudo(8) lets the user run openvpn without a password.

    Returns the path of the openvpn binary as reported by sudo, or `None`
    if sudo does not allow it (the reason is logged).
    """
    def listing(*what):
        # Run `sudo --list ...`; give back the exit status and raw output.
        proc = sudo('--list', *what)
        output, _ = proc.communicate()
        return proc.wait(), output

    status, raw = listing('openvpn')
    if status:
        log.error('sudo(8) does not allow openvpn for you')
        return None
    openvpn = raw.decode().splitlines()[0].strip()
    if not access(openvpn, X_OK):
        log.error('sudo(8) misconfigured for OpenVPN as %r', openvpn)
        return None

    status, raw = listing()
    if status:  # list may be disabled...
        log.warning('Cannot verify sudo(8) config for %r', openvpn)
        return openvpn
    # Looking for a line like:  (root) NOPASSWD: /usr/sbin/openvpn
    # -- quite primitive check...
    passwordless = any(
        openvpn in entry and 'root' in entry and 'NOPASSWD:' in entry
        for entry in raw.decode().splitlines())
    if not passwordless:
        log.error('sudo(8) does not allow passwordless openvpn for you')
        return None
    return openvpn
class ARGS(ArgumentParser):
    '''Argument parser with a chainable `add` and a `parse` that fills in
    the file names derived from the config name and sets up file logging.
    '''

    def add(self, *av, **kw):
        'Same as `add_argument`, but returns the parser to allow chaining.'
        self.add_argument(*av, **kw)
        return self

    def parse(self):
        '''Parse the command line and return the namespace.

        File names that were not given default to the config name plus a
        suffix. The controller log is rotated, attached to the logger and
        started with a separator line.
        '''
        arg = self.parse_args()
        derived = (('log', '.ctl.log'),
                   ('ovpnlog', '.log'),
                   ('upfile', '.pw'),
                   ('lock', '.lock'))
        for name, suffix in derived:
            if not getattr(arg, name):
                setattr(arg, name, arg.config + suffix)
        formatter = logging.Formatter(arg.format or LOG_FORMAT)
        handler = logging.FileHandler(logrotate(arg.log))
        handler.setFormatter(formatter)
        log.addHandler(handler)
        log.fatal('-' * 80)  # separator between runs
        return arg
def get_args():
    """Declare the command line of the controller and return it parsed."""
    parser = ARGS(
        description='OpenVPN Controller',
        fromfile_prefix_chars='@',
    )
    parser.add('config', help='OpenVPN config file')
    parser.add('--upfile',
               help='OpenVPN up file, other than <config>.pw')
    parser.add('--lock',
               help='Lock file for this controller,'
                    ' other than <config>.lock')
    parser.add('--log',
               help='Log file for this controller,'
                    ' other than <config>.ctl.log')
    # '%' has to be doubled as argparse applies %-formatting to help texts
    parser.add('--format',
               help='Log file format, other than '
                    + repr(LOG_FORMAT).replace('%', '%%'))
    parser.add('--ovpnlog',
               help='Log file for openvpn(8), other than <config>.log')
    parser.add('--management',
               help='Force using this socket to control OpenVPN')
    parser.add('--add', help='Add more OpenVPN options',
               action='append', default=[])
    parser.add('--debug', help='Use DEBUG log level',
               action='store_true', default=False)
    return parser.parse()
def main():
    """Validate the setup, run OpenVPN and serve its management interface.

    Returns the exit status of the program: 0 on success or interruption by
    the user, 1 if anything is wrong with the setup or the control socket
    fails.
    """
    arg = get_args()
    if arg.debug:
        set_logging(logging.DEBUG)
    if not exists(arg.config):
        log.fatal('No OpenVPN config %r exists', arg.config)
        return 1
    if not exists(arg.upfile):
        log.fatal('No credential file %r exists', arg.upfile)
        return 1
    # Permission bits only: st_mode carries the file type bits as well.
    perms = stat(arg.upfile).st_mode & 0o7777
    if perms & 0o077:
        log.fatal('Your credential file %r has mode of 0%o'
                  ' and looks compromized (use `chmod 0%o %r` to fix)',
                  arg.upfile, perms, perms & ~0o077, arg.upfile)
        return 1
    socket = get_socket(arg.config, arg.management)  # config validation inside
    openvpn = check_sudo_openvpn()
    if not openvpn:
        log.fatal('Your sudo(8) is not configured to run OpenVPN')
        return 1
    log.info('OpenVPN binary for sudo(8): %r (looks ok and configured)',
             openvpn)
    if not isabs(socket):
        dname = dirname(realpath(arg.config))
        # Relative file names would point elsewhere after the chdir below.
        for name in ('config', 'upfile', 'lock', 'ovpnlog'):
            setattr(arg, name, realpath(getattr(arg, name)))
        log.info('Changind directory to %r', dname)
        chdir(dname)
        socket = realpath(socket)
    if exists(socket):
        try:
            unlink(socket)
        except IOError as e:
            log.error('Cannot remove stale socket %r: %s', socket, e)
    RC = 0
    try:
        with FileLock(arg.lock, timeout=0.1):
            log.info('Lock aquired')
            sp = start_openvpn(arg.config,
                               logrotate(arg.ovpnlog),
                               arg.management,
                               arg.add)
            log.info('OpenVPN is running PID %r', sp.pid)
            if not exists(socket):
                wait_for_file(socket)
            sleep(0.1)  # let OpenVPN start listening on the socket
            try:
                RC = OvpnMgmt(socket, arg.upfile).run()
            except KeyboardInterrupt:
                log.fatal('Interrupted')
                RC = 0
            rc = sp.wait()
            log.info('OpenVPN exited with %r', rc)
        log.info('Lock released')
        try:
            unlink(arg.lock)
        except FileNotFoundError:
            pass  # already gone, nothing to clean up
    except LockTimeout:
        log.fatal('Locked out')
        return 1
    return RC
class Timed:
    '''Context manager that logs "start", then "stop" with the time spent.

    logger -- logger to report to; the module logger is used by default

    The times are kept in `t1` and `t2` (`perf_counter` values). An
    exception leaving the block is noted in the "stop" record, except for
    `SystemExit`, and is always propagated.
    '''

    def __init__(self, logger=None):
        self.logger = logger or None

    def __enter__(self):
        self.t1 = perf_counter()
        (self.logger or log).info('start')
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.t2 = perf_counter()
        crashed = exc_type is not None and exc_type is not SystemExit
        (self.logger or log).info('stop (%s elapsed)%s',
                                  timedelta(seconds=self.t2 - self.t1),
                                  f' crashed {exc_type}' if crashed else '')
        # A false result makes Python re-raise the exception itself, with
        # its traceback untouched.
        return False
if __name__ == '__main__':
    # Console logging; the controller's log file is attached while the
    # command line is parsed.
    logging.basicConfig(format=LOG_FORMAT, datefmt='%F %T',
                        level=logging.INFO)
    with Timed():
        # Leave from inside the block so that the exit is timed as well.
        raise SystemExit(main())
# vim:set ft=python ai et ts=4 sts=4 sw=4 cc=80:EOF # |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment