|
# The author (Ethan White) of this code dedicates any and all copyright interest |
|
# in this code to the public domain. The author makes this dedication for the |
|
# benefit of the public at large and to the detriment of his heirs and |
|
# successors. The author intends this dedication to be an overt act of |
|
# relinquishment in perpetuity of all present and future rights to this code |
|
# under copyright law. |
|
# |
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
|
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR |
|
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, |
|
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR |
|
# OTHER DEALINGS IN THE SOFTWARE. |
|
|
|
import random |
|
import os |
|
import math |
|
|
|
# Floating-point helper constants: FLOAT_RECIP is 2 ** -BITS_PER_FLOAT.
BITS_PER_FLOAT = 53
FLOAT_RECIP = 2**-BITS_PER_FLOAT

# Bit width of one half of a cipher block. Each round table holds
# 2**HALF_SIZE entries of BYTE_SIZE bytes each.
HALF_SIZE = 16
BYTE_SIZE = HALF_SIZE // 8

# Set to True below when running under Python 2.
IS_PY2 = False

# When True, generate_keystream() expands a random key with AES-256-CTR
# instead of reading every byte from os.urandom().
USE_STREAM_CIPHER = False

# Python 2 detection: `xrange` only exists there. On Python 3 the lookup
# raises NameError, which is the only exception that should be ignored here
# (a bare `except:` would also swallow KeyboardInterrupt and SystemExit).
try:
    range = xrange
    IS_PY2 = True
except NameError:
    pass
|
|
|
# Pick the byte-string decoder by feature detection so that both branches
# agree on byte order: the same keystream must decode to the same integers
# regardless of the interpreter version.
if hasattr(int, "from_bytes"):
    def from_bytes(bytes_):
        """Decode ``bytes_`` as an unsigned big-endian integer."""
        return int.from_bytes(bytes_, "big")
else:
    import struct

    def from_bytes(bytes_):
        """Decode ``bytes_`` (at most 4 bytes) as an unsigned big-endian integer.

        Fallback for interpreters without ``int.from_bytes``. The input is
        left-padded with zero bytes to 4 bytes so it can be unpacked as a
        single big-endian unsigned 32-bit value.
        """
        return struct.unpack(">L", b"\0" * (4 - len(bytes_)) + bytes_)[0]
|
|
|
class RandomPRFFeistelCipher:
    """Four-round Feistel cipher whose round functions are lookup tables.

    ``functions`` is a sequence of four indexable tables; round ``k`` maps a
    half-block value ``x`` to ``functions[k][x]``, which is XORed into the
    other half. A block is a pair ``(lhs, rhs)`` of integers that are valid
    indices into those tables.
    """

    def __init__(self, functions):
        """Store the four round tables used by encrypt() and decrypt()."""
        self.functions = functions

    @staticmethod
    def _split_block(lhs, rhs):
        """Normalise the two accepted call forms into an ``(lhs, rhs)`` pair.

        Accepts either two separate halves, or a single tuple (including
        tuple subclasses such as namedtuples) passed as ``lhs``.

        Raises:
            ValueError: if ``rhs`` is omitted and ``lhs`` is not a tuple, or
                the tuple does not contain exactly two items.
        """
        if rhs is not None:
            return lhs, rhs
        if not isinstance(lhs, tuple):
            raise ValueError(
                "expected an (lhs, rhs) tuple or two separate half-block values"
            )
        lhs, rhs = lhs
        return lhs, rhs

    def encrypt(self, lhs, rhs=None):
        """Encrypt one block and return the new ``(lhs, rhs)`` pair.

        Call as ``encrypt(lhs, rhs)`` or ``encrypt((lhs, rhs))``.

        Raises:
            ValueError: if a single non-tuple argument is given.
        """
        lhs, rhs = self._split_block(lhs, rhs)

        # Alternate which half is updated; each round keys off the half
        # that was modified in the previous round.
        lhs ^= self.functions[0][rhs]
        rhs ^= self.functions[1][lhs]
        lhs ^= self.functions[2][rhs]
        rhs ^= self.functions[3][lhs]
        return lhs, rhs

    def decrypt(self, lhs, rhs=None):
        """Decrypt one block and return the original ``(lhs, rhs)`` pair.

        Call as ``decrypt(lhs, rhs)`` or ``decrypt((lhs, rhs))``.

        Raises:
            ValueError: if a single non-tuple argument is given.
        """
        lhs, rhs = self._split_block(lhs, rhs)

        # Undo the rounds of encrypt() in reverse order; XOR is its own
        # inverse, so the same tables are reused.
        rhs ^= self.functions[3][lhs]
        lhs ^= self.functions[2][rhs]
        rhs ^= self.functions[1][lhs]
        lhs ^= self.functions[0][rhs]
        return lhs, rhs

    @classmethod
    def generate(cls):
        """Build a cipher with four freshly generated random round tables.

        Each table is produced by gen_prf() from its own keystream of
        ``2**HALF_SIZE * BYTE_SIZE`` bytes. Implemented as a classmethod so
        that subclasses get an instance of their own type.
        """
        return cls([
            gen_prf(generate_keystream(2**HALF_SIZE * BYTE_SIZE))
            for _ in range(4)
        ])
|
|
|
def generate_keystream(length):
    """Return ``length`` random bytes for building the round tables.

    By default the bytes come straight from ``os.urandom``. When the
    module-level ``USE_STREAM_CIPHER`` flag is set, a random 32-byte key and
    16-byte counter block are drawn from ``os.urandom`` and expanded by
    encrypting ``length`` zero bytes with AES in CTR mode.
    """
    if not USE_STREAM_CIPHER:
        return os.urandom(length)

    # Instead of using os.urandom directly, we can also use a stream cipher
    # to generate the ~1MB of randomness necessary to generate the PRFs.
    # In this case, we use AES-256-CTR, which is used in various places in Tor.
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends.openssl import backend

    key = os.urandom(32)
    counter_block = os.urandom(16)
    encryptor = Cipher(
        algorithms.AES(key),
        modes.CTR(counter_block),
        backend=backend,
    ).encryptor()

    # Encrypting an all-zero plaintext yields the raw cipher keystream.
    stream = encryptor.update(b"\0" * length)
    encryptor.finalize()
    return stream
|
|
|
def gen_prf(keystream):
    """Turn a keystream into one round table of ``2**HALF_SIZE`` integers.

    Consecutive ``BYTE_SIZE``-byte chunks of ``keystream`` are decoded with
    ``from_bytes``; entry ``i`` of the result comes from chunk ``i``. Bytes
    beyond the required length are ignored.

    Raises:
        ValueError: if ``keystream`` is shorter than
            ``2**HALF_SIZE * BYTE_SIZE`` bytes.
    """
    entry_count = 2**HALF_SIZE
    required = entry_count * BYTE_SIZE
    if len(keystream) < required:
        raise ValueError("Keystream too short.")

    table = []
    for index in range(entry_count):
        offset = index * BYTE_SIZE
        table.append(from_bytes(keystream[offset:offset + BYTE_SIZE]))
    return table
|
|
|
|
|
if __name__ == "__main__":
    # Simple self-benchmark: time table generation, then time one million
    # chained encryptions starting from the all-zero block.
    import sys
    import time

    # Write without a newline and flush so the prompt shows up before the
    # (comparatively slow) table generation starts.
    sys.stdout.write("Generating permutations... ")
    sys.stdout.flush()
    setup_began = time.time()
    cipher = RandomPRFFeistelCipher.generate()
    setup_elapsed = time.time() - setup_began
    print("Done in %0.4f seconds" % setup_elapsed)

    block = (0, 0)
    loop_began = time.time()
    for _ in range(1000000):
        # Feed each output back in as the next input.
        block = cipher.encrypt(block)  # Try replacing with cipher.decrypt
    loop_elapsed = time.time() - loop_began
    print("1 million encryptions took %0.4f seconds." % loop_elapsed)