Skip to content

Instantly share code, notes, and snippets.

@jn0
Last active December 23, 2021 16:06
Show Gist options
  • Save jn0/12e78176cabe83d79bf75a464025ed82 to your computer and use it in GitHub Desktop.
Save jn0/12e78176cabe83d79bf75a464025ed82 to your computer and use it in GitHub Desktop.
OpenVPN controller (via management socket protocol) to employ TOTP static challenge/response feature auth
#!/usr/bin/env python3
'''
Start your OpenVPN config with
client
management myconfig.socket unix
management-query-passwords
lines.
Make sure the config has
static-challenge "Enter PIN: " 1
and
auth-user-pass
entries (`static-challenge` prompt and echo may differ, of course).
Put your name, password, and TOTP secret to a file (say, `my.pw`).
Run this script as `./ovpnctl.py myconfig.socket my.pw`
(make sure you have NO `myconfig.socket` file!).
Run `sudo openvpn --config myconfig.ovpn` now...
'''
import logging; log = logging.getLogger('OpenVPNctl' if __name__ == '__main__' else __name__) # noqa:E702,E501
import sys
import socket
import time
from os.path import exists, realpath, dirname, basename
from base64 import b64encode
from inotify.adapters import Inotify
from pyotp import TOTP
def pin(secret): return TOTP(secret).now()
def b64(s): return b64encode(s.encode()).decode()
def wait_for_file(path):
dname = dirname(path)
fname = basename(path)
log.info('Waiting for %r in %r...', fname, dname)
i = Inotify()
i.add_watch(dname)
for e in i.event_gen(yield_nones=False):
# print(e) # noqa:E501 (_INOTIFY_EVENT(wd=1, mask=256, cookie=0, len=16), ['IN_CREATE'], '/tmp', 'qwe')
if fname == e[-1] and 'IN_CREATE' in e[1]:
log.info('File %r has been created', fname)
break
i.remove_watch(dname)
class Control:
'Handle AF_UNIX control socket'
def __init__(self, socket):
self.socket = socket
self._s = None
self.buffer = []
def connect(self):
log.debug('Connecting socket')
self._s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._s.connect(self.socket)
def _recv(self, n=8192):
log.debug('accessing socket')
r = self._s.recv(n)
log.debug('socket read: %r', r)
return (r or b'').decode()
def recv(self): # returns a line (or try to)
log.debug('reading socket')
r = None
if self.buffer:
if len(self.buffer) > 1:
r = self.buffer.pop(0)
else:
if self.buffer[0].endswith(('\n', '\r')):
r = self.buffer.pop(0)
if r is None:
r = self._recv()
r = r.splitlines(keepends=True) if r else []
if (not self.buffer) or self.buffer[-1].endswith(('\n', '\r')):
self.buffer += r # all lines in buffer are "complete"
else:
self.buffer[-1] += r.pop(0) # fill incomplete line
self.buffer += r
r = self.buffer.pop(0) if self.buffer else None
log.debug('socket return: %r', r)
return r
def line(self, quiet=False):
r = self.recv()
if r:
r = r.rstrip()
if not quiet:
log.info('> %s', r)
return r
def read_to_end(self, end='END'):
r = []
ok = False
while not ok:
s = self.recv()
log.info('+ %s', s)
r.append(s)
ok = ok or (s == end)
return r
def send(self, s, eol=b'\r\n'):
log.debug('writing socket %r', s)
return self._s.send(s.encode() + eol)
def close(self):
log.debug('Closing socket')
if self._s:
self._s.close()
self._s = None
def input(self, quiet=True):
while True:
line = self.line(quiet=quiet)
if line is None:
break
yield line
self.close()
class OvpnMgmtRealTimeMessagesMixin:
'''\
Protocol primitives: "real time messages".
Syntax is: '>' <tag> ':' <arg> <EOL>
'''
@staticmethod
def is_rtm(line):
return line.startswith('>') and ':' in line
@staticmethod
def get_tag_and_arg(line):
tag, arg = line[1:].split(':', 1)
return tag.replace('-', '_'), arg.strip()
def BYTECOUNT(self, text):
'''>BYTECOUNT:{BYTES_IN},{BYTES_OUT}'''
bin, bout = text.split(',')
log.info('BYTECOUNT: in=%s out=%s', bin, bout)
def BYTECOUNT_CLI(self, text):
'''>BYTECOUNT_CLI:{CID},{BYTES_IN},{BYTES_OUT}'''
cid, bin, bout = text.split(',')
log.info('BYTECOUNT_CLI: cid=%s in=%s out=%s', cid, bin, bout)
def CLIENT(self, text):
log.info('CLIENT: %s', text)
if not text.startswith('ADDRESS,'):
self.ctl.read_to_end('>CLIENT:ENV,END')
def ECHO(self, text):
'''>ECHO:{xz1},{xz2}'''
log.info('ECHO: %s', text)
def FATAL(self, text):
log.info('FATAL: %s', text)
def HOLD(self, text):
'''>HOLD:Waiting for hold release:10'''
log.info('HOLD: %s', text)
def INFO(self, text):
log.info('INFO: %s', text)
def LOG(self, text):
'''>LOG:{time},{flags},{text}'''
utime, flags, text = text.split(',', 2)
timev = time.strftime('%F %T', time.localtime(int(utime)))
fdecoded = [{'I': 'informational',
'F': 'fatal error',
'N': 'non-fatal error',
'W': 'warning',
'D': 'debug'}.get(c, 'unknown %r' % c) for c in flags]
log.info('LOG: time=%r flags=%r: %s', timev, fdecoded, text)
def NEED_OK(self, text):
'''>NEED-OK:Need 'token-insertion-request' confirmation MSG:Please insert your cryptographic token''' # noqa
need, msg = text.split('MSG:', 1)
tag = need.split("'" if "'" in need else '"', 2)[1]
log.info('NEED-OK: [%s] %s // %s', tag, need, msg)
self.ctl.send('needok %s %s' % (tag, 'cancel')) # cancel every request
def NEED_STR(self, text):
'''>NEED-STR:Need 'name' input MSG:Please specify your name'''
need, msg = text.split('MSG:', 1)
tag = need.split("'" if "'" in need else '"', 2)[1]
log.info('NEED-STR: [%s] %s // %s', tag, need, msg)
self.ctl.send('needstr %s %s' % (tag, tag))
def PASSWORD(self, text):
log.debug('PASSWORD(%r)', text)
if text.startswith("Need 'Auth' username/password SC:"):
text = text.split(':', 1)[-1].strip()
self.run_static_challenge(text)
elif text.startswith('Auth-Token:'):
text = text.split(':', 1)[-1].strip()
self.auth_token = text
log.info('Auth token: %s', text)
elif text.startswith("Verification Failed:"):
text = text.split(':', 1)[-1].strip()
log.info('Verification failed: %s', text)
if text in ("'Auth'", "'Private Key'"):
log.fatal('Oops.')
else:
log.error('Unhandled %r', text)
else:
log.warning('Unhandled PASSWORD entry %r', text)
def STATE(self, text):
log.info('STATE: %s', text)
def INFOMSG(self, text):
log.info('INFOMSG: %s', text)
class OvpnMgmt(OvpnMgmtRealTimeMessagesMixin):
'https://github.com/OpenVPN/openvpn/blob/42f6063f611b75a07c266061dc6a07f6dcfe372c/doc/management-notes.txt' # noqa
def __init__(self, socket, upfile):
self.socket = socket
self.upfile = upfile
self.ctl = Control(socket)
def load_up(self):
with open(self.upfile, 'rt') as f:
return f.read().strip().split()
def send_user(self, user):
self.ctl.send('username "Auth" %s' % (user,))
repl = self.ctl.line(quiet=True)
log.info('+ %s', repl)
return repl
def send_pass(self, pasw, otp=None):
self.ctl.send('password "Auth" "SCRV1:%s:%s"' %
(b64(pasw), b64(otp) if otp else ''))
repl = self.ctl.line(quiet=True)
log.info('+ %s', repl)
return repl
def send_cr_response(self, otp):
self.ctl.send('cr-response %s' % (b64(otp),))
repl = self.ctl.line(quiet=True)
log.info('+ %s', repl)
return repl
def send_status(self):
self.ctl.send('status')
return self.ctl.read_to_end()
def send_state(self):
self.ctl.send('state')
return self.ctl.read_to_end()
def send_user_pass(self, otp=None, echo=None):
user, pasw, secret = self.load_up()
self.send_user(user)
if otp is True:
otp = pin(secret)
if echo:
print(echo, otp, file=sys.stderr)
self.send_pass(pasw, otp)
def run_static_challenge(self, line): # "1,Please enter token PIN"
e, t = line.split(',', 1)
self.send_user_pass(otp=True, echo=t if e == '1' else None)
def run(self): # the only "public" method
try:
self.ctl.connect()
for line in self.ctl.input():
if self.is_rtm(line):
tag, arg = self.get_tag_and_arg(line)
log.debug('tag=%r', tag)
if hasattr(self, tag): # dispatch to a method
getattr(self, tag)(arg)
else:
log.warning('Unsupported %r in %r', tag, line)
else:
log.warning('Unknown %r', line)
except IOError as e:
log.error('Error %s on %r', e, self.socket, exc_info=1)
finally:
self.ctl.close()
def main():
if len(sys.argv) != 3:
print('Usage:', sys.argv[0], '<socket> <up>', file=sys.stderr)
return 1
socket = realpath(sys.argv[1])
upfile = realpath(sys.argv[2])
if not exists(socket):
log.warning('No socket %r', socket)
wait_for_file(socket)
OvpnMgmt(socket, upfile).run()
return 0
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO,
datefmt='%F %T',
format='%(asctime)s %(name)s %(levelname)s'
' %(message)s')
try:
log.info('start')
sys.exit(main())
finally:
log.info('stop')
# vim:set ft=python ai et ts=4 sts=4 sw=4 cc=80:EOF #
@jn0
Copy link
Author

jn0 commented Dec 23, 2021

One should see something like

INFO:OpenVPNctl:start
WARNING:OpenVPNctl:No socket '/home/jno/.jno/vpn/myconfig.socket'
INFO:OpenVPNctl:Waiting for 'myconfig.socket' in '/home/jno/.jno/vpn'...
INFO:OpenVPNctl:File 'myconfig.socket' has been created
INFO:OpenVPNctl:INFO: OpenVPN Management Interface Version 3 -- type 'help' for more info
INFO:OpenVPNctl:+ SUCCESS: 'Auth' username entered, but not yet verified
Enter PIN: 016171
INFO:OpenVPNctl:+ SUCCESS: 'Auth' password entered, but not yet verified
INFO:OpenVPNctl:Auth token: SESS_ID_AT_vpcwQF/f0QfOYVC………ZYEWFlAun2e85

before successful OpenVPN session start.

PS. May differ a bit as I've updated the code. But mostly the same ;)

@jn0
Copy link
Author

jn0 commented Dec 23, 2021

The OpenVPN log should start with

2021-12-23 14:57:30 OpenVPN 2.5.5 x86_64-pc-linux-gnu [SSL (OpenSSL)] [LZO] [LZ4] [EPOLL] [PKCS11] [MH/PKTINFO] [AEAD] built on Dec 15 2021
2021-12-23 14:57:30 library versions: OpenSSL 1.1.1f  31 Mar 2020, LZO 2.10
2021-12-23 14:57:30 MANAGEMENT: unix domain socket listening on myconfig.socket
2021-12-23 14:57:30 Need password(s) from management interface, waiting...
2021-12-23 14:57:30 MANAGEMENT: Client connected from myconig.socket
2021-12-23 14:57:30 MANAGEMENT: CMD 'username "Auth" jjnnoo'
2021-12-23 14:57:30 MANAGEMENT: CMD 'password [...]'
2021-12-23 14:57:30 Outgoing Control Channel Encryption: Cipher 'AES-256-CTR' initialized with 256 bit key
2021-12-23 14:57:30 Outgoing Control Channel Encryption: Using 256 bit message hash 'SHA256' for HMAC authentication
2021-12-23 14:57:30 Incoming Control Channel Encryption: Cipher 'AES-256-CTR' initialized with 256 bit key
2021-12-23 14:57:30 Incoming Control Channel Encryption: Using 256 bit message hash 'SHA256' for HMAC authentication
…

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment