Created
November 8, 2011 15:40
-
-
Save mrmekon/1348090 to your computer and use it in GitHub Desktop.
PGP Key Extractor -- A (partial) Python implementation of OpenPGP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
# | |
# PGP Key Extractor -- A (partial) Python implementation of OpenPGP | |
# | |
# Copyright 2011 Trevor Bentley | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
# | |
# DESCRIPTION: | |
# | |
# A quick-n-dirty python script that breaks a PGP message up into packets, | |
# and decodes the secret key message. | |
# | |
# This was just a research project into how PGP messages are formatted. It | |
# is full of bad design and magic numbers, and supports very few of the | |
# permitted PGP combinations. | |
# | |
# The only combination expected to work is a secret key using the DSA algorithm, | |
# encrypted with 3DES, and hashing the passphrase with 'iterated+salted' SHA1. | |
# Compressed data must be ZIP or ZLIB format. | |
# | |
# There is nothing secure about this program. Your secret key will probably | |
# end up stored in RAM unencrypted after running this. Don't run it on | |
# a key you care about if you think infiltrators are scouring your RAM. | |
# | |
# Export your secret key to a file with: | |
# $ gpg --export-secret-key 'Your Name' > seckey.pgp | |
# | |
# Run script with: | |
# $ ./pgp_key_extract seckey.pgp file_to_decrypt.pgp | |
# | |
# Depends on PyCrypto | |
# Also depends on a non-portable C extension to interact with libgcrypt. | |
# | |
import sys | |
import struct | |
import math | |
from Crypto.Hash import SHA | |
from Crypto.Cipher import DES3 | |
import getpass | |
import spam | |
import zlib | |
import tempfile | |
global packetList | |
packetList = [] | |
class PacketHeader: | |
'''Represents the header of a PGP packet''' | |
packetTagStrings = { | |
0:"Reserved", | |
1:"PUB ENC Session", | |
2:"Signature", | |
3:"SYM ENC Session", | |
4:"One-Pass Signature", | |
5:"Secret Key", | |
6:"Public Key", | |
7:"Secret Subkey", | |
8:"Compressed Data", | |
9:"SYM ENC Data", | |
10:"Marker", | |
11:"Literal Data", | |
12:"Trust", | |
13:"User ID", | |
14:"Public Subkey", | |
17:"User Attribute", | |
18:"SYM ENC INTEG Data", | |
19:"Modification Detection Code", | |
} | |
'''Packet tag identifies the type of packet''' | |
def __init__(self): | |
'''Set required fields to None''' | |
self.rawPacketTagByte = None | |
self.newStyle = None | |
self.tag = None | |
self.headerLength = None | |
self.length = None | |
self.isPartial = None | |
def loadHeaderFromFile(self, f): | |
'''Load a packet header from an open file''' | |
tagByte = f.read(1) | |
if (len(tagByte) == 0): | |
return False | |
tagByte = ord(tagByte) | |
if (not (tagByte & 0x80)): | |
raise Exception("INVALID TAG BYTE: 0x%x" % tagByte) | |
self.rawPacketTagByte = tagByte | |
self.newStyle = tagByte & 0x40 | |
if (self.newStyle): | |
self.tag = (tagByte) & 0x1F | |
else: | |
self.tag = (tagByte >> 2) & 0x0F | |
self.loadLengthFromFile(f) | |
return True | |
def loadLengthFromFile(self, f): | |
'''Call appropriate function to read length (variable length)''' | |
if (self.newStyle): | |
self.length = self.loadNewLengthFromFile(f) | |
else: | |
lentype = self.rawPacketTagByte & 0x03 | |
if (lentype == 0): | |
self.headerLength = 2 | |
elif (lentype == 1): | |
self.headerLength = 3 | |
elif (lentype == 2): | |
self.headerLength = 5 | |
else: | |
self.headerLength = 1 | |
self.length = 0 | |
return | |
self.length = self.loadOldLengthFromFile(f) | |
def loadNewLengthFromFile(self, f): | |
'''For new-style packets, value of each byte tells us how many more to read''' | |
self.isPartial = False | |
bytes = f.read(1) | |
val = ord(bytes[0]) | |
if (val <= 191): # one byte length | |
self.headerLength = 2 | |
return val | |
elif (val >= 192 and val <= 223): # two byte length | |
self.headerLength = 3 | |
bytes += f.read(1) | |
val = ((val-192)<<8)+ord(bytes[0])+192 | |
return val | |
elif (val == 255): # 4 byte length | |
self.headerLength = 6 | |
bytes = f.read(4) | |
val = ord(bytes[0])<<24 | ord(bytes[1])<<16 | ord(bytes[2])<<8 | ord(bytes[3]) | |
#val = ord(bytes[0])<<0 | ord(bytes[1])<<8 | ord(bytes[2])<<16 | ord(bytes[3])<<24 | |
return val | |
else: | |
# Oh joy -- "partial length header". | |
self.headerLength = 2 | |
self.isPartial = True | |
bytes = 1 << (val & 0x1F) | |
return bytes | |
def loadOldLengthFromFile(self, f): | |
'''For old style packets, bits in tag tell us how many bytes to read''' | |
numbytes = self.headerLength - 1 | |
bytes = f.read(numbytes) | |
val = 0 | |
for i in range(numbytes): | |
val <<= 8 | |
val += ord(bytes[i]) | |
return val | |
def tagString(self): | |
'''Print string description of header tag''' | |
try: | |
return PacketHeader.packetTagStrings[self.tag] | |
except KeyError: | |
return "UNKNOWN" | |
def __str__(self): | |
'''Print formatted description of this header''' | |
return "HEADER TYPE (%s) HEADER SIZE (%d) DATA LEN (%d)" % (self.tagString(), | |
self.headerLength, | |
self.length) | |
class Packet: | |
'''Stores content of a PGP packet, and a copy of its header''' | |
algorithmStrings = { | |
1:"RSA", | |
2:"RSA Encrypt-Only", | |
3:"RSA Sign-Only", | |
16:"Elgamal", | |
17:"DSA", | |
18:"Elliptic Curve", | |
19:"ECDSA", | |
20:"Elgamal OLD", | |
21:"Diffie-Hellman", | |
} | |
'''Asymmetric ciphers''' | |
encryptionStrings = { | |
0:"Plaintext", | |
1:"IDEA", | |
2:"TripleDES", | |
3:"CAST5", | |
4:"Blowfish", | |
7:"AES-128", | |
8:"AES-192", | |
9:"AES-256", | |
10:"Twofish", | |
} | |
'''Symetric ciphers''' | |
hashStrings = { | |
1:"MD5", | |
2:"SHA-1", | |
3:"RIPE-MD/160", | |
8:"SHA256", | |
9:"SHA384", | |
10:"SHA512", | |
11:"SHA224", | |
} | |
'''Hash algorithms''' | |
compressedStrings = { | |
0:"Uncompressed", | |
1:"ZIP", | |
2:"ZLIB", | |
3:"BZip2" | |
} | |
def __init__(self): | |
'''Create empty packet with an empty header''' | |
self.header = PacketHeader() | |
self.data = None | |
def loadPacketFromFile(self, f): | |
'''Fills in a packet from contents of a file. Starts with the header.''' | |
if (not self.header.loadHeaderFromFile(f)): | |
return False | |
if (self.header.length > 0): | |
self.data = f.read(self.header.length) | |
while (self.header.isPartial): | |
bytes = self.header.loadNewLengthFromFile(f) | |
self.header.length += bytes | |
self.data += f.read(bytes) | |
else: | |
self.data = f.read(1024*1024*1024) | |
print self.header | |
if (self.header.tag == 5 or self.header.tag == 7): # Secret key | |
self.loadSecretKeyFromPacket(self.data) | |
elif (self.header.tag == 1): # Pub key encrypted session key | |
self.loadSessionKey(self.data) | |
elif (self.header.tag == 18): # sym enc int data packet | |
self.loadEncryptedDataPacket(self.data) | |
elif (self.header.tag == 8): # Compressed | |
self.loadCompressedPacket(self.data) | |
elif (self.header.tag == 11): # Literal data | |
self.loadLiteralDataPacket(self.data) | |
return True | |
def loadLiteralDataPacket(self,data): | |
print "Literal Data packet" | |
idx = 0 | |
self.format = ord(data[idx]) | |
idx += 1 | |
print data[0:64] | |
f = open("./pgp_extract.out", "w") | |
f.write(data) | |
f.close() | |
def loadCompressedPacket(self,data): | |
print "Compressed packet" | |
idx = 0 | |
self.algo = ord(data[idx]) | |
idx += 1 | |
print self.compressedString() | |
uncompressed = None | |
if (self.algo == 1): # ZIP | |
# Magic "wbits=-15" tells it to do a raw decompress without | |
# ZIP headers and that nonsense | |
uncompressed = zlib.decompress(data[idx:], -15) | |
elif (self.algo == 2): # ZLIB | |
uncompressed = zlib.decompress(data[idx:]) | |
if (uncompressed != None): | |
tfile = tempfile.TemporaryFile() | |
tfile.write(uncompressed) | |
tfile.seek(0) | |
loadPacketsFromFileIntoList(tfile) | |
def loadEncryptedDataPacket(self, data): | |
idx = 0 | |
self.version = ord(data[idx]) | |
idx += 1 | |
self.encdata = data[idx:] | |
idx += len(self.encdata) | |
# find symmetric key from session packet | |
p = getPacketWithSessionKey() | |
if (p == None): | |
raise Exception("No valid session key found! Password wrong?") | |
result = spam.decryptData('\x00'*16, p.sessionkey, self.encdata, p.algo) | |
if (result[14:16] != result[16:18]): | |
raise Exception("Decrypted data invalid!"); | |
result = result[18:] | |
tfile = tempfile.TemporaryFile() | |
tfile.write(result) | |
tfile.seek(0) | |
loadPacketsFromFileIntoList(tfile) | |
def loadSessionKey(self, data): | |
self.sessionkey = None | |
idx = 0 | |
self.version = ord(data[idx]) | |
idx += 1 | |
self.keyid = data[idx:idx+8] | |
idx += 8 | |
print " * Encrypted with key: 0x%X" % struct.unpack(">Q", self.keyid) | |
self.algo = ord(data[idx]) | |
idx += 1 | |
print " * Algorithm: %s" % self.algoString() | |
p = getPacketWithKeyId(self.keyid) | |
if (p and p.isValid): | |
print " * Found matching secret key!" | |
else: | |
print " * No matching key." | |
return | |
self.encdata1 = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.encdata1) | |
self.encdata2 = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.encdata2) | |
if p.algo != 16: | |
raise Exception("Only supporting Elgamal sessions at the moment...") | |
if p.algo == 16: # Elgamal | |
self.frame = spam.decryptElgamalSessionKey(p.p,p.g,p.y,p.x,self.encdata1, self.encdata2) | |
n = 0 | |
n += 2 # skip the length bytes of MPI | |
# Later versions encode the DEK like this: | |
# 0 2 RND(n bytes) 0 A DEK(k bytes) CSUM(2 bytes) | |
# Preceding 0 is stripped by libgcrypt already | |
if (ord(self.frame[n]) != 2): | |
raise Exception("Invalid session key!") | |
# Determine 'n' by counting until we hit the first 0 | |
n += 1 | |
while (n < len(self.frame) and ord(self.frame[n]) != 0): | |
n += 1 | |
n += 1 | |
# Keylength is frame size minus: | |
# * 1 (algorithm) | |
# * 2 (checksum) | |
self.keylen = len(self.frame) - n - 3 | |
self.algo = ord(self.frame[n]) | |
n += 1 | |
self.sessionkey = self.frame[n:-2] | |
self.keychecksum = struct.unpack(">H",self.frame[-2:])[0] | |
checksum = sum([ord(x) for x in self.sessionkey]) % 65536 | |
if (self.keychecksum != checksum): | |
raise Exception("Session key checksum mismatch!") | |
print " * Session key: ", | |
for x in self.sessionkey: print "%.2x"%ord(x), | |
print "" | |
def loadSecretKeyFromPacket(self, data): | |
'''Load contents of a secret key -- decrypt encrypted contents.''' | |
self.isValid = False | |
# Key version | |
idx = 0 | |
if (ord(data[idx]) != 4): | |
raise Exception("VERSION %c KEYS UNSUPPORTED!" % data[idx]) | |
self.version = ord(data[idx]) | |
idx += 1 | |
# Creation time | |
self.creationTime = struct.unpack(">L", data[idx:idx+4])[0] | |
idx += 4 | |
# Public key algorithm | |
self.algo = ord(data[idx]) | |
idx += 1 | |
print " * Algorithm: %s" % self.algoString() | |
# Read MPIs for the algorithm | |
if (self.algo == 17): #DSA | |
# prime p | |
print "p idx: %u" % idx | |
self.p = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.p) | |
# order q | |
print "q idx: %u" % idx | |
self.q = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.q) | |
# generator g | |
self.g = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.g) | |
# value y | |
self.y = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.y) | |
pass | |
elif (self.algo == 1): #RSA | |
# mod n | |
# exp e | |
pass | |
elif (self.algo == 16): #Elgamal | |
# prime p | |
self.p = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.p) | |
# generator g | |
self.g = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.g) | |
# pub key val y | |
self.y = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.y) | |
else: | |
raise Exception("Unsupported key algorithm %s (%d)" % (self.algoString(self.algo), self.algo)) | |
# String-to-Key usage (tells whether key is encrypted) | |
self.s2k = ord(data[idx]) | |
idx += 1 | |
if (self.s2k == 0): | |
print " * Encryption: None" | |
self.encryption = 0 | |
elif (self.s2k == 255 or self.s2k == 254): | |
# Symmetric encryption algorithm | |
self.encryption = ord(data[idx]) | |
idx += 1 | |
print " * Encryption: %s" % self.encryptionString() | |
else: | |
self.encryption = self.s2k | |
print " * Encryption: %s" % self.encryptionString() | |
if (self.encryption != 0): # Key is encrypted -- decrypt | |
# String-to-key specifier type | |
self.specifier = ord(data[idx]) | |
idx += 1 | |
print " * Specifier: %d" % self.specifier | |
# S2K hash algorithm | |
self.hash = ord(data[idx]) | |
idx += 1 | |
print " * Hash: %s" % self.hashString() | |
if (self.specifier == 1): # salted | |
self.salt = struct.unpack(">Q", data[idx:idx+8])[0] | |
idx += 8 | |
print " * Salt: 0x%X" % self.salt | |
elif (self.specifier == 3): # salted and iterated | |
# Read salt | |
self.salt = struct.unpack(">Q", data[idx:idx+8])[0] | |
idx += 8 | |
print " * Salt: 0x%X" % self.salt | |
# Read salt count (number of bytes to hash) | |
self.s2k_count = ord(data[idx]) | |
idx += 1 | |
print " * Salt count: %d" % (self.s2k_count) | |
# Apply s2k formulat to convert count to number of bytes | |
self.hash_bytes = (16+(self.s2k_count&15)) << ((self.s2k_count>>4) + 6) | |
print " * Hash bytes: %d" % self.hash_bytes | |
if (self.hash == 2): # sha-1 | |
self.iv = struct.unpack(">Q", data[idx:idx+8])[0] | |
idx += 8 | |
print " * IV: 0x%X" % self.iv | |
# Get password from user | |
password = getpass.getpass(" * Enter secret key passphrase: ") | |
# Concatenate salt and passphrase | |
hashchunk = struct.pack(">Q",self.salt) + password | |
# Copy hashchunk until we reach the right number of bytes to hash | |
hashval = hashchunk*(self.hash_bytes/len(hashchunk)) | |
hashval += hashchunk[0:self.hash_bytes%len(hashchunk)] | |
# Perform SHA hash | |
self.hashresult = SHA.new(hashval).digest() | |
if (self.encryption == 3): #CAST5 | |
self.hashresult = self.hashresult[0:16] | |
elif (self.encryption == 2): # 3DES | |
# Documented in RFC 4880 3.7.1.1 | |
# Append '0' byte to the front of the full | |
# hash value, hash it, and take the first 4 bytes to pad the 20-byte | |
# SHA-1 result up to 24-bytes for DES3's key. | |
hashval = '\x00' + hashval | |
self.hashresult += SHA.new(hashval).digest()[0:4] | |
print " * Cipher Key: ", | |
for x in self.hashresult: print "%x"%ord(x), | |
print "" | |
# Read secret MPIs | |
if (self.algo == 17 or self.algo == 16): # DSA or Elgamal | |
# exponent x | |
if (self.encryption == 0): # not encrypted, just read | |
self.x = self.readMPIFromBuffer(data[idx:]) | |
idx += len(self.x) | |
else: # encrypted, must decrypt | |
enc_x = data[idx:] | |
idx += len(enc_x) | |
# Decrypt in Python. This does NOT work | |
#des = DES3.new(self.hashresult, DES3.MODE_CFB, struct.pack(">Q",self.iv)) | |
#plaintext = des.decrypt(enc_x) | |
#print " * Plaintext: ", | |
#for x in plaintext: print "%.2x"%ord(x), | |
#print "" | |
#self.x = self.readMPIFromBuffer(plaintext) | |
#print " * Unencrypted x: ", | |
#for x in self.x: print "%.2x"%ord(x), | |
#print "" | |
#checkhash = SHA.new(plaintext[:-20]).digest() | |
#decrypted_hash = plaintext[-20:] | |
#if (checkhash != decrypted_hash): | |
# print "ERROR: Encrypted hash does not match!" | |
# Decrypt in libgcrypt (c extension). This DOES work. | |
plaintext = spam.decryptSecretKey(struct.pack(">Q",self.iv),self.hashresult,enc_x,self.encryption) | |
print " * Plaintext: ", | |
for x in plaintext: print "%.2x"%ord(x), | |
print "" | |
self.x = self.readMPIFromBuffer(plaintext) | |
print " * Unencrypted x: ", | |
for x in self.x: print "%.2x"%ord(x), | |
print "" | |
checkhash = SHA.new(plaintext[:-20]).digest() | |
decrypted_hash = plaintext[-20:] | |
if (checkhash != decrypted_hash): | |
print "ERROR: Encrypted hash does not match!" | |
else: | |
self.isValid = True | |
if (idx != len(data)): | |
raise Exception("Finished decoding, but packet still has data!") | |
else: | |
raise Exception("Algorithm %d not supported" % self.algo) | |
# Calculate fingerprint | |
# version, creation time, algorithm, length of all mpis | |
fingerLength = None | |
fingerData = None | |
if self.algo == 17: | |
fingerLength = 6 + \ | |
len(self.p) + \ | |
len(self.q) + \ | |
len(self.g) + \ | |
len(self.y) | |
fingerData = '\x99' + chr((fingerLength >> 8)&0xFF) + \ | |
chr(fingerLength & 0xFF) + \ | |
chr(self.version) + \ | |
struct.pack(">L",self.creationTime) + \ | |
chr(self.algo) + \ | |
self.p + self.q + self.g + self.y | |
elif self.algo == 16: | |
fingerLength = 6 + \ | |
len(self.p) + \ | |
len(self.g) + \ | |
len(self.y) | |
fingerData = '\x99' + chr((fingerLength >> 8)&0xFF) + \ | |
chr(fingerLength & 0xFF) + \ | |
chr(self.version) + \ | |
struct.pack(">L",self.creationTime) + \ | |
chr(self.algo) + \ | |
self.p + self.g + self.y | |
if fingerData != None: | |
self.fingerprint = SHA.new(fingerData).digest() | |
print " * Fingerprint: ", | |
for x in self.fingerprint: print "%.2x"%ord(x), | |
print "" | |
def readMPIFromBuffer(self, data): | |
'''Reads a multi-precision integer from a buffer of bytes.''' | |
# First two bytes are number of bits to read | |
bits = struct.unpack(">H", data[0:2])[0] | |
print " * MPI bits: %d" % bits | |
# Convert bits to bytes, add 2 for the header | |
bytes = int((bits+7)/8)+2 | |
return data[0:bytes] | |
def algoString(self): | |
'''Convert asymmetric algorithm index to string''' | |
try: | |
return Packet.algorithmStrings[self.algo] | |
except Exception: | |
return "UNKNOWN - %d" % self.algo | |
def encryptionString(self): | |
'''Convert symmetric algorithm index to string''' | |
try: | |
return Packet.encryptionStrings[self.encryption] | |
except Exception: | |
return "UNKNOWN - %d" % self.encryption | |
def hashString(self): | |
'''Convert hash algorithm index to string''' | |
try: | |
return Packet.hashStrings[self.hash] | |
except Exception: | |
return "UNKNOWN - %d" % self.hash | |
def compressedString(self): | |
'''Convert asymmetric algorithm index to string''' | |
try: | |
return Packet.compressedStrings[self.algo] | |
except Exception: | |
return "UNKNOWN - %d" % self.algo | |
def getPacketWithKeyId(keyid): | |
global packetList | |
for p in packetList: | |
if p.header.tag == 5 or p.header.tag == 7: | |
if (p.fingerprint[-8:] == keyid): | |
return p | |
return None | |
def getPacketWithSessionKey(): | |
global packetList | |
for p in packetList: | |
if p.header.tag == 1: | |
if (p.sessionkey != None): | |
return p | |
return None | |
def loadPacketsFromFileIntoList(f): | |
global packetList | |
print "Packets:" | |
while (True): | |
p = Packet() | |
if (p.loadPacketFromFile(f) == False): | |
break | |
packetList.append(p) | |
print "Bytes read: %d" % f.tell() | |
print "DONE!" | |
def readPacketsFromFile(filename): | |
global packetList | |
f = None | |
try: | |
f = open(filename, "r") | |
if (f == None): | |
raise Exception("Unable to open file: %s" % filename) | |
except: | |
raise | |
print "Analyzing file: %s" % filename | |
print "" | |
# Iterate over packets in the PGP message | |
loadPacketsFromFileIntoList(f) | |
f.close() | |
if __name__ == "__main__": | |
print "-----------------------------------------" | |
print "PGP Key Extractor -- Trevor Bentley, 2011" | |
print "-----------------------------------------" | |
# Takes a file containing a secret key as its first argument | |
if (len(sys.argv) < 2): | |
raise Exception("No PGP key file given!") | |
for i in range(len(sys.argv)-1): | |
readPacketsFromFile(sys.argv[i+1]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
# | |
# PGP Key Extractor -- A (partial) Python implementation of OpenPGP | |
# | |
# Copyright 2011 Trevor Bentley | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
from distutils.core import setup, Extension | |
module1 = Extension('spam', | |
include_dirs = ['/opt/local/include'], | |
libraries = ['gcrypt', 'gpg-error'], | |
library_dirs = ['/opt/local/lib'], | |
sources = ['spam.c']) | |
setup (name = 'PackageName', | |
version = '1.0', | |
description = 'This is a demo package', | |
ext_modules = [module1]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Passes buffers from Python to libgcrypt's gcry_cipher_decrypt() function. | |
* | |
* This is a nasty hack to tie Python OpenPGP implementation to | |
* libgcrypt with C. Named stupidly, uncommented, no error checking, | |
* and only supports very specific combinations. | |
* | |
* Use with pgp_key_extract.py | |
* | |
* PGP Key Extractor -- A (partial) Python implementation of OpenPGP | |
* | |
* Copyright 2011 Trevor Bentley | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
* | |
* Code copied from Python C-extension example. Didn't bother renaming | |
* anything. Probably not portable. Tested on OS X 10.6.1. | |
*/ | |
#include <Python.h> | |
#include <gcrypt.h> | |
#include <gpg-error.h> | |
#include <stdio.h> | |
static PyObject *decryptSecretKey(PyObject *self, PyObject *args); | |
static PyObject* decryptElgamalSessionKey(PyObject *self, | |
PyObject *args); | |
static PyObject *decryptData(PyObject *self, PyObject *args); | |
static PyMethodDef SpamMethods[] = { | |
{"decryptSecretKey", decryptSecretKey, METH_VARARGS, | |
"Decrypt a secret key packet with S2K cipher."}, | |
{"decryptElgamalSessionKey", decryptElgamalSessionKey, | |
METH_VARARGS, "Decrypt an ElGamal encrypted session key."}, | |
{"decryptData", decryptData, | |
METH_VARARGS, "Decrypt data packet"}, | |
{NULL, NULL, 0, NULL} /* Sentinel */ | |
}; | |
PyMODINIT_FUNC | |
initspam(void) | |
{ | |
(void) Py_InitModule("spam", SpamMethods); | |
} | |
static PyObject* decryptElgamalSessionKey(PyObject *self, | |
PyObject *args) { | |
const Py_buffer p,g,y,x,d1,d2; | |
gcry_sexp_t key, data, result; | |
gcry_mpi_t mpi_p,mpi_g,mpi_y,mpi_x,mpi_dat1,mpi_dat2; | |
char *retval; | |
unsigned long retval_len = 0; | |
gcry_mpi_t cipher_key; | |
int i; | |
if (!PyArg_ParseTuple(args, "s*s*s*s*s*s*", &p, &g, &y, &x, &d1, &d2)) | |
return NULL; | |
gcry_mpi_scan (&mpi_p, GCRYMPI_FMT_PGP, p.buf, p.len, NULL); | |
gcry_mpi_scan (&mpi_g, GCRYMPI_FMT_PGP, g.buf, g.len, NULL); | |
gcry_mpi_scan (&mpi_y, GCRYMPI_FMT_PGP, y.buf, y.len, NULL); | |
gcry_mpi_scan (&mpi_x, GCRYMPI_FMT_PGP, x.buf, x.len, NULL); | |
gcry_mpi_scan (&mpi_dat1, GCRYMPI_FMT_PGP, d1.buf, d1.len, NULL); | |
gcry_mpi_scan (&mpi_dat2, GCRYMPI_FMT_PGP, d2.buf, d2.len, NULL); | |
gcry_sexp_build(&key, NULL, | |
"(private-key(elg(p%m)(g%m)(y%m)(x%m)))", | |
mpi_p, mpi_g, mpi_y, mpi_x); | |
gcry_sexp_build (&data, NULL, | |
"(enc-val(elg(a%m)(b%m)))", mpi_dat1, mpi_dat2); | |
gcry_pk_decrypt (&result, data, key); | |
cipher_key = gcry_sexp_nth_mpi (result, 0, GCRYMPI_FMT_STD); | |
for (i=0;i<64;i++) { | |
if (i&&i%16==0)printf("\n"); | |
printf("%.2X ",((unsigned char*)result)[i]); | |
} | |
printf("\n\n"); | |
for (i=0;i<64;i++) { | |
if (i&&i%16==0)printf("\n"); | |
printf("%.2X ",((unsigned char*)cipher_key)[i]); | |
} | |
printf("\n"); | |
gcry_mpi_print(GCRYMPI_FMT_PGP, NULL, 0, &retval_len, cipher_key); | |
retval = malloc(retval_len); | |
gcry_mpi_print(GCRYMPI_FMT_PGP, retval, retval_len, NULL, cipher_key); | |
gcry_sexp_release(key); | |
gcry_sexp_release(data); | |
gcry_sexp_release(result); | |
return Py_BuildValue("s#", retval, retval_len); | |
} | |
static PyObject *decryptData(PyObject *self, PyObject *args) | |
{ | |
const Py_buffer iv; | |
const Py_buffer key; | |
const Py_buffer encdata; | |
unsigned char *data; | |
unsigned long i; | |
unsigned long algo; | |
int sts; | |
gcry_cipher_hd_t cipher_hd; | |
gcry_error_t err; | |
if (!PyArg_ParseTuple(args, "s*s*s*l", &iv, &key, &encdata, &algo)) | |
return NULL; | |
/* Open a cipher object with mode TripleDES */ | |
err = gcry_cipher_open (&cipher_hd, | |
GCRY_CIPHER_AES256, | |
GCRY_CIPHER_MODE_CFB, | |
(GCRY_CIPHER_SECURE | GCRY_CIPHER_ENABLE_SYNC | | |
GCRY_CIPHER_ENABLE_SYNC)); | |
if (err != GPG_ERR_NO_ERROR) | |
printf("Error opening cipher!\n"); | |
printf("Opened cipher\n"); | |
/* Set TripleDES key */ | |
err = gcry_cipher_setkey (cipher_hd, key.buf, key.len); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error setting key\n"); | |
} | |
printf("Set key\n"); | |
/* Set TripleDES IV */ | |
err = gcry_cipher_setiv ( cipher_hd, iv.buf, iv.len ); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error setting iv\n"); | |
} | |
printf("Set IV\n"); | |
for (i=0;i<64;i++) { | |
if (i&&i%16==0)printf("\n"); | |
printf("%.2X ",((unsigned char*)encdata.buf)[i]); | |
} | |
printf("\n"); | |
/* Run decryption */ | |
data = malloc(encdata.len); | |
if (data == NULL) return NULL; | |
//err = gcry_cipher_decrypt ( cipher_hd, data, encdata.len, encdata.buf, encdata.len ); | |
err = gcry_cipher_decrypt ( cipher_hd, data, encdata.len, encdata.buf, 18 ); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error decrypting\n"); | |
} | |
//gcry_cipher_sync (cipher_hd); | |
for (i=0;i<64;i++) { | |
if (i&&i%16==0)printf("\n"); | |
printf("%.2X ",((unsigned char*)(encdata.buf+18))[i]); | |
} | |
printf("\n"); | |
err = gcry_cipher_decrypt ( cipher_hd, data+18, encdata.len-18, encdata.buf+18, encdata.len-18 ); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error decrypting\n"); | |
} | |
printf("Ran decrypt\n"); | |
printf("Data count: %d\n", encdata.len); | |
for (i=0;i<64;i++) { | |
if (i&&i%16==0)printf("\n"); | |
printf("%.2X ",((unsigned char*)data)[i]); | |
} | |
printf("\n"); | |
/* Return decrypted data */ | |
return Py_BuildValue("s#", data, encdata.len); | |
} | |
static PyObject * | |
decryptSecretKey(PyObject *self, PyObject *args) | |
{ | |
const Py_buffer iv; | |
const Py_buffer key; | |
const Py_buffer encdata; | |
unsigned char data[2048]; | |
unsigned long pgp_cipher, gcrypt_cipher; | |
unsigned long i; | |
int sts; | |
gcry_cipher_hd_t cipher_hd; | |
gcry_error_t err; | |
if (!PyArg_ParseTuple(args, "s*s*s*l", &iv, &key, &encdata, &pgp_cipher)) | |
return NULL; | |
switch (pgp_cipher) { | |
case 1: gcrypt_cipher = GCRY_CIPHER_IDEA; break; | |
case 2: gcrypt_cipher = GCRY_CIPHER_3DES; break; | |
case 3: gcrypt_cipher = GCRY_CIPHER_CAST5; break; | |
case 4: gcrypt_cipher = GCRY_CIPHER_BLOWFISH; break; | |
case 7: gcrypt_cipher = GCRY_CIPHER_AES128; break; | |
case 8: gcrypt_cipher = GCRY_CIPHER_AES192; break; | |
case 9: gcrypt_cipher = GCRY_CIPHER_AES256; break; | |
case 10: gcrypt_cipher = GCRY_CIPHER_TWOFISH; break; | |
default: | |
printf("ERROR: Invalid symmetric cipher!\n"); | |
return NULL; | |
} | |
/* Open a cipher object with mode TripleDES */ | |
err = gcry_cipher_open (&cipher_hd, | |
gcrypt_cipher, | |
GCRY_CIPHER_MODE_CFB, | |
(GCRY_CIPHER_SECURE | GCRY_CIPHER_ENABLE_SYNC)); | |
if (err != GPG_ERR_NO_ERROR) | |
printf("Error opening cipher!\n"); | |
/* Set TripleDES key */ | |
err = gcry_cipher_setkey (cipher_hd, key.buf, key.len); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error setting key\n"); | |
} | |
/* Set TripleDES IV */ | |
err = gcry_cipher_setiv ( cipher_hd, iv.buf, iv.len ); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error setting iv\n"); | |
} | |
/* Run decryption */ | |
err = gcry_cipher_decrypt ( cipher_hd, (char*)data, encdata.len, encdata.buf, encdata.len ); | |
if (err != GPG_ERR_NO_ERROR) { | |
printf("Error decrypting\n"); | |
} | |
/* Return decrypted data */ | |
return Py_BuildValue("s#", data,encdata.len); | |
} | |
I'm afraid I haven't thought about this thing in over a decade. I'll certainly leave it up for reference, but I can't remember anything about how it works and I'm not going to update it. Feel free to post any patches here if you make them, though.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@mrmekon When decrypting secret key, your code didn't work in python because the segment size of CFB mode in PyCryptodome (which is the new PyCrypto) defaults to 8 bits (CFB8). You can change this behavior by adding a segment_size parameter, and make it equal to the block size of the cipher. For example in AES it would be:
aes = AES.new(self.hashresult, AES.MODE_CFB, struct.pack(">Q",self.iv), segment_size=128)
I guess in DES it should be 64