Created
June 8, 2017 12:51
-
-
Save bradhowes/bea76e850d3360a39e0077666e64c973 to your computer and use it in GitHub Desktop.
Simple Python 3 Performance Test Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import websockets | |
import json | |
import pickle | |
import sys | |
import multiprocessing | |
import random | |
import time | |
from collections import deque | |
from datetime import datetime | |
from urllib import parse | |
from aiohttp import ClientSession | |
from cannedAccounts import cannedAccountsInfo | |
class Timer(object):
    """Stack-based stopwatch that records how long tagged sections take.

    `begin` pushes a (tag, start time) pair; `end` pops the most recent pair
    and appends `(end time ISO string, tag, elapsed timedelta)` to `log`.
    Iterating the timer or taking its `len` operates on `log`.
    """

    def __init__(self):
        # Sections that have been started but not yet ended (used as a stack).
        self.pendingTags = deque()
        # Completed (or failed) sections, in completion order.
        self.log = []

    @staticmethod
    def now():
        """Return the current time as a naive UTC datetime."""
        return datetime.utcnow()

    def begin(self, tag):
        """Start timing a section labelled `tag`."""
        self.pendingTags.append((tag, self.now()))

    def end(self, tag):
        """Finish the most recently begun section and log its duration.

        The `tag` argument is not used for lookup: the section closed is
        always the last one begun, and it is logged under the tag it was
        begun with. Raises IndexError if no section is pending.
        """
        startedTag, startTime = self.pendingTags.pop()
        endTime = self.now()
        delta = endTime - startTime
        self.log.append((endTime.isoformat(), startedTag, delta))

    def exceptioned(self, exception):
        """Log every still-pending section as failed because of `exception`.

        Sections are drained innermost first. Each gets the same end
        timestamp, a tag suffixed with the failure status, and a duration of
        0.0. The status is `exception.status` when present, otherwise the
        exception itself.
        """
        endTimeFormatted = self.now().isoformat()
        status = getattr(exception, 'status', exception)
        while self.pendingTags:
            tag, _startTime = self.pendingTags.pop()
            # Record into the log (the original wrote to a non-existent
            # `self.timer` attribute and raised AttributeError).
            self.log.append((endTimeFormatted, tag + ' *** failed: {}'.format(status), 0.0))

    def __iter__(self):
        return iter(self.log)

    def __len__(self):
        return len(self.log)
class RESTFailure(Exception):
    """Raised when a REST request completes with an unexpected HTTP status.

    `resp` is the response object; its `status` attribute is copied to
    `self.status` and included in the exception message.
    """

    def __init__(self, resp):
        # Give the exception a real message so tracebacks and str() are useful.
        super().__init__('REST request failed with HTTP status {}'.format(resp.status))
        self.status = resp.status
class Mattermost(object):
    """Small asynchronous client for the REST and WebSocket endpoints under `base`.

    REST coroutines are scheduled with `enqueue` and driven to completion
    with `done`. Several methods read `self.teamId`, `self.channelId` and
    `self.userId`; these are NOT set by this class and must be assigned by
    the caller before the methods that use them are run.
    """

    def __init__(self, loop, team = 'PersonalTouch'):
        self.base = 'https://enrico.teaches-yoga.com/api/v3/'
        self.loop = loop
        # Remember the requested team; `login` overwrites it with its own argument.
        self.team = team
        self.session = ClientSession(loop = loop)
        self.tasks = []
        self.headers = {'Content-Type': 'application/json'}
        self._authToken = None

    @property
    def authToken(self):
        """The bearer token currently in use, or None before one is set."""
        return self._authToken

    @authToken.setter
    def authToken(self, value):
        # Setting the token also installs the Authorization header sent with
        # every subsequent request.
        self._authToken = value
        self.headers['Authorization'] = 'Bearer ' + value

    def enqueue(self, method, *args):
        """Schedule coroutine function `method(*args)` and remember its task."""
        self.tasks.append(asyncio.ensure_future(method(*args)))

    def done(self):
        """Run the loop until all enqueued tasks finish; return their results.

        Results are in enqueue order. The task list is reset only when every
        result was retrieved; `result()` re-raises a task's exception.
        """
        self.loop.run_until_complete(asyncio.wait(self.tasks))
        results = [each.result() for each in self.tasks]
        self.tasks = []
        return results

    async def _fetchJSON(self, url):
        """GET `url`; return the decoded JSON body or raise RESTFailure if status != 200."""
        async with self.session.get(url, headers = self.headers) as resp:
            if resp.status != 200:
                raise RESTFailure(resp)
            return await resp.json()

    async def _postJSON(self, url, payload):
        """POST `payload` as JSON to `url`; return the decoded JSON body or raise RESTFailure if status != 200."""
        async with self.session.post(url, headers = self.headers, data = json.dumps(payload)) as resp:
            if resp.status != 200:
                raise RESTFailure(resp)
            return await resp.json()

    async def login(self, team, loginId, password):
        """Log in and return `(token, payload)`.

        `token` is the response's 'Token' header and `payload` the decoded
        JSON body. Raises RESTFailure if the status is not 200.
        """
        self.team = team
        self.loginId = loginId
        self.password = password
        url = '{base}users/login'.format(base = self.base)
        payload = {'name': team, 'login_id': loginId, 'password': password}
        # Not routed through _postJSON because the response headers are needed too.
        async with self.session.post(url, headers = self.headers, data = json.dumps(payload)) as resp:
            if resp.status != 200:
                raise RESTFailure(resp)
            payload = await resp.json()
            return resp.headers['Token'], payload

    async def fetchTeams(self):
        """Return the decoded JSON from `teams/members`."""
        # The original checked `resp.stats` (typo), which raised AttributeError
        # on every call; the shared helper checks `resp.status`.
        url = '{base}teams/members'.format(base = self.base)
        return await self._fetchJSON(url)

    async def fetchChannels(self):
        """Return the decoded JSON channel listing for `self.teamId`."""
        url = '{base}teams/{teamId}/channels/'.format(base = self.base, teamId = self.teamId)
        return await self._fetchJSON(url)

    async def fetchRoster(self):
        """Return the decoded JSON from the users/0/100 endpoint of `self.channelId`."""
        url = '{base}teams/{teamId}/channels/{channelId}/users/0/100'.format(
            base = self.base, teamId = self.teamId, channelId = self.channelId)
        return await self._fetchJSON(url)

    async def fetchPosts(self):
        """Return the decoded JSON from the posts/since/0 endpoint of `self.channelId`."""
        url = '{base}teams/{teamId}/channels/{channelId}/posts/since/{since}'.format(
            base = self.base, teamId = self.teamId, channelId = self.channelId, since = 0)
        return await self._fetchJSON(url)

    async def markChannelRead(self):
        """POST `self.channelId` to the team's channels/view endpoint; return the decoded JSON."""
        url = '{base}teams/{teamId}/channels/view'.format(
            base = self.base, teamId = self.teamId)
        return await self._postJSON(url, {'channel_id': self.channelId})

    async def makeWebSocket(self):
        """Open the users/websocket endpoint and wait for the perf-test post.

        Sends an authentication challenge carrying `self.authToken`, then
        reads events until a 'posted' event arrives whose message ends with
        'Perf test message'. Returns that event (decoded JSON).
        """
        url = '{base}users/websocket'.format(base = self.base)
        # Same host and path as the REST API, but over the wss scheme.
        parts = list(parse.urlparse(url))
        parts[0] = 'wss'
        url = parse.urlunparse(parts)
        async with websockets.connect(url, extra_headers = self.headers) as websocket:
            payload = {
                'seq': 1,
                'action': 'authentication_challenge',
                'data': {'token': self.authToken}
            }
            await websocket.send(json.dumps(payload, ensure_ascii = False))
            while True:
                response = json.loads(await websocket.recv())
                if response.get('event') == 'posted':
                    # The post itself arrives as a JSON string nested in the event data.
                    post = json.loads(response['data']['post'])
                    if post['message'].endswith('Perf test message'):
                        return response

    async def newMessage(self):
        """Create a post in `self.channelId` as `self.userId`; return the decoded JSON.

        The message text is the current timestamp followed by
        ' Perf test message', the suffix `makeWebSocket` waits for.
        """
        url = '{base}teams/{teamId}/channels/{channelId}/posts/create'.format(
            base = self.base, teamId = self.teamId, channelId = self.channelId)
        payload = {'id': '',
                   'create_at': 0,
                   'update_at': 0,
                   'delete_at': 0,
                   'user_id': self.userId,
                   'channel_id': self.channelId,
                   'root_id': '',
                   'parent_id': '',
                   'original_id': '',
                   'type': '',
                   'props': {},
                   'message': Timer.now().isoformat() + ' Perf test message'
                   }
        return await self._postJSON(url, payload)
def run(pid, delay, userName, password):
    """Simulate one user session and write its timing log to a pickle file.

    After sleeping `delay` seconds, the session logs in as `userName`, looks
    up the team and the first channel whose name starts with 'taskme',
    fetches roster and posts while a WebSocket listener is started, marks
    the channel read, posts a message and waits for it to arrive over the
    WebSocket. Each step is timed with a `Timer`.

    The timer log is always written to 'perf_user_<pid>.pkl', including when
    the session fails, because the parent process loads that file for every
    worker. A RESTFailure is recorded in the log and swallowed; any other
    exception propagates after cleanup.
    """
    timer = Timer()
    # Pre-bind so the cleanup code below is safe if setup itself fails.
    loop = None
    mm = None
    try:
        loop = asyncio.get_event_loop()
        mm = Mattermost(loop)

        timer.begin('delay')
        time.sleep(delay)
        timer.end('delay')

        # Login
        #
        timer.begin('total')
        timer.begin('login')
        mm.enqueue(mm.login, 'PersonalTouch', userName, password)
        mm.authToken, us = mm.done()[0]
        mm.userId = us['id']
        timer.end('login')

        # Fetch Team ID
        #
        timer.begin('fetchTeams')
        mm.enqueue(mm.fetchTeams)
        mm.teamId = mm.done()[0][0]['team_id']
        timer.end('fetchTeams')

        # Fetch Channels
        #
        timer.begin('fetchChannels')
        mm.enqueue(mm.fetchChannels)
        results = mm.done()[0]
        timer.end('fetchChannels')

        for each in results:
            if each['name'].startswith('taskme'):
                mm.channelId = each['id']
                break
        else:
            # Explicit raise (same exception type as the original assert) so
            # the check is not stripped when Python runs with -O.
            raise AssertionError("no channel whose name starts with 'taskme'")

        # Create WebSocket, fetch roster, fetch posts
        #
        timer.begin('fetchPosts')
        # Start listening before posting so the new message is not missed.
        mm.websocket = asyncio.ensure_future(mm.makeWebSocket())
        mm.enqueue(mm.fetchRoster)
        mm.enqueue(mm.fetchPosts)
        mm.done()
        timer.end('fetchPosts')

        # Mark channel as 'read'
        #
        timer.begin('markChannelRead')
        mm.enqueue(mm.markChannelRead)
        mm.done()
        timer.end('markChannelRead')

        # Post new message
        #
        timer.begin('newMessage')
        mm.enqueue(mm.newMessage)
        mm.done()
        timer.end('newMessage')

        # Receive message over WebSocket
        #
        timer.begin('receiveMessage')
        loop.run_until_complete(asyncio.wait([mm.websocket]))
        # result() re-raises anything the WebSocket task failed with.
        mm.websocket.result()
        timer.end('receiveMessage')

        timer.end('total')

    except RESTFailure as exception:
        timer.exceptioned(exception)

    finally:
        try:
            if mm is not None:
                # close() is a plain call in older aiohttp releases and a
                # coroutine in newer ones; await it only when required.
                closing = mm.session.close()
                if asyncio.iscoroutine(closing):
                    loop.run_until_complete(closing)
            if loop is not None:
                loop.close()
        finally:
            with open('perf_user_{}.pkl'.format(pid), 'wb') as output:
                pickle.dump(timer.log, output)
if __name__ == '__main__':
    # Usage: <script> COUNT DURATION
    #   COUNT    - number of simulated users (one process each)
    #   DURATION - divisor for the arrival rate (rate = COUNT / DURATION)
    if len(sys.argv) != 3:
        sys.exit('usage: {} COUNT DURATION'.format(sys.argv[0]))

    count = int(sys.argv[1])
    duration = int(sys.argv[2])
    if duration <= 0:
        sys.exit('DURATION must be a positive integer')

    customers = cannedAccountsInfo['customers']
    # Check before starting any worker so a too-large COUNT cannot leave
    # already-started processes behind.
    if count > len(customers):
        sys.exit('COUNT ({}) exceeds the {} canned customer accounts available'.format(
            count, len(customers)))

    # Start delays: exponentially distributed gaps with rate COUNT / DURATION,
    # accumulated from an initial 0.1 s offset.
    lmbda = count / duration
    delays = []
    delay = 0.1
    for _ in range(count):
        delays.append(delay)
        delay += random.expovariate(lmbda)

    print(delays)

    processes = []
    for index in range(count):
        customer = customers[index]
        p = multiprocessing.Process(target = run, args = (index, delays[index], customer['userName'],
                                                          customer['password']))
        p.start()
        processes.append(p)

    with open('perf_times_{}_{}.csv'.format(count, duration), 'w') as output:
        for index, p in enumerate(processes):
            p.join()
            # Loads only files written by this script's own worker processes;
            # never point this at untrusted pickle data.
            with open('perf_user_{}.pkl'.format(index), 'rb') as pickled:
                log = pickle.load(pickled)
            if index == 0:
                # The header row is taken from the first worker's tags.
                tags = ['Id'] + [z[1] for z in log]
                output.write(','.join(tags) + '\n')
            output.write(','.join([str(index)] + [str(z[-1]) for z in log]) + '\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment