Created
July 17, 2018 11:22
-
-
Save ostronom/308ecc654d5ee8b07a06121d5a74d9cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
import sys | |
sys.path.append("/usr/local/lib/python2.7/site-packages") | |
import random | |
import grpc | |
from interface import authentication_pb2 | |
from interface import authentication_pb2_grpc | |
from interface import groups_pb2 | |
from interface import groups_pb2_grpc | |
from interface import messaging_pb2 | |
from interface import messaging_pb2_grpc | |
from interface import peers_pb2 | |
class Smoke(object):
    """End-to-end smoke test client for the gRPC service.

    The scenario driven by :meth:`run` is: register the connection to obtain
    an auth ticket, sign up a user under a random phone number, create a
    group and send one text message into it.
    """

    def __init__(self, address):
        """Open an insecure channel and build the service stubs.

        Args:
            address: ``host:port`` string of the gRPC endpoint.
        """
        self.chan = grpc.insecure_channel(address)
        self.auth = authentication_pb2_grpc.AuthenticationStub(self.chan)
        self.groups = groups_pb2_grpc.GroupsStub(self.chan)
        self.messaging = messaging_pb2_grpc.MessagingStub(self.chan)

    @property
    def metadata(self):
        """Return the call metadata carrying the auth ticket.

        Reads ``self.token``, which is only assigned by
        :meth:`authenticate_connection`; accessing this property earlier
        raises ``AttributeError``.
        """
        return (('x-auth-ticket', self.token),)

    def authenticate_connection(self):
        """Register the device and store ``token`` and ``auth_id``."""
        resp = self.auth.RegisterDeprecatedDevice(
            authentication_pb2.RegisterDeprecatedDeviceRequest()
        )
        self.token = resp.token
        self.auth_id = resp.auth_id

    def phone_auth(self):
        """Sign up a new user under a randomly chosen phone number.

        Stores the chosen number in ``used_phone`` and the sign-up response
        fields in ``user`` and ``config``, then prints the user.

        Raises:
            grpc.RpcError: if code validation fails for any reason other than
                the "Unnoccupied phone number." response, or if any other
                RPC fails.
        """
        self.used_phone = random.randint(700000000, 800000000)
        transaction = self.auth.StartPhoneAuth(
            authentication_pb2.RequestStartPhoneAuth(
                phone_number=self.used_phone,
                app_id=1,
                api_key="api_key",
                device_title="device_title"
            ), metadata=self.metadata
        )
        try:
            self.auth.ValidateCode(
                authentication_pb2.RequestValidateCode(
                    transaction_hash=transaction.transaction_hash,
                    code='1234'
                ), metadata=self.metadata
            )
        except grpc.RpcError as e:
            # Catch the public base class instead of the private
            # grpc._channel._Rendezvous, which is an implementation detail
            # that does not exist under that name in every grpc release.
            # Presumably the server answers this way for a number that has no
            # account yet, which is the expected case right before SignUp.
            details = getattr(e, 'details', None)
            if not callable(details) or details() != "Unnoccupied phone number.":
                raise
        resp = self.auth.SignUp(
            authentication_pb2.RequestSignUp(
                transaction_hash=transaction.transaction_hash,
                name="Adolf Hitler"
            ), metadata=self.metadata
        )
        self.user = resp.user
        self.config = resp.config
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(self.user)

    def random_id(self):
        """Return a random integer in ``[0, sys.maxsize]`` for use as a rid."""
        return random.randint(0, sys.maxsize)

    def create_group(self, name):
        """Create a group titled ``name`` and return the RPC response."""
        return self.groups.CreateGroup(
            groups_pb2.RequestCreateGroup(
                rid=self.random_id(),
                title=name,
                group_type=groups_pb2.GROUPTYPE_GROUP
            ), metadata=self.metadata
        )

    def send_message(self, peer, text):
        """Send ``text`` as a text message to ``peer`` and print the response.

        Args:
            peer: peer message identifying the destination (``run`` passes a
                ``peers_pb2.OutPeer``).
            text: message body.
        """
        message = messaging_pb2.MessageContent(
            textMessage=messaging_pb2.TextMessage(text=text)
        )
        response = self.messaging.SendMessage(
            messaging_pb2.RequestSendMessage(
                peer=peer,
                rid=self.random_id(),
                message=message
            ), metadata=self.metadata
        )
        print(response)

    def run(self):
        """Run the whole scenario and print ``ok`` with the token and auth id."""
        self.authenticate_connection()
        self.phone_auth()
        group = self.create_group("MY GROUP")
        self.send_message(
            peers_pb2.OutPeer(
                type=peers_pb2.PEERTYPE_GROUP,
                id=group.group.id,
                access_hash=group.group.access_hash
            ),
            "HELLO FROM GRPC"
        )
        # Formatted into one string so the output matches the original
        # space-separated print on both Python 2 and 3.
        print('ok %s %s' % (self.token, self.auth_id))
if __name__ == '__main__':
    # The endpoint defaults to the address this script was written against;
    # an optional first command-line argument overrides it.
    address = sys.argv[1] if len(sys.argv) > 1 else '35.228.128.125:8080'
    Smoke(address).run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment