Skip to content

Instantly share code, notes, and snippets.

@NMelis
Created June 20, 2018 09:00
Show Gist options
  • Save NMelis/6ef3b615ce7dffc7951899434dd87700 to your computer and use it in GitHub Desktop.
Save NMelis/6ef3b615ce7dffc7951899434dd87700 to your computer and use it in GitHub Desktop.
import gevent
import websocket
from websocket import create_connection
from locust import HttpLocust, TaskSet, task
from locust.events import request_success, request_failure
class EchoTaskSet(TaskSet):
    """Task set that logs in over HTTP and then keeps one websocket open.

    ``on_start`` obtains a bearer token, connects to the websocket endpoint
    and spawns a gevent greenlet that keeps reading from the socket. The only
    scheduled task (``con``) is a no-op, so the simulated user just idles
    while the connection stays open.
    """

    def on_start(self):
        """Authenticate, open the websocket and start the background reader.

        A failed connection is reported through ``request_failure`` with the
        exception that actually occurred; in that case ``self.ws`` is not set
        and no reader greenlet is started.
        """
        token = self.get_token()

        def _receive():
            # Keep draining the socket for as long as the greenlet lives.
            while True:
                try:
                    ws.recv()
                except websocket.WebSocketTimeoutException:
                    # No frame arrived within the socket timeout. This is
                    # recorded as a successful 'WS CONNECT' sample, so an
                    # idle connection produces one sample per timeout.
                    request_success.fire(
                        request_type='WS CONNECT',
                        name='/ws/',
                        response_time=0,
                        response_length=0,
                    )

        try:
            ws = create_connection(
                "ws://127.0.0.1:8000/ws/",
                header={'Authorization': 'Bearer ' + token},
            )
            ws.timeout = 1
            self.ws = ws
            gevent.spawn(_receive)
        except Exception as exc:
            # `Exception` (not a bare except) so GreenletExit and
            # KeyboardInterrupt still propagate; report the real error
            # instead of an unrelated canned message.
            request_failure.fire(
                request_type='WS CONNECT',
                name='/ws/',
                response_time=0,
                exception=exc,
            )

    @task
    def con(self):
        """Do nothing; exists so the task set has a task to schedule."""
        pass

    def on_quit(self):
        """Close the websocket if one was opened by ``on_start``."""
        ws = getattr(self, 'ws', None)  # unset when the connection failed
        if ws is not None:
            ws.close()

    def on_stop(self):
        """Close the websocket when the task set stops.

        NOTE(review): Locust's documented shutdown hook on TaskSet is
        ``on_stop`` (in releases that provide it); ``on_quit`` is kept for
        existing callers and this simply delegates to it.
        """
        self.on_quit()

    def get_token(self, username='test', password='test'):
        """Log in through the auth endpoint and return the issued token.

        Args:
            username: Login name posted to ``/api/v0/auth/``.
            password: Password posted to ``/api/v0/auth/``.

        Returns:
            The value of the ``token`` key in the JSON response.

        Raises:
            KeyError: If the JSON response has no ``token`` key.
            AssertionError: If the returned token is empty/falsy. Raised
                explicitly (rather than via ``assert``) so the check is not
                stripped when Python runs with ``-O``.
        """
        response = self.client.post("/api/v0/auth/", data={
            'username': username,
            'password': password,
        })
        # Parse the body once and reuse the result.
        token = response.json()['token']
        if not token:
            raise AssertionError('auth endpoint returned an empty token')
        return token
class MyLocust(HttpLocust):
    """Simulated user that runs ``EchoTaskSet`` with a randomised pause.

    The pause between task executions is bounded by ``min_wait`` and
    ``max_wait`` (presumably milliseconds, per Locust's convention for
    these attributes — confirm against the Locust version in use).
    """

    # Lower and upper bound of the wait between tasks.
    min_wait, max_wait = 5000, 15000

    # Behaviour executed by every spawned user.
    task_set = EchoTaskSet
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment