Last active
June 25, 2018 06:44
-
-
Save maqp/d3d0fc9efe5bb7ab18a54840f980484e to your computer and use it in GitHub Desktop.
v3_onion_test.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.6 | |
# -*- coding: utf-8 -*- | |
import base64 | |
import binascii | |
import hashlib | |
import os | |
import random | |
import shlex | |
import socket | |
import subprocess | |
import tempfile | |
import time | |
import stem | |
from stem.control import Controller | |
from stem import SocketClosed | |
from stem import process as stem_process | |
CURSOR_UP_ONE_LINE = '\x1b[1A' | |
CLEAR_ENTIRE_LINE = '\x1b[2K' | |
def get_available_port(min_port: int, max_port: int) -> str: | |
with socket.socket() as tmpsock: | |
while True: | |
try: | |
tmpsock.bind(('127.0.0.1', random.randint(min_port, max_port))) | |
break | |
except OSError: | |
pass | |
_, port = tmpsock.getsockname() | |
return str(port) | |
class Tor(object): | |
"""Tor class manages the starting and stopping of Tor client with Stem.""" | |
def __init__(self) -> None: | |
self.tor_data_directory = None | |
self.tor_process = None | |
self.controller = None | |
def connect(self, port: str) -> bool: | |
"""Launch Tor as a subprocess.""" | |
self.tor_data_directory = tempfile.TemporaryDirectory() | |
tor_control_socket = os.path.join(self.tor_data_directory.name, 'control_socket') | |
try: | |
self.tor_process = stem_process.launch_tor_with_config( | |
config={'DataDirectory': self.tor_data_directory.name, | |
'SocksPort': port, | |
'ControlSocket': tor_control_socket, | |
'AvoidDiskWrites': '1', | |
'Log': 'notice stdout', | |
'GeoIPFile': '/usr/share/tor/geoip', | |
'GeoIPv6File ': '/usr/share/tor/geoip6'}, | |
tor_cmd='{}/stemtest/tor/src/or/tor'.format(os.getenv("HOME"))) | |
except OSError: | |
return False | |
start_ts = time.monotonic() | |
self.controller = Controller.from_socket_file(path=tor_control_socket) | |
self.controller.authenticate() | |
while True: | |
time.sleep(0.1) | |
try: | |
response = self.controller.get_info("status/bootstrap-phase") | |
except SocketClosed: | |
raise SystemExit("Tor socket closed.") | |
res_parts = shlex.split(response) | |
summary = res_parts[4].split('=')[1] | |
if summary == 'Done': | |
return True | |
if time.monotonic() - start_ts > 15: | |
return False | |
def stop(self) -> None: | |
"""Stop the Tor subprocess.""" | |
if self.tor_process: | |
self.tor_process.terminate() | |
time.sleep(0.1) | |
if not self.tor_process.poll(): | |
self.tor_process.kill() | |
def stem_compatible_base64_blob_from_private_key(private_key: bytes) -> str: | |
"""Create stem compatible key blob from private key. | |
This code is based on Tor's testing code at | |
https://github.com/torproject/tor/blob/8e84968ffbf6d284e8a877ddcde6ded40b3f5681/src/test/ed25519_exts_ref.py#L48 | |
""" | |
b = 256 | |
def bit(h: bytes, i: int) -> int: | |
return (h[i // 8] >> (i % 8)) & 1 | |
def encode_int(y: int) -> bytes: | |
bits = [(y >> i) & 1 for i in range(b)] | |
return b''.join([bytes([(sum([bits[i * 8 + j] << j for j in range(8)]))]) for i in range(b // 8)]) | |
def expand_private_key(sk: bytes) -> bytes: | |
h = hashlib.sha512(sk).digest() | |
a = 2 ** (b - 2) + sum(2 ** i * bit(h, i) for i in range(3, b - 2)) | |
k = b''.join([bytes([h[i]]) for i in range(b // 8, b // 4)]) | |
assert len(k) == 32 | |
return encode_int(a) + k | |
expanded_private_key = expand_private_key(private_key) | |
return base64.b64encode(expanded_private_key).decode() | |
def kill_background_tor(): | |
"""Kill any Tor instances left open.""" | |
try: | |
pids = subprocess.check_output("ps aux |grep '[t]orrc' | awk '{print $2}' 2>/dev/null", shell=True).split(b'\n') | |
for pid in pids: | |
subprocess.Popen("kill {}".format(int(pid)), shell=True).wait() | |
except ValueError: | |
pass | |
def main() -> None: | |
"""Repeatedly launch v3 Tor Onion Service using Stem. | |
When the Onion Service is up, bring it down and re-launch it. If the | |
stress test causes HSDirs to return "UPLOAD_REJECTED", create new | |
private key for Onion Services and start from scratch with longer | |
delay between re-launching of Onion Service. | |
Related tickets: | |
https://trac.torproject.org/projects/tor/ticket/25124 | |
https://trac.torproject.org/projects/tor/ticket/25552 | |
Once #25552 is fixed and rev counters are removed, this program | |
should continue iterating more or less forever without the | |
UPLOAD_REJECTED appearing, regardless of delay. | |
""" | |
kill_background_tor() | |
# Setting the initial delay between re-launches to 3600 seems to fix | |
# the UPLOAD_REJECTED issue, but as such, having to wait for an hour | |
# between bringing the service online is pretty much unusable for | |
# things like OnionShare and IM clients like TFC. | |
delay = 3 | |
while True: | |
private_key = os.urandom(32) | |
stem_key_data = stem_compatible_base64_blob_from_private_key(private_key) | |
print('\nPrivate key in hex: {}'.format(binascii.hexlify(private_key).decode())) | |
print("Delay between descriptor publishing is {} seconds.".format(delay)) | |
i = 0 | |
try: | |
while True: | |
i += 1 | |
print("\nIteration {}".format(i)) | |
tor_port = get_available_port(1000, 65535) | |
tor = Tor() | |
if not tor.connect(tor_port): | |
print("\nTor timed out!") | |
continue | |
print('Starting Onion Service... ', end='', flush=True) | |
try: | |
service = tor.controller.create_ephemeral_hidden_service(ports={80: 5000}, | |
key_type='ED25519-V3', | |
key_content=stem_key_data, | |
await_publication=True) | |
except stem.OperationFailed as e: | |
print("OperationFailed:\n{}".format(e)) | |
tor.stop() | |
delay *= 2 | |
raise | |
print('Onion Service {} is now online'.format(service.service_id)) | |
tor.controller.remove_hidden_service(service.service_id) | |
tor.stop() | |
for t in range(delay): | |
print("Sleeping {}".format(delay-t)) | |
time.sleep(1) | |
print(CURSOR_UP_ONE_LINE + CLEAR_ENTIRE_LINE + CURSOR_UP_ONE_LINE) | |
except stem.OperationFailed: | |
continue | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment