Skip to content

Instantly share code, notes, and snippets.

@alejandrobernardis
Created November 10, 2014 21:18
Show Gist options
  • Save alejandrobernardis/8158b5ed8264cb03f6e2 to your computer and use it in GitHub Desktop.
Save alejandrobernardis/8158b5ed8264cb03f6e2 to your computer and use it in GitHub Desktop.
Python, Crypto Methods (sync/async)
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
# Copyright (c) 2014 Asumi Kamikaze Inc.
# Licensed under the MIT License.
# Author: Alejandro M. Bernardis
# Email: alejandro (dot) bernardis (at) asumikamikaze (dot) com
# Created: 20/Oct/2014 12:22
from base64 import encodestring, decodestring
from concurrent.futures import ThreadPoolExecutor
from Crypto import Random
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
# Public names exported by ``from <module> import *``.
__all__ = (
    'generate_rsa',
    'encrypt',
    'decrypt',
    'sign',
    'verify_sign',
    'CryptoThreadPoolExecutor',
)

# Types accepted for the ``message`` argument checked by ``message_sanitize``.
TEXT_SUPPORTED = (basestring,)
def want_bytes(s, encoding='utf-8', errors='strict'):
    """
    Encode ``s`` when it is a ``unicode`` object; return anything else as is.

    :param s: value to convert
    :param encoding: codec name passed to ``encode``
    :param errors: error handling scheme passed to ``encode``
    :return: the encoded string, or ``s`` itself when it is not ``unicode``
    """
    if not isinstance(s, unicode):
        return s
    return s.encode(encoding, errors)
def message_sanitize(func):
    """
    Decorate ``func`` so that its first argument is validated and encoded.

    The wrapper raises ``TypeError`` when ``message`` is not an instance of
    ``TEXT_SUPPORTED``; otherwise it calls ``func`` with
    ``want_bytes(message)`` followed by the remaining arguments.

    :param func: callable whose first positional parameter is the message
    :return: wrapper that keeps the name and docstring of ``func``
    """
    # Local import: keeps this block self-contained without touching the
    # module-level import section.
    from functools import wraps

    # Without ``wraps`` every decorated function would be reported as
    # "decorator" and would lose its docstring.
    @wraps(func)
    def decorator(message, *args, **kwargs):
        if not isinstance(message, TEXT_SUPPORTED):
            raise TypeError('The "message" argument must be a basestring')
        return func(want_bytes(message), *args, **kwargs)
    return decorator
def generate_rsa(bits=2048, passphrase=None, pkcs=1):
    """
    Generate an RSA key pair and export both halves.

    Each key is exported with ``exportKey('PEM', passphrase, pkcs)`` and the
    result is then base64 encoded.

    :param bits: key size handed to ``RSA.generate``
    :param passphrase: forwarded to ``exportKey`` for both keys
    :param pkcs: forwarded to ``exportKey`` for both keys
    :return: tuple with private and public keys as base64 encoded strings
    """
    export_args = ('PEM', passphrase, pkcs)
    key_pair = RSA.generate(bits, Random.new().read)
    pem_private = key_pair.exportKey(*export_args)
    pem_public = key_pair.publickey().exportKey(*export_args)
    return encodestring(pem_private), encodestring(pem_public)
@message_sanitize
def encrypt(message, public_key, passphrase=None):
    """
    Encrypt ``message`` with an RSA key using the PKCS1_OAEP cipher.

    :param message: text to encrypt; checked and encoded by
        ``message_sanitize`` before it reaches this body
    :param public_key: base64 encoded key material, decoded and then handed
        to ``RSA.importKey``
    :param passphrase: forwarded to ``RSA.importKey``
    :return: base64 encoded string containing the ciphertext
    """
    # ``decodestring`` already returns a byte string, so it is imported
    # directly instead of being copied through a StringIO buffer first.
    rsa_key = RSA.importKey(decodestring(public_key), passphrase)
    cipher = PKCS1_OAEP.new(rsa_key)
    return encodestring(cipher.encrypt(message))
@message_sanitize
def decrypt(message, private_key, passphrase=None):
    """
    Decrypt a base64 encoded ciphertext with the PKCS1_OAEP cipher.

    :param message: base64 encoded ciphertext; checked and encoded by
        ``message_sanitize`` before it reaches this body
    :param private_key: base64 encoded key material, decoded and then handed
        to ``RSA.importKey``
    :param passphrase: forwarded to ``RSA.importKey``
    :return: the decrypted string (the output of the cipher's ``decrypt``)
    """
    # ``decodestring`` already returns a byte string, so it is imported
    # directly instead of being copied through a StringIO buffer first.
    rsa_key = RSA.importKey(decodestring(private_key), passphrase)
    cipher = PKCS1_OAEP.new(rsa_key)
    return cipher.decrypt(decodestring(message))
@message_sanitize
def sign(message, private_key, passphrase=None):
    """
    Sign the SHA256 digest of ``message`` with the PKCS1_v1_5 scheme.

    :param message: text to sign; checked and encoded by
        ``message_sanitize`` before it reaches this body
    :param private_key: base64 encoded key material
    :param passphrase: forwarded to ``RSA.importKey``
    :return: base64 encoded string containing the signature
    """
    encoded_key = StringIO(private_key).read()
    rsa_key = RSA.importKey(decodestring(encoded_key), passphrase)
    signer = PKCS1_v1_5.new(rsa_key)
    digest = SHA256.new(message)
    return encodestring(signer.sign(digest))
@message_sanitize
def verify_sign(message, signature, public_key, passphrase=None):
    """
    Check a PKCS1_v1_5 signature against the SHA256 digest of ``message``.

    :param message: text that was signed; checked and encoded by
        ``message_sanitize`` before it reaches this body
    :param signature: base64 encoded signature
    :param public_key: base64 encoded key material
    :param passphrase: forwarded to ``RSA.importKey``
    :return: boolean; ``False`` is also returned when the verifier raises
    """
    digest = SHA256.new(message)
    raw_signature = decodestring(signature)
    encoded_key = StringIO(public_key).read()
    rsa_key = RSA.importKey(decodestring(encoded_key), passphrase)
    verifier = PKCS1_v1_5.new(rsa_key)
    try:
        return verifier.verify(digest, raw_signature) > 0
    except Exception:
        # An error raised by the verifier is reported as "not valid".
        # Unlike a bare ``except``, this does not swallow KeyboardInterrupt
        # or SystemExit.
        return False
class CryptoThreadPoolExecutor(ThreadPoolExecutor):
    """
    Thread pool exposing the module's crypto functions as pool tasks.

    Every method hands the same-named module-level function and its
    arguments to ``submit`` and returns the value ``submit`` returns.
    """

    def generate_rsa(self, bits=2048, passphrase=None, pkcs=1):
        """Submit ``generate_rsa(bits, passphrase, pkcs)`` to the pool."""
        task_args = (bits, passphrase, pkcs)
        return self.submit(generate_rsa, *task_args)

    def encrypt(self, message, public_key, passphrase=None):
        """Submit ``encrypt(message, public_key, passphrase)`` to the pool."""
        task_args = (message, public_key, passphrase)
        return self.submit(encrypt, *task_args)

    def decrypt(self, message, private_key, passphrase=None):
        """Submit ``decrypt(message, private_key, passphrase)`` to the pool."""
        task_args = (message, private_key, passphrase)
        return self.submit(decrypt, *task_args)

    def sign(self, message, private_key, passphrase=None):
        """Submit ``sign(message, private_key, passphrase)`` to the pool."""
        task_args = (message, private_key, passphrase)
        return self.submit(sign, *task_args)

    def verify_sign(self, message, signature, public_key, passphrase=None):
        """Submit ``verify_sign`` with the given arguments to the pool."""
        task_args = (message, signature, public_key, passphrase)
        return self.submit(verify_sign, *task_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment