Skip to content

Instantly share code, notes, and snippets.

@chew-z
Created May 29, 2017 12:42
Show Gist options
  • Save chew-z/dc3a8dbef23fb17e3682ba308887c089 to your computer and use it in GitHub Desktop.
Save chew-z/dc3a8dbef23fb17e3682ba308887c089 to your computer and use it in GitHub Desktop.
Do-it-yourself encrypted messaging system
#!/usr/bin/python
# http://depado.markdownblog.com/2015-05-11-aes-cipher-with-python-3-x
import sys
import logging
import argparse
import subprocess
import base64
import hashlib
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util import Counter
class AESCipher(object):
    """
    Password-based AES cipher that produces base64-encoded messages.

    The password is hashed with SHA-256 to obtain the AES key. Each message
    is padded to a multiple of ``bs`` bytes, encrypted under a fresh random
    IV, and returned as ``base64(iv + ciphertext)``. Text input is converted
    to UTF-8 bytes before use, so both text and bytes are accepted.
    """

    def __init__(self, key):
        """Derive the AES key from *key* (text or bytes) and select the mode."""
        # Padding granularity in bytes.
        self.bs = 32
        self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
        # CBC and CTR are the commonly recommended modes; CFB is used here
        # because it puts no restriction on plaintext/ciphertext lengths.
        self.mode = AES.MODE_CFB
        # self.mode = AES.MODE_CTR

    @staticmethod
    def str_to_bytes(data):
        """Return *data* UTF-8 encoded if it is text, otherwise unchanged."""
        # Resolves to ``unicode`` on Python 2 and ``str`` on Python 3.
        u_type = type(b''.decode('utf8'))
        if isinstance(data, u_type):
            return data.encode('utf8')
        return data

    def _pad(self, s):
        """Append 1..bs bytes, each holding the pad length, so len % bs == 0."""
        pad_len = self.bs - len(s) % self.bs
        return s + pad_len * AESCipher.str_to_bytes(chr(pad_len))

    @staticmethod
    def _unpad(s):
        """
        Strip the padding added by ``_pad``.

        Raises ValueError when the trailing bytes are not a well-formed pad
        (typically a wrong key or corrupted input) instead of silently
        returning truncated or empty data.
        """
        pad_len = ord(s[-1:]) if s else 0
        if pad_len < 1 or pad_len > len(s) or s[-pad_len:] != s[-1:] * pad_len:
            raise ValueError('Invalid padding (wrong key or corrupted data?)')
        return s[:-pad_len]

    def _new_cipher(self, iv):
        """Build a fresh cipher object for one message using *iv*."""
        if self.mode == AES.MODE_CTR:
            # A counter object is stateful: sharing one across calls reuses
            # keystream between messages and makes decrypt start at the wrong
            # counter value. Build a new one per message, seeded by the IV.
            ctr = Counter.new(64, prefix=iv[:8])
            return AES.new(self.key, self.mode, counter=ctr)
        return AES.new(self.key, self.mode, iv)

    def encrypt(self, raw):
        """Encrypt *raw* (text or bytes); return base64 bytes of iv + ciphertext."""
        padded = self._pad(AESCipher.str_to_bytes(raw))
        iv = Random.new().read(AES.block_size)
        cipher = self._new_cipher(iv)
        return base64.b64encode(iv + cipher.encrypt(padded))

    def decrypt(self, enc):
        """
        Decrypt a message produced by ``encrypt`` and return the plaintext bytes.

        Raises ValueError if the decrypted data does not end in valid padding.
        """
        enc = base64.b64decode(enc)
        iv = enc[:AES.block_size]
        cipher = self._new_cipher(iv)
        return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def write_to_clipboard(text):
    """
    Copy *text* to the clipboard by piping it to ``pbcopy``.

    Bytes are sent unchanged, text is UTF-8 encoded, and any other object is
    converted with ``str()`` first.

    Any exception raised while starting or feeding ``pbcopy`` is logged and
    re-raised.
    """
    if isinstance(text, bytes):
        # On Python 3, str(b'abc') yields the literal "b'abc'", which would
        # put a corrupted ciphertext on the clipboard; pass bytes through.
        payload = text
    else:
        if not isinstance(text, type(u'')):
            text = str(text)
        # On Python 2, str() already returns bytes; only text needs encoding.
        payload = text if isinstance(text, bytes) else text.encode('utf-8')
    try:
        # NOTE(review): the extra 'w' argument is kept from the original
        # invocation; confirm whether pbcopy needs it.
        p = subprocess.Popen(['pbcopy', 'w'], stdin=subprocess.PIPE)
        p.communicate(input=payload)
    except Exception:
        logging.exception('Could not write to clipboard')
        raise
def read_from_clipboard():
    """Return the output of the ``pbpaste`` command as produced by ``getoutput``."""
    # Python 2 has no subprocess.getoutput; the equivalent lives in `commands`.
    if sys.version_info < (3, 0):
        import commands
        return commands.getoutput('pbpaste')
    return subprocess.getoutput('pbpaste')
def get_keychain_pass(account=None, server='encrypt.decrypt'):
    """
    Fetch an internet password from the keychain via ``/usr/bin/security``.

    Args:
        account: keychain account name; when None the lookup is not
            restricted by account.
        server: keychain server name.

    Returns:
        The password text with one trailing newline removed.

    Raises:
        RuntimeError: if ``security`` exits with a non-zero status (for
            example, the item does not exist), so that an error message is
            never mistaken for the password.
        OSError: if ``security`` cannot be started.
    """
    # An argument list (no shell) keeps spaces or shell metacharacters in
    # account/server from being reinterpreted.
    command = ['/usr/bin/security', 'find-internet-password', '-g']
    if account is not None:
        command += ['-a', str(account)]
    command += ['-s', str(server), '-w']
    try:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError('security exited with status %d: %s' % (
                proc.returncode, err.decode('utf-8', 'replace').strip()))
        password = out.decode('utf-8')
        if password.endswith('\n'):
            password = password[:-1]
        return password
    except Exception:
        logging.exception('Could not get password from keychain')
        raise
def getArgs(argv=None):
    """
    Parse the aes.py command line.

    argv: list of argument strings; None makes argparse read sys.argv.
    Returns the argparse namespace with ``key``, ``infile``, ``outfile``
    and ``action`` attributes.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='Decrypts and encrypts with AES')

    parser.add_argument(
        '-k', '--key',
        help='Optional encryption key (default from keychain)')

    # Both file options take one or more words.
    file_options = (
        (('-i', '--infile'),
         'Infile for encryption/decryption. Clipboard if empty.'),
        (('-o', '--outfile'),
         'Outfile for encryption/decryption. Clipboard if empty.'),
    )
    for flags, help_text in file_options:
        parser.add_argument(*flags, nargs='+', help=help_text)

    parser.add_argument(
        '-a', '--action',
        choices=['encrypt', 'decrypt'],
        default='encrypt',
        help='Choose action decrypt or encrypt (default: %(default)s)')

    namespace = parser.parse_args(argv)
    return namespace
def main(argv=None):
    """
    Encrypt or decrypt a file or the clipboard according to the command line.

    argv: list of argument strings; None makes argparse read sys.argv.
    Returns 0 on success and 1 if any step failed (the failure is logged to
    aes.log with its traceback).
    """
    FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(filename='aes.log', level=logging.DEBUG,
                        format=FORMAT, datefmt='%a, %d %b %Y %H:%M:%S',)
    logging.info('--- aes.py logging started ---.')
    args = getArgs(argv)
    # Log the parsed arguments, but never write a key given with -k/--key
    # into the log file.
    loggable = dict(vars(args))
    if loggable.get('key') is not None:
        loggable['key'] = '***'
    logging.info(argparse.Namespace(**loggable))

    status = 0
    try:
        key = args.key if args.key is not None else get_keychain_pass('key')
        cipher = AESCipher(key)

        if args.infile is not None:
            # nargs='+' splits a file name containing spaces into several
            # words; join them back into one path.
            infilename = ' '.join(args.infile)
            logging.info(infilename)
            with open(infilename, 'rb') as fi:
                data = fi.read()
        else:
            data = read_from_clipboard()

        if args.action == 'encrypt':
            output = cipher.encrypt(data)
        else:
            output = cipher.decrypt(data)

        if args.outfile is not None:
            outfilename = ' '.join(args.outfile)
            logging.info(outfilename)
            with open(outfilename, 'wb') as fo:
                fo.write(output)
        else:
            write_to_clipboard(output)
    except Exception:
        logging.exception("Fatal error in __main__ loop")
        # Report the failure to the caller instead of exiting with status 0.
        status = 1
    logging.info('--- aes.py logging completed ---')
    return status


if __name__ == '__main__':
    sys.exit(main())
#!/usr/bin/env zsh
#
# Encrypt a file (or the clipboard) with aes.py, upload the result to Google
# Drive with gdrive, and send an editorial:// link to it with pushover.py.
#
# Usage: <this script> [aes.py options] [-- pushover.py options]
#
# Options before a standalone '--' go to aes.py, options after it go to
# pushover.py. -o/--outfile is also inspected here to learn which file to
# upload, and the link (-u) is supplied to pushover.py by this script.
# All arguments may be omitted; without --infile aes.py encrypts the clipboard.
#
# TODO - instead of wrapping it all in a shell script, merge the code into a
# single Python program.
#
# NOTE(review): when no -o/--outfile is given, aes.py writes to the clipboard
# (per its --outfile help text), so the default encrypted.txt is uploaded only
# if it already exists - confirm this is intended.
TEMPFILE=encrypted.txt

# Split the arguments at the first standalone '--'. Iterating over "$@"
# avoids array subscripts entirely: zsh subscripts are 1-based while
# ${array:offset:length} offsets are 0-based, and mixing the two shifts the
# split point by one element.
aes_args=()
pushover_args=()
after_separator=0
previous=''
for arg in "$@"; do
    if (( after_separator )); then
        pushover_args+=("$arg")
    elif [[ $arg == '--' ]]; then
        after_separator=1
    else
        if [[ $previous == '-o' || $previous == '--outfile' ]]; then
            TEMPFILE=$arg
        fi
        aes_args+=("$arg")
        previous=$arg
    fi
done

# Show the separated arguments.
printf '%s\n' "${aes_args[*]}"
printf '%s\n' "${pushover_args[*]}"

# The helper scripts are run from this directory; stop if it is missing
# rather than running them from somewhere else.
cd "$HOME/Documents/Python/utils/" || exit 1

# Encrypt (aes.py defaults to the encrypt action and to the clipboard when no
# infile is specified).
echo "aes.py ${aes_args[*]}"
python3 aes.py "${aes_args[@]}" || exit 1

# Upload to GDrive and get a public link.
echo "Uploading $TEMPFILE"
LINK=$(gdrive upload --parent 0B6bDWSes13yNTDRGdnpPNWZLNUU --no-progress --share "$TEMPFILE" | grep "at https://" | cut -d " " -f7)
echo "$LINK"

# URL-encode the link so it can be passed straight to Editorial. python3 is
# used here like everywhere else in this script.
ENCODEDLINK=$(python3 -c 'import sys, urllib.parse; print(urllib.parse.quote(sys.stdin.readline().rstrip("\n")))' <<< "$LINK")
# Add the call to the Editorial action.
ENCODEDLINK="editorial://?command=GDrive%20Decrypt%202&input=$ENCODEDLINK"
echo "$ENCODEDLINK"
LINK="$ENCODEDLINK"

# Send the link with Pushover to the iPhone.
python3 pushover.py "${pushover_args[@]}" -u "$LINK"

# Remove the temporary file: overwrite it with GNU shred (brew install
# coreutils), then move it to the trash. (It is public in Google Drive anyway.)
gshred -v -z "$TEMPFILE"
/usr/local/bin/trash "$TEMPFILE"
#!/usr/bin/env python3
import argparse
import logging
import subprocess
import http.client
import urllib
def get_keychain_pass(account=None, server='pushover.net'):
    """
    Fetch an internet password from the keychain via ``/usr/bin/security``.

    Args:
        account: keychain account name; when None the lookup is not
            restricted by account.
        server: keychain server name.

    Returns:
        The password text with one trailing newline removed.

    Raises:
        RuntimeError: if ``security`` exits with a non-zero status (for
            example, the item does not exist), so that an error message is
            never mistaken for the password.
        OSError: if ``security`` cannot be started.
    """
    # An argument list (no shell) keeps spaces or shell metacharacters in
    # account/server from being reinterpreted.
    command = ['/usr/bin/security', 'find-internet-password', '-g']
    if account is not None:
        command += ['-a', str(account)]
    command += ['-s', str(server), '-w']
    logging.info(' '.join(command))
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        if result.returncode != 0:
            raise RuntimeError('security exited with status %d: %s' % (
                result.returncode, result.stderr.strip()))
        password = result.stdout
        if password.endswith('\n'):
            password = password[:-1]
        return password
    except Exception:
        logging.exception('Could not get password from keychain')
        raise
def getArgs(argv=None):
    """
    Parse the pushover.py command line.

    argv: list of argument strings; None makes argparse read sys.argv.
    Returns the argparse namespace with ``device``, ``message``, ``url``
    and ``title`` attributes.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='Send Pushover noifications')

    # (flags, keyword arguments) for each option, in help-output order.
    option_table = (
        (('-d', '--device'),
         {'default': 'Marakesh', 'help': 'Target device.'}),
        (('-m', '--message'),
         {'default': None, 'nargs': '+', 'help': 'Notification contents'}),
        (('-u', '--url'),
         {'default': None, 'help': 'Link addrese'}),
        (('-t', '--title'),
         {'default': None, 'nargs': '+', 'help': 'Notification title'}),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    namespace = parser.parse_args(argv)
    return namespace
def main(argv=None):
    """
    Send one Pushover notification described by the command line.

    argv: list of argument strings; None makes argparse read sys.argv.
    Returns 0 when the API answers with HTTP 200, 1 otherwise (the response
    is logged to pushover.log).
    Raises UserWarning when no message was given.
    """
    # `import urllib` alone does not guarantee that the `parse` submodule is
    # loaded; import it explicitly.
    import urllib.parse

    FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(filename='pushover.log', level=logging.DEBUG,
                        format=FORMAT, datefmt='%a, %d %b %Y %H:%M:%S',)
    logging.info('--- pushover.py logging started ---.')

    # Validate the command line before touching the keychain.
    args = getArgs(argv)
    if args.message is None:
        raise UserWarning('Message cannot be empty')

    pushover = {}
    pushover['token'] = get_keychain_pass('Python')
    pushover['user'] = get_keychain_pass('Token')
    pushover['message'] = ' '.join(args.message)
    logging.info(args.message)
    if args.title is not None:
        pushover['title'] = ' '.join(args.title)
        logging.info(args.title)
    if args.url is not None:
        pushover['url'] = args.url
        logging.info(args.url)
    if args.device is not None:
        pushover['device'] = args.device
        logging.info(args.device)

    conn = http.client.HTTPSConnection("api.pushover.net:443")
    try:
        conn.request("POST", "/1/messages.json",
                     urllib.parse.urlencode(pushover),
                     {"Content-type": "application/x-www-form-urlencoded"})
        response = conn.getresponse()
        body = response.read().decode('utf-8', 'replace')
    finally:
        conn.close()

    # Surface a rejected request instead of silently ignoring the response.
    if response.status != 200:
        logging.error('Pushover API returned %s %s: %s',
                      response.status, response.reason, body)
        return 1
    logging.info('Pushover API returned %s: %s', response.status, body)
    return 0


if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment