Last active
December 23, 2021 16:06
-
-
Save jn0/12e78176cabe83d79bf75a464025ed82 to your computer and use it in GitHub Desktop.
OpenVPN controller (via management socket protocol) to employ TOTP static challenge/response feature auth
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Start your OpenVPN config with | |
client | |
management myconfig.socket unix | |
management-query-passwords | |
lines. | |
Make sure the config has | |
static-challenge "Enter PIN: " 1 | |
and | |
auth-user-pass | |
entries (`static-challenge` prompt and echo may differ, of course). | |
Put your name, password, and TOTP secret to a file (say, `my.pw`). | |
Run this script as `./ovpnctl.py myconfig.socket my.pw` | |
(make sure you have NO `myconfig.socket` file!). | |
Run `sudo openvpn --config myconfig.ovpn` now... | |
''' | |
import logging; log = logging.getLogger('OpenVPNctl' if __name__ == '__main__' else __name__) # noqa:E702,E501 | |
import sys | |
import socket | |
import time | |
from os.path import exists, realpath, dirname, basename | |
from base64 import b64encode | |
from inotify.adapters import Inotify | |
from pyotp import TOTP | |
def pin(secret): return TOTP(secret).now() | |
def b64(s): return b64encode(s.encode()).decode() | |
def wait_for_file(path): | |
dname = dirname(path) | |
fname = basename(path) | |
log.info('Waiting for %r in %r...', fname, dname) | |
i = Inotify() | |
i.add_watch(dname) | |
for e in i.event_gen(yield_nones=False): | |
# print(e) # noqa:E501 (_INOTIFY_EVENT(wd=1, mask=256, cookie=0, len=16), ['IN_CREATE'], '/tmp', 'qwe') | |
if fname == e[-1] and 'IN_CREATE' in e[1]: | |
log.info('File %r has been created', fname) | |
break | |
i.remove_watch(dname) | |
class Control: | |
'Handle AF_UNIX control socket' | |
def __init__(self, socket): | |
self.socket = socket | |
self._s = None | |
self.buffer = [] | |
def connect(self): | |
log.debug('Connecting socket') | |
self._s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
self._s.connect(self.socket) | |
def _recv(self, n=8192): | |
log.debug('accessing socket') | |
r = self._s.recv(n) | |
log.debug('socket read: %r', r) | |
return (r or b'').decode() | |
def recv(self): # returns a line (or try to) | |
log.debug('reading socket') | |
r = None | |
if self.buffer: | |
if len(self.buffer) > 1: | |
r = self.buffer.pop(0) | |
else: | |
if self.buffer[0].endswith(('\n', '\r')): | |
r = self.buffer.pop(0) | |
if r is None: | |
r = self._recv() | |
r = r.splitlines(keepends=True) if r else [] | |
if (not self.buffer) or self.buffer[-1].endswith(('\n', '\r')): | |
self.buffer += r # all lines in buffer are "complete" | |
else: | |
self.buffer[-1] += r.pop(0) # fill incomplete line | |
self.buffer += r | |
r = self.buffer.pop(0) if self.buffer else None | |
log.debug('socket return: %r', r) | |
return r | |
def line(self, quiet=False): | |
r = self.recv() | |
if r: | |
r = r.rstrip() | |
if not quiet: | |
log.info('> %s', r) | |
return r | |
def read_to_end(self, end='END'): | |
r = [] | |
ok = False | |
while not ok: | |
s = self.recv() | |
log.info('+ %s', s) | |
r.append(s) | |
ok = ok or (s == end) | |
return r | |
def send(self, s, eol=b'\r\n'): | |
log.debug('writing socket %r', s) | |
return self._s.send(s.encode() + eol) | |
def close(self): | |
log.debug('Closing socket') | |
if self._s: | |
self._s.close() | |
self._s = None | |
def input(self, quiet=True): | |
while True: | |
line = self.line(quiet=quiet) | |
if line is None: | |
break | |
yield line | |
self.close() | |
class OvpnMgmtRealTimeMessagesMixin: | |
'''\ | |
Protocol primitives: "real time messages". | |
Syntax is: '>' <tag> ':' <arg> <EOL> | |
''' | |
@staticmethod | |
def is_rtm(line): | |
return line.startswith('>') and ':' in line | |
@staticmethod | |
def get_tag_and_arg(line): | |
tag, arg = line[1:].split(':', 1) | |
return tag.replace('-', '_'), arg.strip() | |
def BYTECOUNT(self, text): | |
'''>BYTECOUNT:{BYTES_IN},{BYTES_OUT}''' | |
bin, bout = text.split(',') | |
log.info('BYTECOUNT: in=%s out=%s', bin, bout) | |
def BYTECOUNT_CLI(self, text): | |
'''>BYTECOUNT_CLI:{CID},{BYTES_IN},{BYTES_OUT}''' | |
cid, bin, bout = text.split(',') | |
log.info('BYTECOUNT_CLI: cid=%s in=%s out=%s', cid, bin, bout) | |
def CLIENT(self, text): | |
log.info('CLIENT: %s', text) | |
if not text.startswith('ADDRESS,'): | |
self.ctl.read_to_end('>CLIENT:ENV,END') | |
def ECHO(self, text): | |
'''>ECHO:{xz1},{xz2}''' | |
log.info('ECHO: %s', text) | |
def FATAL(self, text): | |
log.info('FATAL: %s', text) | |
def HOLD(self, text): | |
'''>HOLD:Waiting for hold release:10''' | |
log.info('HOLD: %s', text) | |
def INFO(self, text): | |
log.info('INFO: %s', text) | |
def LOG(self, text): | |
'''>LOG:{time},{flags},{text}''' | |
utime, flags, text = text.split(',', 2) | |
timev = time.strftime('%F %T', time.localtime(int(utime))) | |
fdecoded = [{'I': 'informational', | |
'F': 'fatal error', | |
'N': 'non-fatal error', | |
'W': 'warning', | |
'D': 'debug'}.get(c, 'unknown %r' % c) for c in flags] | |
log.info('LOG: time=%r flags=%r: %s', timev, fdecoded, text) | |
def NEED_OK(self, text): | |
'''>NEED-OK:Need 'token-insertion-request' confirmation MSG:Please insert your cryptographic token''' # noqa | |
need, msg = text.split('MSG:', 1) | |
tag = need.split("'" if "'" in need else '"', 2)[1] | |
log.info('NEED-OK: [%s] %s // %s', tag, need, msg) | |
self.ctl.send('needok %s %s' % (tag, 'cancel')) # cancel every request | |
def NEED_STR(self, text): | |
'''>NEED-STR:Need 'name' input MSG:Please specify your name''' | |
need, msg = text.split('MSG:', 1) | |
tag = need.split("'" if "'" in need else '"', 2)[1] | |
log.info('NEED-STR: [%s] %s // %s', tag, need, msg) | |
self.ctl.send('needstr %s %s' % (tag, tag)) | |
def PASSWORD(self, text): | |
log.debug('PASSWORD(%r)', text) | |
if text.startswith("Need 'Auth' username/password SC:"): | |
text = text.split(':', 1)[-1].strip() | |
self.run_static_challenge(text) | |
elif text.startswith('Auth-Token:'): | |
text = text.split(':', 1)[-1].strip() | |
self.auth_token = text | |
log.info('Auth token: %s', text) | |
elif text.startswith("Verification Failed:"): | |
text = text.split(':', 1)[-1].strip() | |
log.info('Verification failed: %s', text) | |
if text in ("'Auth'", "'Private Key'"): | |
log.fatal('Oops.') | |
else: | |
log.error('Unhandled %r', text) | |
else: | |
log.warning('Unhandled PASSWORD entry %r', text) | |
def STATE(self, text): | |
log.info('STATE: %s', text) | |
def INFOMSG(self, text): | |
log.info('INFOMSG: %s', text) | |
class OvpnMgmt(OvpnMgmtRealTimeMessagesMixin): | |
'https://github.com/OpenVPN/openvpn/blob/42f6063f611b75a07c266061dc6a07f6dcfe372c/doc/management-notes.txt' # noqa | |
def __init__(self, socket, upfile): | |
self.socket = socket | |
self.upfile = upfile | |
self.ctl = Control(socket) | |
def load_up(self): | |
with open(self.upfile, 'rt') as f: | |
return f.read().strip().split() | |
def send_user(self, user): | |
self.ctl.send('username "Auth" %s' % (user,)) | |
repl = self.ctl.line(quiet=True) | |
log.info('+ %s', repl) | |
return repl | |
def send_pass(self, pasw, otp=None): | |
self.ctl.send('password "Auth" "SCRV1:%s:%s"' % | |
(b64(pasw), b64(otp) if otp else '')) | |
repl = self.ctl.line(quiet=True) | |
log.info('+ %s', repl) | |
return repl | |
def send_cr_response(self, otp): | |
self.ctl.send('cr-response %s' % (b64(otp),)) | |
repl = self.ctl.line(quiet=True) | |
log.info('+ %s', repl) | |
return repl | |
def send_status(self): | |
self.ctl.send('status') | |
return self.ctl.read_to_end() | |
def send_state(self): | |
self.ctl.send('state') | |
return self.ctl.read_to_end() | |
def send_user_pass(self, otp=None, echo=None): | |
user, pasw, secret = self.load_up() | |
self.send_user(user) | |
if otp is True: | |
otp = pin(secret) | |
if echo: | |
print(echo, otp, file=sys.stderr) | |
self.send_pass(pasw, otp) | |
def run_static_challenge(self, line): # "1,Please enter token PIN" | |
e, t = line.split(',', 1) | |
self.send_user_pass(otp=True, echo=t if e == '1' else None) | |
def run(self): # the only "public" method | |
try: | |
self.ctl.connect() | |
for line in self.ctl.input(): | |
if self.is_rtm(line): | |
tag, arg = self.get_tag_and_arg(line) | |
log.debug('tag=%r', tag) | |
if hasattr(self, tag): # dispatch to a method | |
getattr(self, tag)(arg) | |
else: | |
log.warning('Unsupported %r in %r', tag, line) | |
else: | |
log.warning('Unknown %r', line) | |
except IOError as e: | |
log.error('Error %s on %r', e, self.socket, exc_info=1) | |
finally: | |
self.ctl.close() | |
def main(): | |
if len(sys.argv) != 3: | |
print('Usage:', sys.argv[0], '<socket> <up>', file=sys.stderr) | |
return 1 | |
socket = realpath(sys.argv[1]) | |
upfile = realpath(sys.argv[2]) | |
if not exists(socket): | |
log.warning('No socket %r', socket) | |
wait_for_file(socket) | |
OvpnMgmt(socket, upfile).run() | |
return 0 | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO, | |
datefmt='%F %T', | |
format='%(asctime)s %(name)s %(levelname)s' | |
' %(message)s') | |
try: | |
log.info('start') | |
sys.exit(main()) | |
finally: | |
log.info('stop') | |
# vim:set ft=python ai et ts=4 sts=4 sw=4 cc=80:EOF # |
The OpenVPN log should start with
2021-12-23 14:57:30 OpenVPN 2.5.5 x86_64-pc-linux-gnu [SSL (OpenSSL)] [LZO] [LZ4] [EPOLL] [PKCS11] [MH/PKTINFO] [AEAD] built on Dec 15 2021
2021-12-23 14:57:30 library versions: OpenSSL 1.1.1f 31 Mar 2020, LZO 2.10
2021-12-23 14:57:30 MANAGEMENT: unix domain socket listening on myconfig.socket
2021-12-23 14:57:30 Need password(s) from management interface, waiting...
2021-12-23 14:57:30 MANAGEMENT: Client connected from myconig.socket
2021-12-23 14:57:30 MANAGEMENT: CMD 'username "Auth" jjnnoo'
2021-12-23 14:57:30 MANAGEMENT: CMD 'password [...]'
2021-12-23 14:57:30 Outgoing Control Channel Encryption: Cipher 'AES-256-CTR' initialized with 256 bit key
2021-12-23 14:57:30 Outgoing Control Channel Encryption: Using 256 bit message hash 'SHA256' for HMAC authentication
2021-12-23 14:57:30 Incoming Control Channel Encryption: Cipher 'AES-256-CTR' initialized with 256 bit key
2021-12-23 14:57:30 Incoming Control Channel Encryption: Using 256 bit message hash 'SHA256' for HMAC authentication
…
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
One should see something like
before successful OpenVPN session start.
PS. May differ a bit as I've updated the code. But mostly the same ;)