Created
May 10, 2017 18:02
-
-
Save skyrocknroll/9d434a76400f64b7947bf9ae0b6128ed to your computer and use it in GitHub Desktop.
grpc locust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE: locust is deliberately kept first, as in the original file; it is
# presumed to apply gevent monkey patching on import -- confirm before
# regrouping these imports.
from locust.stats import RequestStats
from locust import Locust, TaskSet, task, events
import os
import sys
import getopt
import argparse
from random import randint, random
import json
from locust.events import EventHook
import requests
import re
import grpc
from seldon.rpc import seldon_pb2
from google.protobuf import any_pb2
import time
def parse_arguments(argv=None):
    """
    Extract the swarm settings from the locust command line.

    Only the options this script needs are declared; anything else on the
    command line is ignored via ``parse_known_args``.

    :param argv: argument list to parse; ``None`` (the default) parses
        ``sys.argv[1:]``
    :return: tuple ``(host, clients, hatch_rate)`` where ``host`` is a string
        (or ``None`` when ``--host`` is absent) and the other two are ints
    """
    parser = argparse.ArgumentParser(prog='locust')
    parser.add_argument('--host')
    # Default to 1 so a missing option no longer crashes with
    # ``int(None)``; presumably this matches locust's own CLI defaults --
    # TODO confirm against the locust version in use.
    parser.add_argument('--clients', type=int, default=1)
    parser.add_argument('--hatch-rate', type=int, default=1)
    # Declared so argparse consumes the flag; its value is not used here.
    parser.add_argument('--master', action='store_true')
    args, _unknown = parser.parse_known_args(argv)
    print(args)
    return args.host, args.clients, args.hatch_rate
# Swarm settings taken from the command line when the module is loaded.
HOST, MAX_USERS_NUMBER, USERS_PER_SECOND = parse_arguments()

# Ids of the slaves that have reported in so far (filled by on_my_event).
slaves_connect = []
# NOTE(review): this hook is not referenced in the visible code, which
# registers on ``events.slave_report`` instead -- confirm whether it is needed.
slave_report = EventHook()
# Set to True by on_my_event once SLAVES_NUMBER distinct slaves have reported.
ALL_SLAVES_CONNECTED = False
# Number of distinct slaves to wait for before starting the swarm.
SLAVES_NUMBER = 1
def on_my_event(client_id, data):
    """
    Waits for all slaves to be connected and launches the swarm.

    Each reporting slave id is recorded once in ``slaves_connect``. When
    ``SLAVES_NUMBER`` distinct slaves have reported, the swarm is started by
    POSTing to the web UI on 127.0.0.1:8089, and later reports are ignored.

    :param client_id: identifier of the slave that sent the report
    :param data: report payload (unused)
    :return: None
    """
    global ALL_SLAVES_CONNECTED
    if ALL_SLAVES_CONNECTED:
        return

    print("Event was fired with arguments")
    if client_id not in slaves_connect:
        slaves_connect.append(client_id)
    if len(slaves_connect) != SLAVES_NUMBER:
        return

    print("All Slaves Connected")
    ALL_SLAVES_CONNECTED = True
    print(events.slave_report._handlers)
    header = {'Content-Type': 'application/x-www-form-urlencoded'}
    payload = {'hatch_rate': USERS_PER_SECOND, 'locust_count': MAX_USERS_NUMBER}
    try:
        # Bounded wait so an unresponsive web UI cannot block this handler
        # forever.
        response = requests.post('http://127.0.0.1:8089/swarm', data=payload,
                                 headers=header, timeout=30)
    except requests.RequestException as e:
        print("failed to start swarm: %s" % e)
        return
    if response.status_code != requests.codes.ok:
        print("failed to start swarm: HTTP %s" % response.status_code)
import resource

# Report the soft limit on open file descriptors for this process.
rsrc = resource.RLIMIT_NOFILE
soft, hard = resource.getrlimit(rsrc)
print('RLIMIT_NOFILE soft limit starts as : %s' % soft)

# Raising the limit is currently disabled, so the second report below shows
# the same value. To raise the soft limit to 65535 descriptors, enable:
# resource.setrlimit(rsrc, (65535, hard))

soft, hard = resource.getrlimit(rsrc)
print('RLIMIT_NOFILE soft limit changed to : %s' % soft)

events.slave_report += on_my_event  # Register method in slaves report event
class GrpcLocust(Locust):
    """Base class for the gRPC users in this file.

    Adds no behavior of its own: construction is forwarded unchanged to
    ``Locust``.
    """

    def __init__(self, *args, **kwargs):
        super(GrpcLocust, self).__init__(*args, **kwargs)
class ApiUser(GrpcLocust):
    """Simulated user that fetches an OAuth token and then sends gRPC
    ``Classify`` requests filled with random data."""

    min_wait = 900  # Min time between requests of each user
    max_wait = 1100  # Max time between requests of each user
    stop_timeout = 1000000  # Stopping time

    class task_set(TaskSet):
        """Tasks run by each simulated user."""

        def getEnviron(self, key, default):
            """
            Read a setting from the process environment.

            :param key: environment variable name
            :param default: value returned when the variable is not set
            :return: the variable's value, or ``default``
            """
            return os.environ.get(key, default)

        def getToken(self):
            """
            Request an access token from ``self.oauth_endpoint``.

            The consumer key and secret come from ``SELDON_OAUTH_KEY`` and
            ``SELDON_OAUTH_SECRET``.

            :return: the ``access_token`` field of the JSON response, or
                ``None`` when the endpoint does not answer with HTTP 200
            """
            params = {
                "consumer_key": self.getEnviron('SELDON_OAUTH_KEY', "oauthkey"),
                "consumer_secret": self.getEnviron('SELDON_OAUTH_SECRET', "oauthsecret"),
            }
            url = self.oauth_endpoint + "/token"
            r = requests.get(url, params=params)
            if r.status_code != requests.codes.ok:
                print("failed call to get token")
                return None
            j = json.loads(r.text)
            print(j)
            return j["access_token"]

        def on_start(self):
            """
            Load the endpoints and payload size from the environment and
            fetch the OAuth token used by every request.

            :return: None
            """
            print("on start")
            self.oauth_endpoint = self.getEnviron('SELDON_OAUTH_ENDPOINT', "http://127.0.0.1:30015")
            self.token = self.getToken()
            self.grpc_endpoint = self.getEnviron('SELDON_GRPC_ENDPOINT', "127.0.0.1:30017")
            self.data_size = int(self.getEnviron('SELDON_DEFAULT_DATA_SIZE', "784"))

        @task
        def get_prediction(self):
            """
            Send one ``Classify`` request and report its outcome to locust.

            The request carries ``self.data_size`` random floats and a random
            ``puid``. The elapsed time in milliseconds is fired as a
            ``request_success`` or ``request_failure`` event named ``HOST``.

            :return: None
            """
            # NOTE(review): a new channel is opened for every request, as in
            # the original; confirm whether per-user reuse is preferable.
            channel = grpc.insecure_channel(self.grpc_endpoint)
            stub = seldon_pb2.SeldonStub(channel)
            fake_data = [random() for _ in range(self.data_size)]
            data = seldon_pb2.DefaultCustomPredictRequest(values=fake_data)
            dataAny = any_pb2.Any()
            dataAny.Pack(data)
            meta = seldon_pb2.ClassificationRequestMeta(puid=str(randint(0, 99999999)))
            # Native-string key: same value as b'oauth_token' on Python 2.
            metadata = [('oauth_token', self.token)]
            request = seldon_pb2.ClassificationRequest(meta=meta, data=dataAny)
            start_time = time.time()
            try:
                # Second positional argument is the call timeout (999).
                stub.Classify(request, 999, metadata=metadata)
            except grpc.RpcError as e:
                # The original caught ``xmlrpclib.Fault``, a name that is never
                # imported, so a failed call raised NameError instead of being
                # recorded as a failure.
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(request_type="grpc", name=HOST, response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(request_type="grpc", name=HOST, response_time=total_time, response_length=0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As locust uses gevent, grpc needs to be initialized with gevent support:

    import grpc.experimental.gevent as grpc_gevent
    grpc_gevent.init_gevent()

After this, my adaptation of this code started to work. If I don't do this, grpc fails to read my request_iterator, as the gevent monkey patching breaks the thread/fork model for grpc.
This is with grpcio==1.23.0.