Forked from fredo/gist:c8c242c2ca5ba3bba57dcd11d4e42f63
Last active
November 18, 2020 15:40
-
-
Save konradkonrad/a1fae032d7e7c5472504443c54300fbe to your computer and use it in GitHub Desktop.
message_federation_penetrator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from gevent import monkey # isort:skip | |
monkey.patch_all() # isort:skip | |
from collections import defaultdict | |
from typing import List | |
import gevent | |
from gevent.event import Event | |
from matrix_client.errors import MatrixRequestError | |
from raiden.constants import ( | |
DISCOVERY_DEFAULT_ROOM, | |
PATH_FINDING_BROADCASTING_ROOM, | |
Environment, | |
Networks, | |
) | |
from raiden.network.transport import MatrixTransport | |
from raiden.network.transport.matrix.client import MatrixSyncMessages, Room | |
from raiden.network.transport.matrix.rtc.utils import setup_asyncio_event_loop | |
from raiden.network.transport.matrix.utils import my_place_or_yours | |
from raiden.settings import CapabilitiesConfig, MatrixTransportConfig | |
from raiden.tests.utils.mocks import MockRaidenService | |
from raiden.utils.typing import RoomID | |
setup_asyncio_event_loop() | |
# Server URLs are assembled as f"{SERVER_PREFIX}{n}{SERVER_SUFFIX}", e.g.
# "https://transport.transport01.raiden.network" for n == 1.
SERVER_PREFIX = "https://transport.transport0"
SERVER_SUFFIX = ".raiden.network"

# One loop iteration (and, unless skipped, one transport) per user in login_all().
NUMBER_OF_USERS = 5

# Server numbers that login_all() skips when building transports.
RSB_ID_OUT_OF_ORDER = []

signers = []

# Two-level bags of message texts:
#   sent / failed: room id -> sending user id -> texts (filled by send_text)
#   received: (sender, receiver) tuple -> sender -> texts
#             (filled by _handle_message_callback)
message_sent_bag = defaultdict(lambda: defaultdict(list))
message_failed_bag = defaultdict(lambda: defaultdict(list))
message_received_bag = defaultdict(lambda: defaultdict(list))
def _handle_invite(self, room_id: RoomID, state: dict) -> None:
    """Join the room the client was invited to.

    Meant to be installed on ``MatrixTransport`` (see
    ``monkey_patch_listeners``), hence the ``self`` parameter.

    Args:
        room_id: Room to join.
        state: Not used by this handler.

    A ``MatrixRequestError`` raised by the join is printed together with a
    "COULD NOT JOIN" line and is not propagated.
    """
    client = self._client
    try:
        client.join_room(room_id_or_alias=room_id)
    except MatrixRequestError as join_error:
        print(join_error)
        print(f"COULD NOT JOIN {room_id}")
def _handle_message_callback(self, message_batch: MatrixSyncMessages):
    """Log every synced message and record those sent by other users.

    Meant to be installed on ``MatrixTransport`` as ``_handle_sync_messages``
    (see ``monkey_patch_listeners``).

    Args:
        message_batch: Iterable of ``(room, messages)`` pairs; each message is
            indexed as ``message["sender"]`` and ``message["content"]["body"]``.

    Messages whose sender is this client's own user id are printed but not
    recorded. Everything else is appended to ``message_received_bag`` under the
    ``(sender, receiver)`` tuple key, then under ``sender``.
    """
    for _room, room_messages in message_batch:
        for message in room_messages:
            sender = message["sender"]
            text = message["content"]["body"]
            print(f"receiver: {self._client.user_id}, sender: {sender}")

            # Skip our own messages.
            if sender == self._client.user_id:
                continue

            pair_key = (sender, self._client.user_id)
            message_received_bag[pair_key][sender].append(text)
def login_all() -> List[MatrixTransport]:
    """Construct one ``MatrixTransport`` per user, cycling over servers 1-5.

    User ``i`` is assigned server number ``i % 5 + 1``; users whose server
    number is listed in ``RSB_ID_OUT_OF_ORDER`` get no transport at all.
    This function only constructs the objects; they are started separately
    by ``start_all``.

    Returns:
        The transports, in user order.
    """
    server_urls = [
        f"{SERVER_PREFIX}{server_count}{SERVER_SUFFIX}"
        for server_count in (i % 5 + 1 for i in range(NUMBER_OF_USERS))
        if server_count not in RSB_ID_OUT_OF_ORDER
    ]

    return [
        MatrixTransport(
            config=MatrixTransportConfig(
                broadcast_rooms=[
                    DISCOVERY_DEFAULT_ROOM,
                    PATH_FINDING_BROADCASTING_ROOM,
                ],
                retries_before_backoff=2,
                retry_interval_initial=2,
                retry_interval_max=2,
                server=server_url,
                # Only the assigned server is offered as an available server.
                available_servers=[server_url],
                capabilities_config=CapabilitiesConfig(web_rtc=False),
            ),
            environment=Environment.PRODUCTION,
        )
        for server_url in server_urls
    ]
def monkey_patch_listeners():
    """Replace ``MatrixTransport``'s invite and sync-message handlers.

    After this call every ``MatrixTransport`` uses ``_handle_invite`` and
    ``_handle_message_callback`` from this script instead of its own
    ``_handle_invite`` / ``_handle_sync_messages``.
    """
    overrides = {
        "_handle_invite": _handle_invite,
        "_handle_sync_messages": _handle_message_callback,
    }
    for attribute_name, handler in overrides.items():
        setattr(MatrixTransport, attribute_name, handler)
def start_all(transports: List[MatrixTransport]) -> None:
    """Start all transports concurrently and wait until every start finished.

    Each transport gets its own ``MockRaidenService`` whose RPC client reports
    the Goerli chain id. ``joinall`` is called with ``raise_error=True`` so a
    failing start is not silently dropped.

    Args:
        transports: Transports to start.
    """

    def _spawn_start(transport):
        # One fresh mock service per transport.
        raiden_service = MockRaidenService()
        raiden_service.rpc_client.chain_id = Networks.GOERLI.value
        return gevent.spawn(transport.start, raiden_service, [], None)

    gevent.joinall(
        [_spawn_start(transport) for transport in transports],
        raise_error=True,
    )
def create_rooms(transports: List[MatrixTransport]):
    """Create one room for every unordered pair of transports.

    For each pair, ``my_place_or_yours`` is given both raiden addresses and
    returns the address that acts as inviter. That side's client creates the
    room and invites the other side's user id. All room creations run
    concurrently; ``joinall`` is called with ``raise_error=True`` so a failed
    creation is not silently dropped.

    Pairs are formed by position in ``transports`` rather than by address
    membership, so a transport is never paired with itself even if two
    transports happen to share an address.

    Args:
        transports: Started transports (``_raiden_service`` and ``_client``
            are read from each).
    """
    # Function-scope import so this block does not depend on file-level imports.
    from itertools import combinations

    room_creation_greenlets = []
    for first_transport, second_transport in combinations(transports, 2):
        first_address = first_transport._raiden_service.address
        second_address = second_transport._raiden_service.address
        inviter = my_place_or_yours(first_address, second_address)

        if first_address == inviter:
            inviter_transport, invitee_transport = first_transport, second_transport
        else:
            inviter_transport, invitee_transport = second_transport, first_transport

        room_creation_greenlets.append(
            gevent.spawn(
                inviter_transport._client.create_room,
                invitees=[invitee_transport._client.user_id],
            )
        )

    gevent.joinall(room_creation_greenlets, raise_error=True)
def send_messages(transports: List[MatrixTransport], number_of_messages: int = 10):
    """Send a batch of text messages into every room of every transport.

    One greenlet per message is spawned via ``send_batch``; this function
    returns once all of them have finished. ``joinall`` is called without
    ``raise_error``, so errors inside the greenlets are not raised here.

    Args:
        transports: Transports whose client rooms are written to.
        number_of_messages: Messages to send per room and transport.
    """
    send_greenlets = [
        greenlet
        for transport in transports
        for room in transport._client.rooms.values()
        for greenlet in send_batch(room, number_of_messages)
    ]
    gevent.joinall(send_greenlets)
def send_messages_to_device(
    transports: List[MatrixTransport], number_of_messages: int = 10
):
    """Send a batch of to-device messages from every transport to every other.

    One greenlet per message is spawned via ``send_batch_to_device``; this
    function returns once all of them have finished. ``joinall`` is called
    without ``raise_error``, so errors inside the greenlets are not raised here.

    Args:
        transports: All participating transports.
        number_of_messages: Messages to send per (sender, receiver) pair.
    """
    send_greenlets = []
    for sender in transports:
        for receiver in transports:
            if receiver != sender:
                send_greenlets.extend(
                    send_batch_to_device(
                        sender, receiver._client.user_id, number_of_messages
                    )
                )
    gevent.joinall(send_greenlets)
def send_batch(room: Room, number_of_messages: int):
    """Spawn one ``send_text`` greenlet per message for ``room``.

    Args:
        room: Room to send into.
        number_of_messages: How many messages ("message 0", "message 1", ...)
            to send.

    Returns:
        The spawned greenlets, in message order; they are not joined here.
    """
    greenlets = []
    for i in range(number_of_messages):
        greenlets.append(gevent.spawn(send_text, room=room, text=f"message {i}"))
    return greenlets
def send_batch_to_device(transport, user_id, number_of_messages=10):
    """Spawn one ``send_to_device`` greenlet per message for ``user_id``.

    Args:
        transport: Sending transport.
        user_id: Receiving user id.
        number_of_messages: How many messages ("message 0", "message 1", ...)
            to send.

    Returns:
        The spawned greenlets, in message order; they are not joined here.
    """
    greenlets = []
    for i in range(number_of_messages):
        greenlets.append(
            gevent.spawn(
                send_to_device,
                transport=transport,
                user_id=user_id,
                text=f"message {i}",
            )
        )
    return greenlets
def send_text(room: Room, text: str) -> None:
    """Send ``text`` into ``room`` and record whether it was sent or failed.

    The text is appended to ``message_sent_bag`` on success, or to
    ``message_failed_bag`` when the send raises ``MatrixRequestError``. Both
    bags are keyed by room id, then by the sending client's user id. Other
    exception types are not caught.
    """
    try:
        room.send_text(text=text)
    except MatrixRequestError:
        outcome_bag = message_failed_bag
    else:
        outcome_bag = message_sent_bag
    outcome_bag[room.room_id][room.client.user_id].append(text)
def send_to_device(transport, user_id, text):
    """Send ``text`` as a to-device message from ``transport`` to ``user_id``.

    The payload is an ``m.text`` content dict filed under the device key
    ``"*"`` (presumably the wildcard for all of the user's devices; confirm
    against the Matrix spec) and sent with event type ``m.room.message``.

    Args:
        transport: Object exposing ``_client.api.send_to_device``.
        user_id: Receiving user id.
        text: Message body.
    """
    content = {"msgtype": "m.text", "body": text}
    per_device = {"*": content}
    transport._client.api.send_to_device(
        event_type="m.room.message",
        messages={user_id: per_device},
    )
def main() -> None:
    """Run the whole scenario and print per-bag message counts.

    Steps: patch the transport handlers, build and start the transports,
    create a room per transport pair, send 100 messages per room and
    transport, wait, then print how many messages were sent, failed and
    received.
    """

    def _entries(bag):
        # Flatten a two-level bag into (outer key, sender, texts) triples.
        for outer_key, sender_to_messages in bag.items():
            for sender, texts in sender_to_messages.items():
                yield outer_key, sender, texts

    monkey_patch_listeners()
    transports = login_all()
    start_all(transports)
    create_rooms(transports)

    gevent.sleep(5)
    send_messages(transports, 100)
    # Alternative scenario, disabled:
    # send_messages_to_device(transports, 100)
    gevent.sleep(30)

    for room_id, sender, sender_bag in _entries(message_sent_bag):
        print(f"room_id {room_id} sent {len(sender_bag)} messages by {sender}")
    for room_id, sender, sender_bag in _entries(message_failed_bag):
        print(f"room_id {room_id} failed {len(sender_bag)} messages by {sender}")
    # The received bag's outer key is a (sender, receiver) tuple, not a room
    # id; the "room_id" label is kept as is.
    for room_id, sender, sender_bag in _entries(message_received_bag):
        print(f"room_id {room_id} received {len(sender_bag)} messages by {sender}")

    # Nothing in this function sets the event, so this waits out the timeout.
    stop_event = Event()
    gevent.wait([stop_event], timeout=30)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment