Last active
October 11, 2024 15:32
-
-
Save Cimbali/862a430a0f28ffe07f8ae618e8b73973 to your computer and use it in GitHub Desktop.
A Python pinentry wrapper to query your keyring for GPG passphrases, inspired by kwalletcli.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# Copyright © 2020 Cimbali <[email protected]> | |
# Initial concept from mirabilos <[email protected]> at https://www.mirbsd.org/kwalletcli.htm | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated | |
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the | |
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit | |
# persons to whom the Software is furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the | |
# Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE | |
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | |
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
'''This script wraps a pinentry executable to ask your keyring about the password before prompting you. | |
All further interactions are forwarded to the pinentry process running in the back. | |
''' | |
import os | |
import sys | |
import termios | |
import traceback | |
import subprocess | |
import keyring | |
# configured by command line
pinentry_bin = os.environ.get('PINENTRY', 'pinentry')  # which backend pinentry to use
fulltty = True  # whether to prompt unknown passwords via TTY rather than backend pinentry

# configured by pinentry interactions
params = {}  # prompt, desc, error, keyinfo
options = {}  # ttyname, etc.

# reset environment: drop every locale override, then pin LANG and LC_CTYPE below.
# The category names must not carry stray whitespace, otherwise the variables are never actually removed.
os.environ.pop('LANGUAGE', None)
for env in ['ALL', 'NUMERIC', 'TIME', 'COLLATE', 'MONETARY', 'MESSAGES', 'PAPER', 'NAME', 'ADDRESS', 'TELEPHONE',
            'MEASUREMENT', 'IDENTIFICATION']:
    os.environ.pop(f'LC_{env}', None)
os.environ['LANG'] = 'C'
os.environ['LC_CTYPE'] = 'en_US.UTF-8'

# some utilities
# Log destinations: these start out as paths and are replaced by open streams once main() runs.
# GnuPG's home directory variable is GNUPGHOME; GPGHOME is still honoured for backward compatibility.
logfiles = {
    'error': os.path.join(
        os.environ.get('GNUPGHOME') or os.environ.get('GPGHOME') or os.path.expanduser('~/.gnupg'),
        'pinentry-keyring.log',
    ),
    'debug': os.devnull,
}
def log(dest, *args, **kwargs):
    ''' Print args to the stream registered in logfiles under dest and flush it right away.
    Extra keyword arguments are handed to print() unchanged.
    '''
    stream = logfiles[dest]
    print(*args, file=stream, flush=True, **kwargs)
class Coprocess:
    ''' Handles the backend pinentry process

    The backend is started by __init__ and has to greet with a line whose first word is "OK". If it cannot be
    started or does not greet properly, self.process is None, the instance is falsy and send() answers with
    an "ERR 14 no coproc" line instead of talking to a process.
    '''
    def __init__(self, args):
        ''' Start pinentry_bin with the command line arguments args and read its greeting
        '''
        os.environ['PINENTRY_KEYRING'] = 'set'
        os.environ.pop('PINENTRY_BINARY', None)
        # Define the attribute before anything can fail, so that cleanup and __del__ never hit AttributeError
        self.process = None
        message = None
        try:
            self.process = subprocess.Popen([pinentry_bin, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                            bufsize=0, universal_newlines=True)
            message, _greeting = self.get_line()
        except Exception:
            # Deliberately best effort: without a usable backend every request is answered by send()'s error line
            message = None
        if message != 'OK':
            self._kill()

    def __del__(self):
        self._kill()

    def __bool__(self):
        ''' True while a backend process is attached
        '''
        return self.process is not None

    def _kill(self):
        ''' Kill the backend process, if any, and forget about it
        '''
        process, self.process = getattr(self, 'process', None), None
        if process:
            process.kill()

    @staticmethod
    def _format(word, rem):
        ''' Render a (word, remainder) pair as a protocol line, without a trailing space when there is no remainder
        '''
        return f'{word} {rem}' if rem else word

    def get_line(self):
        ''' Read the next non-blank line from the backend and return it as a (first word, remainder) tuple.
        Raises EOFError when the backend has closed its output.
        '''
        while True:
            line = self.process.stdout.readline()
            if not line:
                raise EOFError('backend pinentry closed its output')
            pair = line.rstrip('\n').split(None, 1)
            if pair:
                break
        # NOTE(review): with verbose logging this also writes "D <passphrase>" replies to the debug log
        log('debug', 'pinentry > ' + ' '.join(pair))
        return (pair[0], pair[1]) if len(pair) == 2 else (pair[0], '')

    def send(self, data):
        ''' Send one command line to the backend and collect its reply.
        Returns a list of (word, remainder) tuples, the last of which starts with "OK" or "ERR".
        '''
        if not self.process:
            # Same shape as a real reply so that callers can join or index it uniformly
            return [('ERR', '14 no coproc')]
        log('debug', 'pinentry < ' + data)
        print(data, file=self.process.stdin, flush=True)
        reply = []
        while True:
            word, rem = self.get_line()
            reply.append((word, rem))
            if word in {'ERR', 'OK'}:
                return reply

    def get_pw(self, req, proc=None):
        ''' Ask the backend for a confirmation (req == 'bool') or for a passphrase (any other req).
        Returns (pw, reply lines): pw is the data of the first reply line when it is a "D" line directly followed
        by "OK", else None. The proc parameter is ignored; it is accepted because callers pass the coprocess again.
        '''
        reply = self.send('CONFIRM' if req == 'bool' else 'GETPIN')
        pw = None
        if len(reply) > 1 and reply[0][0] == 'D' and reply[1][0] == 'OK':
            pw = reply[0][1]
        return (pw, [self._format(word, rem) for word, rem in reply])
class NoShowInput:
    ''' Context manager: turn off echoing (ECHO bit of the TTY's local flags) on entry, and put the attributes
    that were read at construction time back on exit
    '''
    def __init__(self, fd):
        self.fd = fd
        self.attr = termios.tcgetattr(fd)

    def __enter__(self):
        silent = list(self.attr)
        silent[3] &= ~termios.ECHO
        termios.tcsetattr(self.fd, termios.TCSADRAIN, silent)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.attr)
class GPGTTY(object):
    ''' Context Manager that allows reading and writing to the TTY specified by GPG
    '''
    def __enter__(self):
        ''' Open the TTY once for writing and once for reading
        '''
        # The terminal usually arrives via "OPTION ttyname=..."; --ttyname on the command line only sets GPG_TTY,
        # so fall back to that variable and finally to the controlling terminal instead of raising KeyError.
        ttyname = options.get('ttyname')
        if not isinstance(ttyname, str) or not ttyname:
            ttyname = os.environ.get('GPG_TTY') or '/dev/tty'
        self.ttyout = open(ttyname, 'w')
        try:
            self.ttyin = open(ttyname, 'r')
        except OSError:
            self.ttyout.close()  # do not leak the first handle when the second open fails
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        try:
            self.ttyout.close()
        finally:
            self.ttyin.close()

    @staticmethod
    def _assuan_escape(text):
        ''' Percent-escape the characters that may not appear raw in an Assuan data line: %, CR and LF
        '''
        return text.replace('%', '%25').replace('\r', '%0D').replace('\n', '%0A')

    def write(self, *args, **kwargs):
        ''' print() to the TTY, flushing immediately
        '''
        print(*args, **kwargs, file=self.ttyout, flush=True)

    def read(self, *args, **kwargs):
        ''' Read one line from the TTY and return it without its line terminator.
        Raises EOFError when nothing more can be read.
        '''
        read = self.ttyin.readline()
        if not read:
            raise EOFError
        # A final line may lack its newline; that is valid input, not a programming error
        return read[:-1] if read.endswith('\n') else read

    def get_pw(self, req):
        ''' Show the error, description and prompt received so far, then read a passphrase without echo.
        Returns (passphrase, reply lines) where the passphrase is percent-escaped exactly as it appears in the
        "D" reply line, or (False, [cancellation error line]) on end of input. req is not used.
        '''
        from urllib.parse import unquote  # the SET* arguments arrive percent-escaped

        if 'error' in params:
            self.write('\033[91m' + unquote(str(params['error'])) + '\033[0m')
        if 'desc' in params:
            self.write(unquote(str(params['desc'])))
        self.write(unquote(str(params.get('prompt', options.get('default-prompt', 'PIN:')))), end=' ')
        try:
            with NoShowInput(self.ttyin.fileno()):
                passphrase = self.read()
        except EOFError:
            return (False, ['ERR 83886179 Operation cancelled <Pinentry>'])
        else:
            # Keep the escaped form as the returned value too, so that whatever is cached later can be replayed
            # verbatim in a "D" line
            escaped = self._assuan_escape(passphrase)
            return (escaped, [f'D {escaped}', 'OK'])
        finally:
            # The user's newline was not echoed: move to the next line ourselves
            self.write()
class Keyring:
    ''' Wraps interactions with the keyring module to get and store the password
    '''
    @staticmethod
    def get_pw(req):
        ''' Look up the password stored for the current keyinfo.
        Returns (pw, ['D pw', 'OK']) when req is 'pass', (True, ['OK']) for any other req, and
        (None, [error line]) when nothing is stored, no keyinfo was received, or the keyring failed.
        '''
        keyinfo = params.get('keyinfo')
        if not keyinfo:
            # Without a key identifier there is nothing to look up; report a plain cache miss
            return (None, ['ERR 128 password not in keyring'])
        try:
            pw = keyring.get_password('pinentry-keyring', keyinfo)
        except keyring.errors.KeyringError as e:
            log('error', 'ERROR Unhandled exception: ' + repr(e))
            traceback.print_exc(file=logfiles['debug'])
            with GPGTTY() as tty:
                tty.write(f'{os.path.basename(sys.argv[0])}: error looking up password in keyring: {e}', end='\n\n')
            # A protocol reply is a single line: flatten any line breaks in the message
            reason = ' '.join(str(e).split())
            return (None, [f'ERR 128 keyring error {reason}'])
        if pw is None:
            return (None, ['ERR 128 password not in keyring'])
        if req == 'pass':
            return (pw, [f'D {pw}', 'OK'])
        return (True, ['OK'])

    @staticmethod
    def save_pw(pw):
        ''' Ask on the TTY whether pw should be stored for the current keyinfo, and store it on "y"/"Y".
        Does nothing when no keyinfo was received; end of input at the question counts as "no".
        '''
        keyinfo = params.get('keyinfo')
        if not keyinfo:
            return
        name = os.path.basename(sys.argv[0])
        with GPGTTY() as tty:
            question = str(options.get("default-pwmngr", "Save pin to keyring")).replace("_", "")
            tty.write(f'{name}: {question}? [y/N] ', end='')
            try:
                reply = tty.read()
            except EOFError:
                reply = ''
            saving = reply.strip()[:1] in {'y', 'Y'}
            tty.write(f'{name}: {"saving..." if saving else "OK, skipping"}', end=' ' if saving else '\n')
        if not saving:
            return
        try:
            keyring.set_password('pinentry-keyring', keyinfo, pw)
        except keyring.errors.KeyringError as e:
            log('error', 'ERROR Unhandled exception: ' + repr(e))
            traceback.print_exc(file=logfiles['debug'])
            with GPGTTY() as tty:
                tty.write(f'error: {e}')
        else:
            with GPGTTY() as tty:
                tty.write('done.')
def getpass(proc):
    ''' The crux of it all: try our options to get a password or phrase.
    If it’s not in the keyring, save it to the keyring.

    proc is the backend pinentry coprocess (or None). Returns the list of reply lines to send back.
    '''
    cache_allowed = bool(options.get('allow-external-password-cache', False))
    if cache_allowed:
        pw, reply = Keyring.get_pw('pass')
    else:
        log('debug', 'skipping keyring: external cache disallowed')
        with GPGTTY() as tty:
            tty.write(f'{os.path.basename(sys.argv[0])}: skipping keyring: external cache disallowed')
        pw, reply = (None, ['ERR 1 no external password cache'])

    if not pw:
        # Prompt on the TTY when asked to, and also whenever there is no live backend to delegate to
        backend_alive = proc is not None and getattr(proc, 'process', None) is not None
        if fulltty or not backend_alive:
            with GPGTTY() as tty:
                pw, reply = tty.get_pw('pass')
        else:
            pw, reply = proc.get_pw('pass')
        if pw and cache_allowed:
            Keyring.save_pw(pw)

    del pw  # no hanging refs − best security python can afford?
    return reply
def handle_command(action, arg, proc):
    ''' Handle the commands for the subset of the Assuan that interests us

    action is the command word, arg the rest of the line ('' if none) and proc the backend coprocess (or None).
    Returns the list of reply lines. Commands without an answer of their own are passed on to the backend and
    acknowledged with "OK" whatever the backend replied.
    '''
    backend_alive = proc is not None and getattr(proc, 'process', None) is not None

    if action.startswith('SET'):
        # SETDESC, SETPROMPT, SETERROR, SETKEYINFO, ... are remembered under their lower-cased suffix
        params[action[3:].lower()] = arg
    elif action == 'GETPIN':
        return getpass(proc)
    elif action == 'CONFIRM':
        pw, reply = Keyring.get_pw('bool')
        # NOTE: confirmations are never asked on the TTY, only from the keyring or the backend
        if not pw:
            if backend_alive:
                pw, reply = proc.get_pw('bool')
            else:
                reply = ['ERR 14 no coproc']
        del pw  # no hanging refs − best security python can afford?
        return reply
    elif action == 'BYE':
        if proc is not None:
            proc.send(f'{action} {arg}')
        return ['OK Closing connection']
    elif action == 'OPTION':
        opt, sep, val = arg.partition('=')
        if not sep:
            val = True  # flag-style option without a value
        else:
            # Only real string values may be exported to the environment
            exported = {'ttyname': 'GPG_TTY', 'ttytype': 'GPG_TERM', 'lc-ctype': 'LC_CTYPE',
                        'lc-messages': 'LC_MESSAGES'}
            if opt in exported:
                os.environ[exported[opt]] = val
        options[opt] = val
    elif action == 'GETINFO' and arg == 'pid':
        return [f'D {os.getpid()}', 'OK']
    elif action == 'GETINFO' and arg == 'ttyinfo':
        # Unset values are reported as "-" rather than raising KeyError
        ttyname = os.environ.get('GPG_TTY') or '-'
        ttytype = os.environ.get('GPG_TERM') or '-'
        display = os.environ.get('DISPLAY') or '-'
        return [f'D {ttyname} {ttytype} {display}', 'OK']
    elif action == 'GETINFO' and arg == 'flavor':
        return ['D keyring', 'OK']
    elif action == 'GETINFO' and arg == 'version':
        if backend_alive:
            return [' '.join(part for part in l if part) for l in proc.send(f'{action} {arg}')]
        else:
            return ['D 1.1.0', 'OK']  # the version we emulate (and thus implement?)
    else:
        log('error', f'warning: unknown line {action} {arg}')
        return ['ERR 1 unrecognised command']

    # In general, just pass it on and forget about the reply
    if proc is not None:
        proc.send(f'{action} {arg}')
    return ['OK']
def handle_args(args):
    ''' Parse the command line arguments and remove those not destined for pinentry

    Updates the module-level fulltty / pinentry_bin settings, logfiles and the environment according to the
    options found. args is rewritten in place to hold only what is passed on to the backend pinentry, and is
    also returned.
    '''
    # Without this declaration the assignments below would create locals and the options would have no effect
    global fulltty, pinentry_bin

    # Options whose value is exported to the environment (and still forwarded to the backend)
    exported = {
        '--display': 'DISPLAY',
        '--ttyname': 'GPG_TTY',
        '--ttytype': 'GPG_TERM',
        '--lc-ctype': 'LC_CTYPE',
        '--lc-type': 'LC_CTYPE',  # historical spelling accepted by this wrapper
        '--lc-messages': 'LC_MESSAGES',
    }
    forwarded = []
    iter_args = iter(list(args))

    def value_of(option):
        # Fetch the value following option, or exit with a usage error instead of leaking StopIteration
        try:
            return next(iter_args)
        except StopIteration:
            print(f'{os.path.basename(sys.argv[0])}: option {option} requires a value', file=sys.stderr)
            sys.exit(2)

    for arg in iter_args:
        if arg in exported:
            value = value_of(arg)
            os.environ[exported[arg]] = value
            forwarded += [arg, value]
        elif arg in {'-t', '--tty', '-n', '--no-tty'}:
            fulltty = arg in {'-t', '--tty'}
        elif arg in {'-p', '--pinentry'}:
            pinentry_bin = value_of(arg)
        elif arg in {'-v', '--verbose'}:
            logfiles['debug'] = logfiles['error']
        elif arg in {'-h', '--help'}:
            print(__doc__)
            print(f'Usage: {os.path.basename(sys.argv[0])} [options]')
            print('-p --pinentry BINARY Use BINARY as the backend pinentry program')
            print('-t --tty -n --no-tty Prefer the TTY to prompt for the password')
            print(f'-v --verbose Wrote more information to {logfiles["error"]}')
            print('Further options are passed directly to pinentry')
            sys.exit(0)
        elif arg in {'-d', '-e', '-g', '--debug', '--enhanced', '--no-global-grab'}:
            forwarded.append(arg)
        elif arg in {'-W', '--parent-wid'}:
            forwarded += [arg, value_of(arg)]
        else:
            # stdout carries the protocol replies, so diagnostics have to go to stderr
            print(f'Warning: unknown argument {arg}', file=sys.stderr)
            forwarded.append(arg)

    args[:] = forwarded
    return args
def _serve(args):
    ''' Run the command loop; expects logfiles to hold open streams already.
    '''
    # Any reasons why we should not start?
    if os.environ.get('PINENTRY_KEYRING', None):
        log('error', 'recursive call')
        print('ERR 7 trying to call me recursively', flush=True)
        sys.exit(7)
    if not os.environ.get('DISPLAY', None):
        log('error', f'since DISPLAY is not set, replacing with: {pinentry_bin}')
        # The argument list has to be flattened into the command line, not nested inside it
        completed = subprocess.run([pinentry_bin, *args], stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr)
        sys.exit(completed.returncode)

    # Go ahead: start the pinentry process and handle the commands
    proc = Coprocess(args)
    print('OK ready', flush=True)
    for line in (l.rstrip('\n') for l in sys.stdin):
        pair = line.split(None, 1)
        # Skip blank (including whitespace-only) lines and comments
        if not pair or line.startswith('#'):
            continue
        action, arg = (pair if len(pair) == 2 else (pair[0], ''))
        try:
            reply = '\n'.join(handle_command(action, arg, proc))
            print(reply, flush=True)
        except BrokenPipeError:
            break
        except Exception as e:
            log('error', 'ERROR Unhandled exception: ' + repr(e))
            traceback.print_exc(file=logfiles['debug'])
            try:
                print('ERR 8 unhandled exception', flush=True)
            except BrokenPipeError:
                break
        finally:
            # BYE ends the session whether or not it was handled cleanly
            if action == 'BYE':
                break


def main(args):
    ''' Main loop: while we receive commands on stdin reply to them as best we can.

    args are the command line arguments to hand to the backend pinentry. The paths held in logfiles are
    replaced by open streams for the duration of the call.
    '''
    # open log files (after argument handling)
    error_path, debug_path = logfiles['error'], logfiles['debug']
    with open(error_path, 'w') as error_log:
        logfiles['error'] = error_log
        if debug_path == error_path:
            # Verbose mode points both logs at one file: share the handle rather than truncating it twice
            logfiles['debug'] = error_log
            _serve(args)
        else:
            with open(debug_path, 'w') as debug_log:
                logfiles['debug'] = debug_log
                _serve(args)
# Only run when executed as a script; merely importing the module must not start the command loop
if __name__ == '__main__':
    main(handle_args(sys.argv[1:]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
git commit
GPG output:pinentry-keyring.log
:UPDATE: There's more issues.
Here a working version:
And the changes from the above original: