-
-
Save vipseixas/a8fadf868a1afa6b23ebc8e833c01e67 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# A small script that populates Firefox Sync with fake password
# records. Usage:
#
#   $> pip install PyFxA syncclient cryptography
#   $> python ./upload_fake_passwords.py 20
#
# It prompts for your Firefox Account email address and password,
# generates and uploads 20 fake password records, then syncs down
# and prints all password records stored in sync.
#
import os | |
import time | |
import json | |
import random | |
import string | |
import getpass | |
import hmac | |
import hashlib | |
import base64 | |
import uuid | |
from binascii import hexlify | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.primitives import padding | |
from cryptography.hazmat.backends import default_backend | |
import fxa.core | |
import fxa.crypto | |
import syncclient.client | |
# Backend handed to every `Cipher` constructed in this script.
CRYPTO_BACKEND = default_backend()

# The account credentials are prompted for as soon as the module is loaded.
# `raw_input` only exists on Python 2; on Python 3 the equivalent is `input`.
# `raw_input` must be preferred when present because Python 2's `input`
# evaluates whatever the user types.
try:
    _read_line = raw_input
except NameError:
    _read_line = input

EMAIL = _read_line("Email: ")
# getpass keeps the password from being echoed to the terminal.
PASSWORD = getpass.getpass("Password: ")
# Here you can customize how fake password records are generated. | |
def make_fake_password_record():
    """Build one fake password record as a plain dict.

    The record gets a brace-wrapped random UUID as its id, a username of
    the form ``fakeTesterN`` (N between 0 and 100 inclusive), a random
    password, and a random hostname that is also used as the form submit
    URL. Both timestamps are the current time in milliseconds.
    """
    timestamp_ms = int(time.time() * 1000)
    origin = make_random_hostname()
    record_id = "{" + str(uuid.uuid4()) + "}"
    username = "fakeTester" + str(random.randint(0, 100))
    secret = make_random_password()
    return {
        "id": record_id,
        "username": username,
        "password": secret,
        "hostname": origin,
        "formSubmitURL": origin,
        "usernameField": "username",
        "passwordField": "password",
        "timeCreated": timestamp_ms,
        "timePasswordChanged": timestamp_ms,
        "httpRealm": None,
    }
def make_random_password():
    """Return a random password of 8 to 20 printable ASCII characters.

    Characters are drawn from ASCII letters, digits and punctuation.
    These are throwaway test values, so the non-cryptographic `random`
    module is sufficient here.
    """
    # `string.ascii_letters` is used instead of `string.letters`: the latter
    # is locale-dependent on Python 2 and does not exist on Python 3.
    alphabet = string.ascii_letters + string.digits + string.punctuation
    size = random.randint(8, 20)
    return "".join(random.choice(alphabet) for _ in range(size))
def make_random_hostname():
    """Return a random origin such as ``https://www.<label>.com``.

    The result is a scheme (optionally followed by a ``www.`` or
    ``accounts.`` subdomain), a random lowercase label of 5 to 30
    characters, and one of a few fixed top-level domains.
    """
    size = random.randint(5, 30)
    # Each prefix carries its own trailing dot where it needs one. Joining
    # everything with "." would put a dot straight after a bare "https://"
    # and yield an invalid host like "https://.example.com".
    prefix = random.choice([
        "https://www.",
        "http://www.",
        "https://",
        "https://accounts.",
    ])
    label = "".join(random.choice(string.ascii_lowercase) for _ in range(size))
    suffix = random.choice(["com", "net", "org", "co.uk"])
    return "%s%s.%s" % (prefix, label, suffix)
# Below here is all the mechanics of uploading them to the sync server. | |
def main(count=20):
    """Sign in, then upload `count` fake password records.

    The two values returned by `login()` are passed straight through to
    `upload_fake_password_records`.
    """
    assertion, kB = login()
    upload_fake_password_records(count, assertion, kB)
def login():
    """Sign in with EMAIL/PASSWORD and return ``(assertion, kB)``.

    Waits (prompting on the terminal) until the account's email status is
    reported as verified, then requests an identity assertion for the
    token server and fetches the account keys. The session is destroyed
    before returning, whether or not those steps succeeded.

    Returns:
        A tuple of the identity assertion and the second of the two keys
        returned by ``session.fetch_keys()``.
    """
    # `raw_input` only exists on Python 2; Python 3 renamed it to `input`.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    client = fxa.core.Client()
    # Single-argument print calls behave the same on Python 2 and 3.
    print("Signing in as %s ..." % (EMAIL,))
    session = client.login(EMAIL, PASSWORD, keys=True)
    try:
        status = session.get_email_status()
        while not status["verified"]:
            print("Please click through the confirmation email.")
            answer = read_line("Hit enter when done, or type 'resend':")
            if answer.strip() == "resend":
                session.resend_email_code()
            # Re-check after every prompt so the loop ends once verified.
            status = session.get_email_status()
        assertion = session.get_identity_assertion("https://token.services.mozilla.com/")
        _, kB = session.fetch_keys()
    finally:
        # Never leave the session alive, even if a step above failed.
        session.destroy_session()
    return assertion, kB
def upload_fake_password_records(count, assertion, kB):
    """Upload `count` fake password records, then print every stored one.

    Args:
        count: number of fake records to generate and upload.
        assertion: identity assertion passed to the sync client.
        kB: key material used to derive the client state and the sync key.

    Raises:
        AssertionError: if a freshly encrypted record does not decrypt
            back to the record it was built from.

    NOTE: the final listing prints every password stored in the
    "passwords" collection in clear text, not only the fake ones.
    """
    # Connect to sync. The client state is the hex form of the first
    # 16 bytes of SHA-256(kB).
    xcs = hexlify(hashlib.sha256(kB).digest()[:16])
    if not isinstance(xcs, str):
        # hexlify returns bytes on Python 3; hand the client a native str
        # on both Python versions.
        xcs = xcs.decode("ascii")
    client = syncclient.client.SyncClient(assertion, xcs)

    # Fetch /crypto/keys. The 64 derived bytes are split into a 32-byte
    # encryption key and a 32-byte HMAC key.
    raw_sync_key = fxa.crypto.derive_key(kB, "oldsync", 64)
    root_key_bundle = KeyBundle(
        raw_sync_key[:32],
        raw_sync_key[32:],
    )
    keys_bso = client.get_record("crypto", "keys")
    keys = root_key_bundle.decrypt_bso(keys_bso)
    default_key_bundle = KeyBundle(
        base64.b64decode(keys["default"][0]),
        base64.b64decode(keys["default"][1]),
    )

    # Make a lot of password records.
    for i in range(1, count + 1):
        print("Uploading %s of %s fake password records..." % (i, count))
        r = make_fake_password_record()
        er = default_key_bundle.encrypt_bso(r)
        # Round-trip check before uploading. Raised explicitly (rather than
        # via `assert`) so it still runs under `python -O`.
        if default_key_bundle.decrypt_bso(er) != r:
            raise AssertionError("encrypted record did not round-trip")
        client.put_record("passwords", er)

    print("Synced password records:")
    for er in client.get_records("passwords"):
        r = default_key_bundle.decrypt_bso(er)
        password = r["password"]
        if not isinstance(password, str):
            # Python 2: JSON text is `unicode`; encode it so %r shows a
            # plain quoted byte string, as before.
            password = password.encode('utf8')
        print(" %s: %r (%s)" % (r["username"], password, r["hostname"]))
    print("Done!")
class KeyBundle(object):
    """A little helper class to hold a sync key bundle.

    A bundle is an AES key (`enc_key`) used in CBC mode with PKCS7
    padding, plus a key (`mac_key`) for an HMAC-SHA256 computed over the
    base64 text of the ciphertext.
    """

    def __init__(self, enc_key, mac_key):
        self.enc_key = enc_key
        self.mac_key = mac_key

    @staticmethod
    def _to_bytes(value):
        """Return `value` as a byte string, UTF-8 encoding it if it is text."""
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def decrypt_bso(self, data):
        """Verify and decrypt a record, returning the decoded JSON object.

        Args:
            data: mapping whose "payload" entry is a JSON document with
                "ciphertext" (base64), "IV" (base64) and "hmac" (hex).

        Raises:
            ValueError: if the HMAC in the payload does not match the one
                computed over the base64 ciphertext.
        """
        payload = json.loads(data["payload"])
        b64_ciphertext = self._to_bytes(payload["ciphertext"])
        mac = hmac.new(self.mac_key, b64_ciphertext, hashlib.sha256)
        expected = mac.hexdigest()
        claimed = payload["hmac"]
        # compare_digest needs both operands to be the same type, so both
        # are normalised to bytes. It is used instead of `!=` so that the
        # comparison time does not depend on how many characters match.
        is_string = isinstance(claimed, (bytes, type(u"")))
        if not (is_string and hmac.compare_digest(
                self._to_bytes(expected), self._to_bytes(claimed))):
            raise ValueError("hmac mismatch: %r != %r" % (expected, claimed))
        iv = base64.b64decode(self._to_bytes(payload["IV"]))
        cipher = Cipher(
            algorithms.AES(self.enc_key),
            modes.CBC(iv),
            backend=CRYPTO_BACKEND
        )
        decryptor = cipher.decryptor()
        padded = decryptor.update(base64.b64decode(b64_ciphertext))
        padded += decryptor.finalize()
        unpadder = padding.PKCS7(128).unpadder()
        plaintext = unpadder.update(padded) + unpadder.finalize()
        return json.loads(plaintext.decode("utf-8"))

    def encrypt_bso(self, data):
        """Encrypt `data` and return a record ready for upload.

        Args:
            data: JSON-serialisable mapping; must contain an "id" entry.

        Returns:
            A dict with the original "id" and a "payload" JSON string
            holding the base64 ciphertext, the base64 IV and the hex HMAC.
        """
        # The padder and cipher operate on bytes, not text.
        plaintext = json.dumps(data).encode("utf-8")
        padder = padding.PKCS7(128).padder()
        padded = padder.update(plaintext) + padder.finalize()
        # Fresh random IV for every record.
        iv = os.urandom(16)
        cipher = Cipher(
            algorithms.AES(self.enc_key),
            modes.CBC(iv),
            backend=CRYPTO_BACKEND
        )
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(padded)
        ciphertext += encryptor.finalize()
        b64_ciphertext = base64.b64encode(ciphertext)
        # The MAC covers the base64 text, matching what decrypt_bso checks.
        mac = hmac.new(self.mac_key, b64_ciphertext, hashlib.sha256).hexdigest()
        return {
            "id": data["id"],
            # base64 output is decoded to text because json.dumps cannot
            # serialise bytes on Python 3.
            "payload": json.dumps({
                "ciphertext": b64_ciphertext.decode("ascii"),
                "IV": base64.b64encode(iv).decode("ascii"),
                "hmac": mac,
            })
        }
if __name__ == "__main__":
    import sys

    # The first command-line argument, when given, overrides the default
    # of 20 records.
    cli_args = sys.argv[1:]
    count = int(cli_args[0]) if cli_args else 20
    main(count)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment