Created
March 6, 2013 23:33
-
-
Save farshidce/5104239 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import random | |
from threading import Thread | |
import unittest | |
from TestInput import TestInputSingleton | |
import mc_bin_client | |
import time | |
import uuid | |
import logger | |
import datetime | |
from membase.api.rest_client import RestConnection, RestHelper | |
from membase.helper.rebalance_helper import RebalanceHelper | |
from membase.helper.bucket_helper import BucketOperationHelper | |
from membase.helper.cluster_helper import ClusterOperationHelper | |
from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached | |
from remote.remote_util import RemoteMachineShellConnection, RemoteMachineHelper | |
from results_helper import ResultsHelper | |
class MemcapableTestBase(object):
    """Shared fixture and helpers for the memcapable set/incr/decr tests.

    ``setUp_bucket`` must run before any ``*_test`` helper: it is what
    assigns ``log``, ``test``, ``master`` and ``bucket_name``, all of which
    the helpers read.
    """

    log = None
    keys = None
    servers = None
    input = None
    test = None
    bucket_port = None
    bucket_name = None

    def setUp_bucket(self, bucket_name, port, bucket_type, unittest):
        """Clean the master node and make sure the ``default`` bucket exists.

        :param bucket_name: bucket name the ``*_test`` helpers connect to.
        :param port: stored on ``bucket_port``; not used further in this class.
        :param bucket_type: accepted for caller compatibility; unused here.
        :param unittest: the running test case; its ``assertTrue``/``fail``
            methods are used to report failures.
        """
        self.log = logger.Logger.get_logger()
        self.input = TestInputSingleton.input
        unittest.assertTrue(self.input, msg="input parameters missing...")
        self.test = unittest
        self.master = self.input.servers[0]
        self.bucket_port = port
        self.bucket_name = bucket_name
        ClusterOperationHelper.cleanup_cluster([self.master])
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self.test)
        self._create_default_bucket(unittest)

    def _create_default_bucket(self, unittest):
        """Create the ``default`` bucket on the master if it is missing.

        :param unittest: test case used for the assertions.
        """
        name = "default"
        master = self.master
        rest = RestConnection(master)
        helper = RestHelper(RestConnection(master))
        if not helper.bucket_exists(name):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio
            rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
            ready = BucketOperationHelper.wait_for_memcached(master, name)
            BucketOperationHelper.wait_for_vbuckets_ready_state(master, name)
            unittest.assertTrue(ready, msg="wait_for_memcached failed")
        unittest.assertTrue(helper.bucket_exists(name),
                            msg="unable to create {0} bucket".format(name))

    def _proxy_client(self):
        """Return a proxy memcached client for ``self.bucket_name`` on the master."""
        return MemcachedClientHelper.proxy_client(self.master, self.bucket_name)

    def _seed_key(self, client, key, exp, flags, value):
        """Store the starting value and, if it has a TTL, wait past it.

        The literal key ``'no_key'`` means "do not create the key", which lets
        callers exercise incr/decr against a key that was never set.
        """
        if key != 'no_key':
            client.set(key, exp, flags, value)
        if exp:
            # Sleep slightly longer than the TTL so the key has expired
            # before the counter operations start.
            self.log.info('Wait {0} seconds for the key expired' .format(exp + 2))
            time.sleep(exp + 2)

    def set_test(self, key, exp, flags, values):
        """Set ``key`` to every value/flag combination and read it back.

        Fails the owning test on the first value or flag that does not
        round-trip.

        :param key: key to write.
        :param exp: expiration passed to every ``set``.
        :param flags: iterable of flag values to try.
        :param values: iterable of values to try.
        """
        client = self._proxy_client()
        for v in values:
            for f in flags:
                client.set(key, exp, f, v)
                flags_v, cas_v, get_v = client.get(key)
                # The value is checked first, then the flags, as before.
                if get_v != v:
                    self.test.fail('FAILED. Value is set to {0}; and when run get {1}'.format(v, get_v))
                if flags_v != f:
                    self.test.fail('FAILED. Flags is set to {0}; and when run get {1}'.format(f, flags_v))
                self.log.info('Flags is set to {0}; and when run get {1}'.format(f, flags_v))
                self.log.info('Value is set {0}; and when run get {1}'.format(v, get_v))

    # Instead of checking the value before incrementing,
    # you can simply ADD it instead before incrementing each time.
    # If it's already there, your ADD is ignored, and if it's not there, it's set.
    def incr_test(self, key, exp, flags, value, incr_amt, decr_amt, incr_time):
        """Seed ``key``, optionally decrement once, then increment repeatedly.

        :param key: key to operate on; ``'no_key'`` skips the initial set.
        :param exp: expiration for the initial set; when truthy the method
            sleeps ``exp + 2`` seconds before continuing.
        :param flags: flags for the initial set.
        :param value: initial value.
        :param incr_amt: amount added on each increment.
        :param decr_amt: if truthy, a single decrement by this amount is
            issued before the increments.
        :param incr_time: number of increments to issue.
        :returns: the value reported by the last increment, or ``None`` when
            ``incr_time`` is zero.
        """
        client = self._proxy_client()
        self._seed_key(client, key, exp, flags, value)
        if decr_amt:
            decremented, _cas = client.decr(key, decr_amt)
            self.log.info('decr amt {0}' .format(decremented))
        # A local (not a module global) so one test's result can never leak
        # into another and a zero-iteration run has a defined result.
        update_value = None
        try:
            for _attempt in range(incr_time):
                update_value, _cas = client.incr(key, incr_amt)
            self.log.info('incr {0} times with value {1}'.format(incr_time, incr_amt))
            return update_value
        except mc_bin_client.MemcachedError as error:
            self.log.info('memcachedError : {0}'.format(error.status))
            self.test.fail("unable to increment value: {0}".format(incr_amt))

    def decr_test(self, key, exp, flags, value, incr_amt, decr_amt, decr_time):
        """Seed ``key``, optionally increment once, then decrement repeatedly.

        Parameters mirror :meth:`incr_test` with the roles of increment and
        decrement swapped. Client errors are not caught here.

        :returns: the value reported by the last decrement, or ``None`` when
            ``decr_time`` is zero.
        """
        client = self._proxy_client()
        self._seed_key(client, key, exp, flags, value)
        if incr_amt:
            incremented, _cas = client.incr(key, incr_amt)
            self.log.info('incr amt {0}' .format(incremented))
        update_value = None
        for _attempt in range(decr_time):
            update_value, _cas = client.decr(key, decr_amt)
        self.log.info('decr {0} times with value {1}'.format(decr_time, decr_amt))
        return update_value
class SimpleSetMembaseBucketDefaultPort(unittest.TestCase):
    """Set/get round-trip tests for numeric-looking string values.

    Every test stores each value under the key ``has_key`` with no
    expiration, once per entry in ``FLAGS``, and relies on
    ``MemcapableTestBase.set_test`` to verify value and flags.
    """

    memcapableTestBase = None
    log = logger.Logger.get_logger()

    # Flags shared by all tests. The original per-test lists spelled some
    # entries with leading zeros (0000, 00001, 0001000); Python 2 reads those
    # as octal and Python 3 rejects them. The values below are the ones
    # Python 2 actually produced, written portably (0o1000 == 512).
    FLAGS = (0, 0, 1, 34532, 453456, 0o1000, 1100111100, 4294967295)

    def setUp(self):
        self.memcapableTestBase = MemcapableTestBase()
        self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)

    def _set_and_verify(self, values):
        """Run the set/get round trip for ``values`` with a never-expiring key."""
        key_test = 'has_key'
        exp_time = 0
        # A fresh list per call, as each test originally built its own.
        self.memcapableTestBase.set_test(key_test, exp_time, list(self.FLAGS), values)

    def test_set_pos_int_value_pos_flag_key_never_expired(self):
        self._set_and_verify(
            ['0', '000', '4', '678', '6560987', '32456754', '0000000000', '00001000'])

    def test_set_neg_int_value_pos_flag_key_never_expired(self):
        self._set_and_verify(
            ['-0', '-000', '-4', '-678', '-6560987', '-32456754', '-0000000000', '-00001000'])

    def test_set_pos_float_value_pos_flag_key_never_expired(self):
        self._set_and_verify(
            ['0.00', '000.0', '4.6545', '678.87967', '6560987.0', '32456754.090987',
             '0000000000.0000001', '00001000.008'])

    def test_set_neg_float_value_pos_flag_key_never_expired(self):
        self._set_and_verify(
            ['-0.00', '-000.0', '-4.6545', '-678.87967', '-6560987.0', '-32456754.090987',
             '-0000000000.0000001', '-00001000.008'])
class SimpleIncrMembaseBucketDefaultPort(unittest.TestCase):
    """Increment tests driven through ``MemcapableTestBase.incr_test``."""

    memcapableTestBase = None
    log = logger.Logger.get_logger()

    def setUp(self):
        self.memcapableTestBase = MemcapableTestBase()
        self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)

    def _verify(self, test_name, expected, actual):
        """Fail with a message naming the test and both values, else log success."""
        self.assertEqual(
            expected, actual,
            "FAILED {0}. Expected value {1}. Tested value {2}".format(test_name, expected, actual))
        self.log.info('Value update correctly. Expected value {0}. Tested value {1}'
                      .format(expected, actual))

    def test_incr_an_exist_key_never_exp(self):
        key_test = 'has_key'
        value = '10'
        decr_amt = 0
        incr_amt = 5
        incr_time = 10
        update_v = self.memcapableTestBase.incr_test(key_test, 0, 0, value, incr_amt, decr_amt, incr_time)
        self._verify('test_incr_an_exist_key_never_exp',
                     int(value) + incr_amt * incr_time, update_v)

    def test_incr_non_exist_key(self):
        # 'no_key' tells incr_test not to create the key first.
        key_test = 'no_key'
        value = '10'
        decr_amt = 0
        incr_amt = 5
        incr_time = 10
        update_v = self.memcapableTestBase.incr_test(key_test, 0, 0, value, incr_amt, decr_amt, incr_time)
        # The expectation counts one increment fewer than was issued;
        # presumably the first incr on a missing key only creates it at 0
        # -- TODO confirm against the client's incr defaults.
        self._verify('test_incr_non_exist_key', incr_amt * (incr_time - 1), update_v)

    def test_incr_with_exist_key_and_expired(self):
        key_test = 'expire_key'
        value = '10'
        exp_time = 5
        decr_amt = 0
        incr_amt = 5
        incr_time = 10
        update_v = self.memcapableTestBase.incr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, incr_time)
        # incr_test waits past the TTL, so the same expectation as for a
        # missing key applies.
        self._verify('test_incr_with_exist_key_and_expired',
                     incr_amt * (incr_time - 1), update_v)

    def test_incr_with_exist_key_decr_then_incr_never_expired(self):
        key_test = 'has_key'
        value = '101'
        exp_time = 0
        decr_amt = 10
        incr_amt = 5
        incr_time = 10
        update_v = self.memcapableTestBase.incr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, incr_time)
        self._verify('test_incr_with_exist_key_decr_then_incr_never_expired',
                     int(value) - decr_amt + incr_amt * incr_time, update_v)

    ## this test will fail as expected
    # def test_incr_with_non_int_key(self):
    #     key_test = 'has_key'
    #     value = 'abcd'
    #     exp_time = 0
    #     decr_amt = 0
    #     incr_amt = 5
    #     incr_time = 10
    #     self.assertRaises(self.memcapableTestBase.incr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, incr_time))
    ##     self.assertRaises('Expected FAILED. Can not incr with string value')
class GetlTests(unittest.TestCase):
    """Tests for ``getl`` (get-and-lock) against the ``default`` bucket."""

    memcapableTestBase = None
    log = logger.Logger.get_logger()

    def setUp(self):
        self.memcapableTestBase = MemcapableTestBase()
        self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)

    @staticmethod
    def _lock_wait_seconds(getl_timeout):
        """Return how long to sleep so a lock taken with ``getl_timeout`` lapses.

        Timeouts above 30 wait 30 seconds and non-positive ones wait 15;
        presumably these mirror the server's maximum and default lock times
        -- TODO confirm.
        """
        if getl_timeout > 30:
            return 30
        if getl_timeout > 0:
            return getl_timeout
        return 15

    def _sleep(self, seconds):
        """Log the sleep duration that is actually used, then sleep."""
        self.log.info("sleep for {0} seconds".format(seconds))
        time.sleep(seconds)

    def _overwrite_and_verify(self, mc, key):
        """Set ``key`` to ``"*"`` and assert that the new value is readable."""
        new_value = "*"
        self.log.info("setting key {0} with new value {1}".format(key, new_value))
        mc.set(key, 0, 0, new_value)
        self.log.info("get key {0}".format(key))
        flags_v, cas_v, get_v = mc.get(key)
        self.assertEqual(get_v, "*")

    # set an item with the given expiration,
    # getl it with the given timeout and verify that setting the item again
    # throws "Data exists" until the lock lapses
    def _getl_body(self, prefix, getl_timeout, expiration):
        """Lock a fresh key and verify it is readable, then writable again.

        :param prefix: prefix for the generated unique key.
        :param getl_timeout: lock timeout passed to ``getl``; a negative
            value is allowed to make ``getl`` itself fail.
        :param expiration: expiration used when the key is first stored.
        """
        node = self.memcapableTestBase.master
        mc = MemcachedClientHelper.direct_client(node, "default")
        key = "{0}_{1}".format(prefix, str(uuid.uuid4()))
        self.log.info("setting key {0} with expiration {1}".format(key, expiration))
        mc.set(key, expiration, 0, key)
        self.log.info("getl key {0} timeout {1}".format(key, getl_timeout))
        try:
            mc.getl(key, getl_timeout)
        except Exception as ex:
            # Only a negative timeout is allowed to be rejected.
            if getl_timeout >= 0:
                raise
            self.log.info("getl with timeout {0} failed: {1}".format(getl_timeout, ex))
        self.log.info("get key {0} which is locked now".format(key))
        flags_v, cas_v, get_v = mc.get(key)
        self.assertEqual(get_v, key)
        # Retry the write once a second, for at most 40 attempts, until it
        # is accepted.
        for attempt in range(40):
            self.log.info("setting key {0} with new value {1}".format(key, '*'))
            try:
                mc.set(key, 0, 0, '*')
                break
            except Exception as ex:
                self.log.info("set attempt {0} failed: {1}".format(attempt, ex))
            time.sleep(1)
        self._sleep(self._lock_wait_seconds(getl_timeout))
        self.log.info("lock should have timed out by now . try to set the item again")
        self._overwrite_and_verify(mc, key)

    def test_getl_minus_one(self):
        self._getl_body("getl_-1", -1, 0)

    def test_getl_zero(self):
        self._getl_body("getl_0", 0, 0)

    def test_getl_five(self):
        # Previously passed 15, which made this a duplicate of
        # test_getl_fifteen under a different key prefix.
        self._getl_body("getl_5", 5, 0)

    def test_getl_ten(self):
        self._getl_body("getl_10", 10, 0)

    def test_getl_fifteen(self):
        self._getl_body("getl_15", 15, 0)

    def test_getl_thirty(self):
        self._getl_body("getl_30", 30, 0)

    def test_getl_sixty(self):
        self._getl_body("getl_60", 60, 0)

    def test_getl_expired_item(self):
        """Lock a key whose expiration (5s) is shorter than the lock (15s).

        After waiting out the lock, a get must raise ``MemcachedError`` and
        the key must be writable again.
        """
        prefix = "getl_expired_item"
        expiration = 5
        getl_timeout = 15
        node = self.memcapableTestBase.master
        mc = MemcachedClientHelper.direct_client(node, "default")
        key = "{0}_{1}".format(prefix, str(uuid.uuid4()))
        self.log.info("setting key {0} with expiration {1}".format(key, expiration))
        mc.set(key, expiration, 0, key)
        self.log.info("getl key {0} timeout {1}".format(key, getl_timeout))
        mc.getl(key, getl_timeout)
        self.log.info("get key {0} which is locked now".format(key))
        flags_v, cas_v, get_v = mc.get(key)
        self.assertEqual(get_v, key)
        self._sleep(self._lock_wait_seconds(getl_timeout))
        self.log.info("get key {0} which was locked when it expired. should fail".format(key))
        try:
            mc.get(key)
        except mc_bin_client.MemcachedError as error:
            self.log.info("raised exception as expected : {0}".format(error))
        else:
            self.fail("get {0} should have raised not_found error".format(key))
        self.log.info("item expired and lock should have timed out by now . try to set the item again")
        self._overwrite_and_verify(mc, key)
class GetrTests(unittest.TestCase):
    """Tests for ``getr`` (get-from-replica).

    Each ``test_getr_*`` method is a parameter set for ``_getr_body``, which
    builds the bucket and cluster, loads/mutates/deletes items as requested,
    then counts how many items can be read back from the replicas.
    """

    memcapableTestBase = None
    log = logger.Logger.get_logger()

    NO_REBALANCE = 0
    DURING_REBALANCE = 1
    AFTER_REBALANCE = 2

    # Baseline arguments for _getr_body; each test overrides only what it varies.
    _GETR_DEFAULTS = dict(item_count=10000,
                          replica_count=1,
                          expiration=0,
                          delay=0,
                          eject=False,
                          delete=False,
                          mutate=False,
                          warmup=False,
                          skipload=False,
                          rebalance=NO_REBALANCE)

    def setUp(self):
        self.servers = TestInputSingleton.input.servers
        self.input = TestInputSingleton.input
        self.master = self.servers[0]
        self.bucket_name = "default"
        self.memcapableTestBase = MemcapableTestBase()
        self.rest = RestConnection(self.master)
        ClusterOperationHelper.cleanup_cluster([self.master])
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)

    def tearDown(self):
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
        ClusterOperationHelper.cleanup_cluster([self.master])

    def _run_getr(self, **overrides):
        """Call ``_getr_body`` with ``_GETR_DEFAULTS`` updated by ``overrides``."""
        params = dict(self._GETR_DEFAULTS)
        params.update(overrides)
        self._getr_body(**params)

    def test_getr_1(self):
        # GETR_TEST_1, 1 replica, all items retrieved
        self._run_getr(replica_count=1)

    def test_getr_2(self):
        # GETR_TEST_2, 2 replica, all items retrieved
        self._run_getr(replica_count=2)

    def test_getr_3(self):
        # GETR_TEST_3, 3 replica, all items retrieved
        self._run_getr(replica_count=3)

    def test_getr_4(self):
        # GETR_TEST_4, 1 replica eject items, all items retrieved
        self._run_getr(eject=True)

    def test_getr_5(self):
        # GETR_TEST_5, 1 replica expiration, all items retrieved
        self._run_getr(expiration=300)

    def test_getr_6(self):
        # GETR_TEST_6, 1 replica during rebalance in, all items retrieved
        self._run_getr(rebalance=GetrTests.DURING_REBALANCE)

    def test_getr_7(self):
        # GETR_TEST_7, 1 replica after rebalance in, all items retrieved
        self._run_getr(rebalance=GetrTests.AFTER_REBALANCE)

    def test_getr_8(self):
        # GETR_TEST_8, 1 replica warmup, all items retrieved
        self._run_getr(warmup=True)

    def test_getr_9(self):
        # GETR_TEST_9, 1 replica mutates, all items retrieved
        self._run_getr(mutate=True)

    def test_getr_10(self):
        # GETR_TEST_10, 1 replica deletes, all items retrieved
        # NOTE(review): delete=True without mutate makes _getr_body expect
        # zero items, which contradicts "all items retrieved" and duplicates
        # test_getr_5n -- confirm the intended parameters.
        self._run_getr(delete=True)

    def test_getr_1n(self):
        # GETR_TESTN_1, non-existing items, ERR_NOT_FOUND
        self._run_getr(skipload=True)

    def test_getr_2n(self):
        # GETR_TESTN_2, 1 replica expired, ERR_NOT_FOUND
        self._run_getr(expiration=15, delay=35)

    def test_getr_3n(self):
        # GETR_TESTN_3, 1 replica expired rebalance in, ERR_NOT_FOUND
        self._run_getr(expiration=15, delay=35, rebalance=GetrTests.AFTER_REBALANCE)

    def test_getr_4n(self):
        # GETR_TESTN_4, 0 replica, ENGINE_NOT_MY_VBUCKET
        self._run_getr(replica_count=0)

    def test_getr_5n(self):
        # GETR_TESTN_5, 1 replica delete, ERR_NOT_FOUND
        self._run_getr(delete=True)

    def test_getr(self):
        """Run ``_getr_body`` with every argument taken from the test input."""
        param = self.input.param
        self._getr_body(item_count=param("item_count", 10000),
                        replica_count=param("replica_count", 1),
                        expiration=param("expiration", 0),
                        delay=param("delay", 0),
                        eject=param("eject", 0),
                        delete=param("delete", 0),
                        mutate=param("mutate", 0),
                        warmup=param("warmup", 0),
                        skipload=param("skipload", 0),
                        rebalance=param("rebalance", 0))

    def _getr_body(self, item_count, replica_count, expiration, delay, eject, delete, mutate, warmup, skipload, rebalance):
        """Set up bucket and cluster, apply the scenario, then verify getr.

        :param item_count: number of keys to load and read.
        :param replica_count: replica count for the bucket; also the number
            of replica indexes read per key.
        :param expiration: expiration passed to every set.
        :param delay: seconds to sleep before reading.
        :param eject: evict every key before reading.
        :param delete: delete every key after the initial load.
        :param mutate: (re)load every key with the ``mutated`` value prefix.
        :param warmup: restart memcached before reading.
        :param skipload: skip the initial load.
        :param rebalance: one of ``NO_REBALANCE``, ``DURING_REBALANCE``,
            ``AFTER_REBALANCE``.
        """
        # The run is "negative" (zero readable items expected) when the items
        # outlive their expiration, or were deleted / never loaded and not
        # written again by a mutation.
        negative_test = bool(delay > expiration
                             or (delete and not mutate)
                             or (skipload and not mutate))

        prefix = str(uuid.uuid4())[:7]

        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
        BucketOperationHelper.create_bucket(self.master, name=self.bucket_name, replica=replica_count, port=11210, test_case=self, bucket_ram=-1, password="")

        if rebalance in (GetrTests.DURING_REBALANCE, GetrTests.AFTER_REBALANCE):
            # leave 1 node unclustered for rebalance in
            ClusterOperationHelper.begin_rebalance_out(self.master, self.servers[-1:])
            ClusterOperationHelper.end_rebalance(self.master)
            ClusterOperationHelper.begin_rebalance_in(self.master, self.servers[:-1])
            ClusterOperationHelper.end_rebalance(self.master)
        else:
            ClusterOperationHelper.begin_rebalance_in(self.master, self.servers)
            ClusterOperationHelper.end_rebalance(self.master)

        vprefix = ""
        if not skipload:
            self._load_items(item_count=item_count, expiration=expiration, prefix=prefix, vprefix=vprefix)
            if not expiration:
                RebalanceHelper.wait_for_stats_int_value(self.master, self.bucket_name, "curr_items_tot", item_count * (replica_count + 1), "<=", 600, True)

        if delete:
            self._delete_items(item_count=item_count, prefix=prefix)

        if mutate:
            vprefix = "mutated"
            self._load_items(item_count=item_count, expiration=expiration, prefix=prefix, vprefix=vprefix)

        self.assertTrue(RebalanceHelper.wait_for_replication(self.rest.get_nodes(), timeout=180),
                        msg="replication did not complete")

        if eject:
            self._eject_items(item_count=item_count, prefix=prefix)

        if delay:
            self.log.info("delaying for {0} seconds".format(delay))
            time.sleep(delay)

        if rebalance == GetrTests.DURING_REBALANCE:
            ClusterOperationHelper.begin_rebalance_in(self.master, self.servers)
        if rebalance == GetrTests.AFTER_REBALANCE:
            # NOTE(review): no begin_rebalance_in precedes this call on the
            # AFTER_REBALANCE path, so the held-back node may never be added
            # -- confirm what end_rebalance does on its own.
            ClusterOperationHelper.end_rebalance(self.master)
        if warmup:
            self.log.info("restarting memcached")
            command = "[rpc:multicall(ns_port_sup, restart_port_by_name, [memcached], 20000)]."
            memcached_restarted = self.rest.diag_eval(command)
            # wait until memcached starts
            self.assertTrue(memcached_restarted, "unable to restart memcached process through diag/eval")
            RebalanceHelper.wait_for_stats(self.master, self.bucket_name, "curr_items_tot", item_count * (replica_count + 1), 600)

        count = self._getr_items(item_count=item_count, replica_count=replica_count, prefix=prefix, vprefix=vprefix)

        if negative_test:
            self.assertTrue(count == 0, "found items, expected none")
        else:
            self.assertTrue(count == replica_count * item_count, "expected {0} items, got {1} items".format(replica_count * item_count, count))
        if rebalance == GetrTests.DURING_REBALANCE:
            ClusterOperationHelper.end_rebalance(self.master)

    def _for_each_key(self, item_count, prefix, operation, verb, done_verb):
        """Apply ``operation(key, index)`` to every key, retrying failures.

        Each key is retried (with a 2 second pause after a failure) while
        its 10 second window is open; the test fails if a key never succeeds.

        :param operation: callable taking the key and its index.
        :param verb: word used in the error messages ("set", "delete", ...).
        :param done_verb: word used in the final summary ("loaded", ...).
        """
        time_start = time.time()
        for i in range(item_count):
            key = prefix + "_key_" + str(i)
            timeout_end = time.time() + 10
            passed = False
            while time.time() < timeout_end and not passed:
                try:
                    operation(key, i)
                    passed = True
                except Exception as e:
                    self.log.error("failed to {0} key {1}, error: {2}".format(verb, key, e))
                    time.sleep(2)
            self.assertTrue(passed, "exit due to {0} errors after {1} seconds".format(verb, time.time() - time_start))
        self.log.info("{0} {1} items in {2} seconds".format(done_verb, item_count, time.time() - time_start))

    def _load_items(self, item_count, expiration, prefix, vprefix=""):
        """Store ``<prefix>_key_<i>`` -> ``<vprefix>_value_<i>`` for every i."""
        flags = 0
        client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)

        def store(key, index):
            client.set(key, expiration, flags, vprefix + "_value_" + str(index))

        self._for_each_key(item_count, prefix, store, "set", "loaded")

    def _get_items(self, item_count, prefix, vprefix=""):
        """Read every key through the proxy client and count matching values.

        :returns: number of keys whose value equals the expected one.
        """
        client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
        time_start = time.time()
        get_count = 0
        last_error = ""
        error_count = 0
        for i in range(item_count):
            key = prefix + "_key_" + str(i)
            try:
                value = client.get(key)[2]
                # Explicit check: an ``assert`` would vanish under ``-O`` and
                # silently count mismatches as hits.
                if value != vprefix + "_value_" + str(i):
                    raise ValueError("unexpected value {0!r}".format(value))
                get_count += 1
            except Exception as e:
                last_error = "failed to get key {0}, error: {1}".format(key, e)
                error_count += 1
        if error_count > 0:
            self.log.error("got {0} errors, last error: {1}".format(error_count, last_error))
        self.log.info("got {0} items in {1} seconds".format(get_count, time.time() - time_start))
        return get_count

    def _getr_items(self, item_count, replica_count, prefix, vprefix=""):
        """Read every key from every replica index and count matching values.

        :returns: number of (replica, key) reads that returned the expected
            value.
        """
        time_start = time.time()
        get_count = 0
        last_error = ""
        error_count = 0
        awareness = VBucketAwareMemcached(self.rest, self.bucket_name)
        try:
            for r in range(replica_count):
                for i in range(item_count):
                    key = prefix + "_key_" + str(i)
                    expected = vprefix + "_value_" + str(i)
                    retry = True
                    while retry:
                        client = awareness.memcached(key, r)
                        try:
                            value = client.getr(key)[2]
                            if value != expected:
                                raise ValueError("unexpected value {0!r}".format(value))
                            get_count += 1
                            retry = False
                        except mc_bin_client.MemcachedError as e:
                            last_error = "failed to getr key {0}, error: {1}".format(key, e)
                            error_count += 1
                            # Status 7 is presumably "not my vbucket" -- TODO
                            # confirm. It is the only error that is retried,
                            # after refreshing the vbucket map.
                            # NOTE(review): this retry has no upper bound.
                            if e.status == 7:
                                self.log.info("getting new vbucket map for key {0}".format(key))
                                awareness.reset(self.rest)
                            else:
                                retry = False
                        except Exception as e:
                            last_error = "failed to getr key {0}, error: {1}".format(key, e)
                            error_count += 1
                            retry = False
            if error_count > 0:
                self.log.error("got {0} errors, last error: {1}".format(error_count, last_error))
            self.log.info("got {0} replica items in {1} seconds".format(get_count, time.time() - time_start))
        finally:
            # Release the client connections even when a read blows up.
            awareness.done()
        return get_count

    def _delete_items(self, item_count, prefix):
        """Delete ``<prefix>_key_<i>`` for every i."""
        client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
        self._for_each_key(item_count, prefix,
                           lambda key, index: client.delete(key),
                           "delete", "deleted")

    def _eject_items(self, item_count, prefix):
        """Evict ``<prefix>_key_<i>`` for every i."""
        client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
        self._for_each_key(item_count, prefix,
                           lambda key, index: client.evict_key(key),
                           "eject", "ejected")

    def _next_parameter(self, d, di):
        """Advance ``di`` to the next index combination, odometer style.

        :param d: mapping of parameter name to its sequence of choices.
        :param di: mapping of parameter name to the current index into that
            sequence; updated in place.
        :returns: ``True`` if a new combination was produced, ``False`` when
            every index wrapped back to zero (or ``d`` is empty).
        """
        for k in d:
            di[k] += 1
            if di[k] < len(d[k]):
                return True
            # This position overflowed: reset it and carry into the next key.
            di[k] = 0
        return False
class SimpleDecrMembaseBucketDefaultPort(unittest.TestCase):
    """Decrement tests driven through ``MemcapableTestBase.decr_test``."""

    memcapableTestBase = None
    log = logger.Logger.get_logger()

    def setUp(self):
        self.memcapableTestBase = MemcapableTestBase()
        self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)

    def _verify(self, test_name, expected, actual):
        """Fail with a message naming the test and both values, else log success."""
        # ``format`` with {n} placeholders cannot raise on a non-int result,
        # unlike the ``%d`` formatting used previously.
        self.assertEqual(
            expected, actual,
            "FAILED {0}. Expected value {1}. Tested value {2}".format(test_name, expected, actual))
        self.log.info('Value update correctly. Expected value {0}. Tested value {1}'
                      .format(expected, actual))

    def _verify_zero(self, test_name, actual):
        """Accept any falsy result, as the original ``not update_v`` check did."""
        self.assertFalse(
            actual,
            'FAILED {0}. Expected value 0. Tested value {1}'.format(test_name, actual))
        self.log.info('Value update correctly. Expected value 0. Tested value {0}'
                      .format(actual))

    def test_decr_an_exist_key_never_exp(self):
        key_test = 'has_key'
        value = '100'
        exp_time = 0
        decr_amt = 5
        incr_amt = 0
        decr_time = 10
        update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
        self._verify('test_decr_an_exist_key_never_exp',
                     int(value) - decr_amt * decr_time, update_v)

    def test_decr_non_exist_key(self):
        # 'no_key' tells decr_test not to create the key first.
        key_test = 'no_key'
        value = '100'
        exp_time = 0
        decr_amt = 5
        incr_amt = 0
        decr_time = 10
        update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
        self._verify_zero('test_decr_non_exist_key', update_v)

    def test_decr_with_exist_key_and_expired(self):
        key_test = 'has_key'
        value = '100'
        exp_time = 5
        decr_amt = 5
        incr_amt = 0
        decr_time = 10
        update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
        self._verify_zero('test_decr_with_exist_key_and_expired', update_v)

    def test_decr_with_exist_key_incr_then_decr_never_expired(self):
        key_test = 'has_key'
        value = '100'
        exp_time = 0
        decr_amt = 5
        incr_amt = 50
        decr_time = 10
        update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
        self._verify('test_decr_with_exist_key_incr_then_decr_never_expired',
                     int(value) + incr_amt - decr_amt * decr_time, update_v)
class StatsAggregationDuringMemcachedOps(unittest.TestCase): | |
def setUp(self):
    """Initialise the cluster on the master node and open a direct client.

    Sets ``log``, ``params``, ``master``, ``onenodemc``,
    ``shutdown_load_data`` and ``results``. ``results`` is a
    ``ResultsHelper`` only when the test parameters contain ``"results"``;
    otherwise it is ``None``.
    """
    self.log = logger.Logger.get_logger()
    self.params = TestInputSingleton.input.test_params
    self.master = TestInputSingleton.input.servers[0]
    # Always define the attribute: the test reads ``self.results``
    # unconditionally and would raise AttributeError without it.
    self.results = None
    rest = RestConnection(self.master)
    rest.init_cluster(self.master.rest_username, self.master.rest_password)
    info = rest.get_nodes_self()
    rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
                                  memoryQuota=info.mcdMemoryReserved)
    ClusterOperationHelper.cleanup_cluster([self.master])
    ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
    self._create_default_bucket()
    self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
    self.shutdown_load_data = False
    if "results" in self.params:
        self.results = ResultsHelper(self.params["results"])
def _create_default_bucket(self):
    """Ensure the ``default`` bucket exists, then reset loader bookkeeping.

    When the bucket is missing it is created with a RAM quota of the node's
    memory quota scaled by ``BucketOperationHelper.base_bucket_ratio``, and
    the method asserts that memcached became ready. In all cases it asserts
    that the bucket exists, then resets ``threads`` and
    ``shutdown_load_data``.
    """
    bucket = "default"
    node = self.master
    rest_conn = RestConnection(node)
    rest_helper = RestHelper(RestConnection(node))
    bucket_missing = not rest_helper.bucket_exists(bucket)
    if bucket_missing:
        ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
        node_info = rest_conn.get_nodes_self()
        quota_mb = int(node_info.memoryQuota * ram_ratio)
        rest_conn.create_bucket(bucket=bucket, ramQuotaMB=quota_mb)
        memcached_ready = BucketOperationHelper.wait_for_memcached(node, bucket)
        self.assertTrue(memcached_ready, msg="wait_for_memcached failed")
    self.assertTrue(rest_helper.bucket_exists(bucket),
                    msg="unable to create {0} bucket".format(bucket))
    self.threads = []
    self.shutdown_load_data = False
def test_stats_during_load(self):
    """Time a bulk set load and compare throughput with previous results.

    Reads ``items`` (default 100000) and ``iterations`` (default -1) from
    the test parameters, waits for active tasks to drain, runs
    ``self._load_data`` on a worker thread and, when a results helper is
    configured, asserts every value returned by ``compare_perf`` is at
    least 0.9.
    """
    # how many items
    # how many iterations
    # couchdb url
    # which stats
    items = int(self.params["items"]) if "items" in self.params else 100 * 1000
    iterations = int(self.params["iterations"]) if "iterations" in self.params else -1

    self.onenodemc.flush(1)
    time.sleep(2)
    rest = RestConnection(self.master)
    tasks = rest.active_tasks()
    while tasks:
        self.log.info("found active tasks, waiting for them to complete. {0}".format(tasks))
        time.sleep(10)
        # Re-poll; without this the loop can never observe completion.
        tasks = rest.active_tasks()

    self.log.info("set, items {0} , iterations {1}".format(items, iterations))
    t = Thread(name="loader-thread", target=self._load_data, args=(items, iterations))
    t.daemon = True
    start = time.time()
    t.start()
    # shell = RemoteMachineShellConnection(self.master)
    # mc_pid = RemoteMachineHelper(shell).is_process_running("memcached").pid
    # beam_pid = RemoteMachineHelper(shell).is_process_running("beam.smp").pid
    # m = Thread(name="cpu-stat-memcached", target=self._extract_proc_info, args=(shell,mc_pid,60))
    # b = Thread(name="cpu-stat-beam", target=self._extract_proc_info, args=(shell,beam_pid,60))
    # m.start()
    # b.start()
    # self.threads.extend([t, b, m])
    # time.sleep(1000)
    t.join()
    end = time.time()

    msg = "set took {0} seconds to iterate over {1} items {2} times"
    self.log.info(msg.format((end - start), items, iterations))

    result_data = {
        "version": self.params["version"],
        "product": "couchbase",
        "nodes": 1,
        "operation": "set",
        "items": items,
        "value_size": 256,
        "key_size": 36,
        "json": False,
        "active_task": None,
        # presumably assigned by _load_data, which is not visible here
        "ops_per_second": self.ops_per_second,
    }

    # ``results`` is only assigned in setUp when it is configured; tolerate
    # its absence rather than failing with AttributeError.
    results = getattr(self, "results", None)
    if results:
        percent_change = results.compare_perf(result_data,
                                              limit=1,
                                              same_keys=["product",
                                                         "nodes",
                                                         "operation",
                                                         "value_size",
                                                         "key_size",
                                                         "json",
                                                         "active_task"],
                                              different_keys={},
                                              result_key="ops_per_second")
        results.add_perf(result_data)
        for change in percent_change:
            self.assertTrue(change >= 0.9, msg="test performace is less than 90% of previous run")
def test_stats_during_get(self): | |
#how many items | |
#how many iterations | |
#couchdb url | |
#which stats | |
if "items" in self.params: | |
items = int(self.params["items"]) | |
else: | |
items = 100 * 1000 | |
if "iterations" in self.params: | |
iterations = int(self.params["iterations"]) | |
else: | |
iterations = -1 | |
self.onenodemc.flush(1) | |
time.sleep(2) | |
self.log.info("preloading {0} items".format(items)) | |
keys = [str(uuid.uuid4()) for i in range(0, items)] | |
value = MemcachedClientHelper.create_value("*", 256) | |
for key in keys: | |
self.onenodemc.set(key, 0, 0, value) | |
RebalanceHelper.wait_for_stats(self.master, 'default', 'curr_items', len(keys)) | |
RebalanceHelper.wait_for_stats(self.master, 'default', 'ep_queue_size', 0) | |
rest = RestConnection(self.master) | |
tasks = rest.active_tasks() | |
while tasks: | |
self.log.info("found active tasks, waiting for them to complete. {0}".format(tasks)) | |
time.sleep(10) | |
self.log.info("get items {0} , iterations {1}".format(items, iterations)) | |
t = Thread(name="getter-thread", target=self._get_data, args=(keys, iterations)) | |
t.daemon = True | |
start = time.time() | |
t.start() | |
# shell = RemoteMachineShellConnection(self.master) | |
# mc_pid = RemoteMachineHelper(shell).is_process_running("memcached").pid | |
# beam_pid = RemoteMachineHelper(shell).is_process_running("beam.smp").pid | |
# m = Thread(name="cpu-stat-memcached", target=self._extract_proc_info, args=(shell,mc_pid,60)) | |
# b = Thread(name="cpu-stat-beam", target=self._extract_proc_info, args=(shell,beam_pid,60)) | |
# m.start() | |
# b.start() | |
# self.threads.extend([t, b, m]) | |
# time.sleep(1000) | |
t.join() | |
end = time.time() | |
msg = "get took {0} seconds to iterate over {1} items {2} times" | |
self.log.info(msg.format((end - start), items, iterations)) | |
result_data = { | |
"version":self.params["version"], | |
"product":"couchbase", | |
"nodes":1, | |
"operation":"get", | |
"items":items, | |
"value_size":256, | |
"key_size":36, | |
"json":False, | |
"active_task":None, | |
"ops_per_second":self.ops_per_second, | |
} | |
if self.results: | |
percent_change = self.results.compare_perf(result_data, | |
limit=1, | |
same_keys=["product", | |
"nodes", | |
"operation", | |
"value_size", | |
"key_size", | |
"json", | |
"active_task"], | |
different_keys={}, | |
result_key="ops_per_second") | |
self.results.add_perf(result_data) | |
for change in percent_change: | |
self.assertTrue(change >= 0.9, msg="test performace is less than 90% of previous run") | |
def tearDown(self): | |
self.shutdown_load_data = True | |
if self.threads: | |
for t in self.threads: | |
t.join() | |
def _extract_proc_info(self, shell, pid, frequency): | |
while not self.shutdown_load_data: | |
time.sleep(frequency) | |
o, r = shell.execute_command("cat /proc/{0}/stat".format(pid)) | |
# shell.log_command_output(o, r) | |
fields = ('pid comm state ppid pgrp session tty_nr tpgid flags minflt ' | |
'cminflt majflt cmajflt utime stime cutime cstime priority ' | |
'nice num_threads itrealvalue starttime vsize rss rsslim ' | |
'startcode endcode startstack kstkesp kstkeip signal blocked ' | |
'sigignore sigcatch wchan nswap cnswap exit_signal ' | |
'processor rt_priority policy delayacct_blkio_ticks ' | |
'guest_time cguest_time ').split(' ') | |
d = dict(zip(fields, o[0].split(' '))) | |
print d | |
def _load_data(self, items, iterations): | |
iteration = 0 | |
self.ops_per_second = 0 | |
ops_per_second = [] | |
keys = [str(uuid.uuid4()) for i in range(0, items)] | |
value = MemcachedClientHelper.create_value("*", 256) | |
while iteration < iterations: | |
start = time.time() | |
for key in keys: | |
self.onenodemc.set(key, 0, 0, value) | |
iteration += 1 | |
end = time.time() | |
msg = "set iteration #{0} took {1} seconds to iterate over {2} items" | |
self.log.info(msg.format(iteration, (end - start), items)) | |
ops_per_second.append(len(keys) / float(end - start)) | |
self.ops_per_second = sum(ops_per_second) / float(iterations) | |
def _get_data(self, keys, iterations): | |
iteration = 0 | |
self.ops_per_second = 0 | |
ops_per_second = [] | |
while iteration < iterations: | |
start = time.time() | |
for key in keys: | |
self.onenodemc.get(key) | |
iteration += 1 | |
end = time.time() | |
msg = "get iteration #{0} took {1} seconds to iterate over {2} items" | |
self.log.info(msg.format(iteration, (end - start), len(keys))) | |
ops_per_second.append(len(keys) / float(end - start)) | |
self.ops_per_second = sum(ops_per_second) / float(iterations) | |
class AppendTests(unittest.TestCase):
    """Run append workloads on the "default" bucket and log ``mem_used`` growth.

    Neither test currently fails on memory growth; both only log the
    ``mem_used`` stat delta (the threshold check in ``test_append_with_delete``
    is disabled).

    NOTE(review): block nesting was reconstructed from a copy of this file
    whose indentation had been stripped -- confirm against the repository.
    """

    def setUp(self):
        """Initialise the master node, ensure the bucket exists, open a client."""
        self.log = logger.Logger.get_logger()
        self.params = TestInputSingleton.input.test_params
        self.master = TestInputSingleton.input.servers[0]
        rest = RestConnection(self.master)
        rest.init_cluster(self.master.rest_username, self.master.rest_password)
        info = rest.get_nodes_self()
        rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
                                      memoryQuota=info.mcdMemoryReserved)
        ClusterOperationHelper.cleanup_cluster([self.master])
        ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
        self._create_default_bucket()
        self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")

    def _create_default_bucket(self):
        """Create the "default" bucket if it is missing and reset thread state."""
        name = "default"
        master = self.master
        rest = RestConnection(master)
        helper = RestHelper(RestConnection(master))
        if not helper.bucket_exists(name):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio
            rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
            ready = BucketOperationHelper.wait_for_memcached(master, name)
            self.assertTrue(ready, msg="wait_for_memcached failed")
        self.assertTrue(helper.bucket_exists(name),
                        msg="unable to create {0} bucket".format(name))
        self.load_thread = None
        self.shutdown_load_data = False

    def test_append_wrong_cas(self):
        """Append 50000 times with a random CAS and log the ``mem_used`` delta.

        One append with the real CAS is issued first; errors from the
        random-CAS appends are ignored on purpose.
        """
        stats = self.onenodemc.stats()
        initial_mem_used = -1
        if "mem_used" in stats:
            initial_mem_used = int(stats["mem_used"])
            self.assertTrue(initial_mem_used > 0)
        key = str(uuid.uuid4())
        size = 5 * 1024
        value = MemcachedClientHelper.create_value("*", size)
        self.onenodemc.set(key, 0, 0, value)
        _, cas_v, _ = self.onenodemc.get(key)
        self.onenodemc.append(key, value, cas_v)
        iteration = 50000
        for _ in range(iteration):
            try:
                self.onenodemc.append(key, value, random.randint(0, 1000))
            except Exception:
                # Rejected appends are the expected outcome; unlike a bare
                # ``except`` this still lets Ctrl-C / SystemExit through.
                pass
        stats = self.onenodemc.stats()
        if "mem_used" in stats:
            delta = int(stats["mem_used"]) - initial_mem_used
            self.log.info("initial mem_used {0}, current mem_used {1}".format(initial_mem_used, stats["mem_used"]))
            self.log.info(delta)

    def test_append_with_delete(self):
        """Repeatedly reset and append to many keys, logging memory growth.

        Test parameters: ``iteration`` (default 50000), ``items`` (default
        10000) and ``append_size`` (default 5 KiB). Each outer iteration
        re-sets every key to 5 KiB of random bytes, runs 100 append rounds
        over all keys, then logs the observed versus expected ``mem_used``
        delta.
        """
        if "iteration" in self.params:
            iteration = int(self.params["iteration"])
        else:
            iteration = 50000
        if "items" in self.params:
            items = int(self.params["items"])
        else:
            items = 10000
        if "append_size" in self.params:
            append_size = int(self.params["append_size"])
        else:
            append_size = 5 * 1024
        append_iteration_before_delete = 100
        keys = [str(uuid.uuid4()) for _ in range(items)]
        size = 5 * 1024
        stats = self.onenodemc.stats()
        initial_mem_used = -1
        self.log.info("items : {0} , iteration : {1} ".format(items, iteration))
        if "mem_used" in stats:
            initial_mem_used = int(stats["mem_used"])
            self.assertTrue(initial_mem_used > 0)
        for _ in range(iteration):
            for key in keys:
                self.onenodemc.set(key, 0, 0, os.urandom(size))
            try:
                for _round in range(append_iteration_before_delete):
                    appened_value = MemcachedClientHelper.create_value("*", append_size)
                    for key in keys:
                        self.onenodemc.append(key, appened_value)
                        self.onenodemc.get(key)
            except Exception:
                # Best effort: the first failure ends the append phase.
                pass
            # Fetching stats can fail transiently; try up to 10 times.
            stats = None
            for _attempt in range(10):
                try:
                    stats = self.onenodemc.stats()
                    break
                except Exception:
                    pass
            if stats and "mem_used" in stats:
                delta = int(stats["mem_used"]) - initial_mem_used
                # Expected growth: #items * size + #items * rounds * append_size.
                expected_delta = items * (size + append_iteration_before_delete * append_size * 1.0)
                # Guard the ratio so items == 0 does not raise ZeroDivisionError.
                ratio = delta / expected_delta if expected_delta else 0
                msg = "initial mem_used {0}, current mem_used {1} , delta : {2} , expected delta : {3} , increase percentage {4}"
                self.log.info(
                    msg.format(initial_mem_used, stats["mem_used"], delta, expected_delta, ratio))
                # Disabled check / cleanup kept for reference:
                # if delta > (1.2 * expected_delta):
                #     self.fail("too much memory..")
                # for key in keys:
                #     self.onenodemc.delete(key)

    def tearDown(self):
        """Signal background work to stop and join the load thread, if any."""
        self.shutdown_load_data = True
        if self.load_thread:
            self.load_thread.join()
        # BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
class WarmUpMemcachedTest(unittest.TestCase):
    """Insert items, restart memcached, and wait for ``curr_items`` to recover.

    The ``do_warmup_*`` entry points are not named ``test_*``, so unittest's
    default discovery does not pick them up.

    NOTE(review): block nesting was reconstructed from a copy of this file
    whose indentation had been stripped -- confirm against the repository.
    """

    def setUp(self):
        """Initialise the master node, recreate the bucket, open a client."""
        self.log = logger.Logger.get_logger()
        self.params = TestInputSingleton.input.test_params
        self.master = TestInputSingleton.input.servers[0]
        rest = RestConnection(self.master)
        rest.init_cluster(self.master.rest_username, self.master.rest_password)
        info = rest.get_nodes_self()
        rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
                                      memoryQuota=info.mcdMemoryReserved)
        ClusterOperationHelper.cleanup_cluster([self.master])
        ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
        # recreate bucket instead of flush
        self._create_default_bucket()
        self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
        self._log_start()

    def tearDown(self):
        """Clean up the cluster, delete all buckets, then log completion."""
        ClusterOperationHelper.cleanup_cluster([self.master])
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
        self._log_finish()

    def _log_to_server(self, event):
        """Best-effort: post "<now> : <test name> <event> " via the REST client."""
        try:
            msg = "{0} : {1} {2} ".format(datetime.datetime.now(), self._testMethodName, event)
            # setUp only defines self.master; the previous self.servers lookup
            # raised AttributeError, which the bare except silently swallowed.
            RestConnection(self.master).log_client_error(msg)
        except Exception:
            # Logging to the server must never fail the test.
            pass

    def _log_start(self):
        """Record on the server that the current test started."""
        self._log_to_server("started")

    def _log_finish(self):
        """Record on the server that the current test finished."""
        self._log_to_server("finished")

    def _create_default_bucket(self):
        """Create the "default" bucket and wait for memcached to be ready."""
        name = "default"
        master = self.master
        rest = RestConnection(master)
        node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
        info = rest.get_nodes_self()
        available_ram = info.memoryQuota * node_ram_ratio
        rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
        ready = BucketOperationHelper.wait_for_memcached(master, name)
        self.assertTrue(ready, msg="wait_for_memcached failed")
        self.load_thread = None
        self.shutdown_load_data = False

    def _insert_data(self, howmany):
        """Set ``howmany`` unique keys, each stored with itself as the value."""
        items = ["{0}-{1}".format(str(uuid.uuid4()), i) for i in range(howmany)]
        for item in items:
            self.onenodemc.set(item, 0, 0, item)
        self.log.info("inserted {0} items".format(howmany))

    def _do_warmup(self, howmany, timeout_in_seconds=1800):
        """Insert data, kill the port-supervisor children, and await warmup.

        Fails if memcached's ``uptime`` stat does not drop within 60 seconds,
        or if ``curr_items`` has not reached its pre-restart value within
        ``timeout_in_seconds``.
        """
        self._insert_data(howmany)
        stats = self.onenodemc.stats()
        curr_items = int(stats["curr_items"])
        uptime = int(stats["uptime"])
        RebalanceHelper.wait_for_stats(self.master, "default", 'ep_queue_size', 0)
        RebalanceHelper.wait_for_stats(self.master, "default", 'ep_flusher_todo', 0)
        self.log.info(curr_items)
        self.log.info("sleeping for 10 seconds")
        time.sleep(10)
        rest = RestConnection(self.master)
        command = "[erlang:exit(element(2, X), kill) || X <- supervisor:which_children(ns_port_sup)]."
        memcached_restarted = rest.diag_eval(command)
        self.assertTrue(memcached_restarted, "unable to restart memcached/moxi process through diag/eval")
        # Wait until memcached is back: a lower uptime means a new process.
        start = time.time()
        memcached_restarted = False
        while time.time() - start < 60:
            try:
                self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
                value = int(self.onenodemc.stats()["uptime"])
                if value < uptime:
                    self.log.info("memcached restarted...")
                    memcached_restarted = True
                    break
                self.onenodemc.close()
            except Exception:
                # Connection refused / reset while the process is down.
                pass
            # Pause on every retry so we do not spin while the old process
            # is still answering.
            time.sleep(1)
        self.assertTrue(memcached_restarted,
                        "memcached did not restart within 60 seconds (uptime was not reset)")
        # Warm up until curr_items matches; reuse the client opened above
        # rather than opening a second connection.
        stats = self.onenodemc.stats()
        present_count = int(stats["curr_items"])
        self.log.info("ep curr_items : {0}, inserted_items {1}".format(present_count, curr_items))
        start = time.time()
        while present_count < curr_items:
            if (time.time() - start) > timeout_in_seconds:
                self.fail("Timed out waiting for warmup")
            stats = self.onenodemc.stats()
            present_count = int(stats["curr_items"])
            self.log.info("curr_items : {0}".format(present_count))
            time.sleep(1)
        self.log.info("ep curr_items : {0}, inserted_items {1}".format(present_count, curr_items))
        stats = self.onenodemc.stats()
        warmup_time = int(stats["ep_warmup_time"])
        self.log.info("ep_warmup_time is {0}".format(warmup_time))

    def do_warmup_2(self):
        """Warm up with 2 items."""
        self._do_warmup(2)

    def do_warmup_10k(self):
        """Warm up with 10,000 items."""
        self._do_warmup(10000)

    def do_warmup_100k(self):
        """Warm up with 100,000 items."""
        self._do_warmup(100000)

    def do_warmup_1M(self):
        """Warm up with 1,000,000 items."""
        self._do_warmup(1000000)
class MultiGetNegativeTest(unittest.TestCase):
    """Issue large multi-gets through memcached and moxi, logging memory use.

    NOTE(review): block nesting was reconstructed from a copy of this file
    whose indentation had been stripped -- confirm against the repository.
    """

    # rss in /proc/<pid>/stat is counted in pages; this assumes 4 KiB pages.
    # NOTE(review): confirm the page size on the target hosts.
    _PAGE_SIZE_BYTES = 4096

    # Column names paired positionally with the fields of /proc/<pid>/stat.
    _PROC_STAT_FIELDS = ('pid comm state ppid pgrp session tty_nr tpgid flags minflt '
                         'cminflt majflt cmajflt utime stime cutime cstime priority '
                         'nice num_threads itrealvalue starttime vsize rss rsslim '
                         'startcode endcode startstack kstkesp kstkeip signal blocked '
                         'sigignore sigcatch wchan nswap cnswap exit_signal '
                         'processor rt_priority policy delayacct_blkio_ticks '
                         'guest_time cguest_time').split()

    def setUp(self):
        """Initialise the master node and open direct and proxy clients."""
        self.log = logger.Logger.get_logger()
        self.params = TestInputSingleton.input.test_params
        self.master = TestInputSingleton.input.servers[0]
        rest = RestConnection(self.master)
        rest.init_cluster(self.master.rest_username, self.master.rest_password)
        info = rest.get_nodes_self()
        rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
                                      memoryQuota=info.mcdMemoryReserved)
        ClusterOperationHelper.cleanup_cluster([self.master])
        ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
        self._create_default_bucket()
        self.keys_cleanup = []
        self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default", timeout=600)
        self.onenodemoxi = MemcachedClientHelper.proxy_client(self.master, "default", timeout=600)

    def tearDown(self):
        """Best-effort removal of every key inserted by the test."""
        if self.onenodemc:
            for key in self.keys_cleanup:
                try:
                    self.onenodemc.delete(key)
                except Exception:
                    # A key that is already gone is not an error here.
                    pass

    def _create_default_bucket(self):
        """Create the "default" bucket if it does not exist yet."""
        name = "default"
        master = self.master
        rest = RestConnection(master)
        helper = RestHelper(RestConnection(master))
        if not helper.bucket_exists(name):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio
            rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
            ready = BucketOperationHelper.wait_for_memcached(master, name)
            self.assertTrue(ready, msg="wait_for_memcached failed")
        self.assertTrue(helper.bucket_exists(name),
                        msg="unable to create {0} bucket".format(name))

    def _insert_data(self, client, howmany):
        """Set ``howmany`` keys sharing one uuid prefix; return the key list."""
        prefix = str(uuid.uuid4())
        keys = ["{0}-{1}".format(prefix, i) for i in range(howmany)]
        value = MemcachedClientHelper.create_value("*", 1024)
        for key in keys:
            client.set(key, 0, 0, value)
        self.log.info("inserted {0} items".format(howmany))
        return keys

    def test_mc_multi_get(self):
        """Multi-get 10 .. 40000 keys through the direct memcached client."""
        for howmany in (10, 100, 1000, 10 * 1000, 20 * 1000, 30 * 1000, 40 * 1000):
            self._test_multi_get(self.onenodemc, howmany)

    def test_mx_multi_get(self):
        """Multi-get 10 .. 40000 keys through the moxi proxy client."""
        for howmany in (10, 100, 1000, 10 * 1000, 20 * 1000, 30 * 1000, 40 * 1000):
            self._test_multi_get(self.onenodemoxi, howmany)

    def _log_memory_usage(self, shell, moxi_pid, mc_pid):
        """Log virtual and resident memory sizes of memcached and moxi."""
        mc_message = "memcached virt memory size {0} : resident memory size : {1}"
        mx_message = "moxi virt memory size {0} : resident memory size : {1}"
        moxi_sys_stats = self._extract_proc_info(shell, moxi_pid)
        memcached_sys_stats = self._extract_proc_info(shell, mc_pid)
        # Argument order matches the message: vsize first, then rss in bytes.
        # The two values used to be passed the other way round.
        self.log.info(mc_message.format(memcached_sys_stats["vsize"],
                                        int(memcached_sys_stats["rss"]) * self._PAGE_SIZE_BYTES))
        self.log.info(mx_message.format(moxi_sys_stats["vsize"],
                                        int(moxi_sys_stats["rss"]) * self._PAGE_SIZE_BYTES))

    def _test_multi_get(self, client, howmany):
        """Insert ``howmany`` keys via ``client`` and assert getMulti returns all."""
        shell = RemoteMachineShellConnection(self.master)
        moxi_pid = RemoteMachineHelper(shell).is_process_running("moxi").pid
        mc_pid = RemoteMachineHelper(shell).is_process_running("memcached").pid
        keys = self._insert_data(client, howmany)
        self.keys_cleanup.extend(keys)
        self.log.info("printing moxi and memcached stats before running multi-get")
        self._log_memory_usage(shell, moxi_pid, mc_pid)
        self.log.info("running multiget to get {0} keys".format(howmany))
        gets = client.getMulti(keys)
        self.log.info("recieved {0} keys".format(len(gets)))
        self.log.info(gets)
        self.assertEqual(len(gets), len(keys))
        self.log.info("printing moxi and memcached stats after running multi-get")
        self._log_memory_usage(shell, moxi_pid, mc_pid)

    def _extract_proc_info(self, shell, pid):
        """Return ``/proc/<pid>/stat`` on the remote host as a field->value dict."""
        o, r = shell.execute_command("cat /proc/{0}/stat".format(pid))
        return dict(zip(self._PROC_STAT_FIELDS, o[0].split()))
class MemcachedValueSizeLimitTest(unittest.TestCase):
    """Grow one value by append/prepend and expect memcached to reject it.

    Both tests expect the final operation to raise
    ``mc_bin_client.MemcachedError`` with ``status == 5``.

    NOTE(review): block nesting was reconstructed from a copy of this file
    whose indentation had been stripped -- confirm against the repository.
    """

    def setUp(self):
        """Initialise the master node, ensure the bucket exists, open a client."""
        self.log = logger.Logger.get_logger()
        self.params = TestInputSingleton.input.test_params
        self.master = TestInputSingleton.input.servers[0]
        rest = RestConnection(self.master)
        rest.init_cluster(self.master.rest_username, self.master.rest_password)
        info = rest.get_nodes_self()
        rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
                                      memoryQuota=info.mcdMemoryReserved)
        ClusterOperationHelper.cleanup_cluster([self.master])
        ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
        self._create_default_bucket()
        self.keys_cleanup = []
        self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")

    def tearDown(self):
        """Best-effort removal of every key inserted by the test."""
        if self.onenodemc:
            for key in self.keys_cleanup:
                try:
                    self.onenodemc.delete(key)
                except Exception:
                    # A key that is already gone is not an error here.
                    pass

    def _create_default_bucket(self):
        """Create the "default" bucket if it does not exist yet."""
        name = "default"
        master = self.master
        rest = RestConnection(master)
        helper = RestHelper(RestConnection(master))
        if not helper.bucket_exists(name):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio
            rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
            ready = BucketOperationHelper.wait_for_memcached(master, name)
            self.assertTrue(ready, msg="wait_for_memcached failed")
        self.assertTrue(helper.bucket_exists(name),
                        msg="unable to create {0} bucket".format(name))

    def test_append_till_20_mb(self):
        """Append 10-byte chunks to a 1 MiB value until memcached refuses.

        Every tenth append, the "MALLOC:" lines of the allocator stats are
        printed.
        """
        value = MemcachedClientHelper.create_value("*", 1024 * 1024)
        key = str(uuid.uuid4())
        self.keys_cleanup.append(key)
        self.onenodemc.set(key, 0, 0, value)
        a, b, c = self.onenodemc.get(key)
        print("key has %s characters" % (len(c)))
        # 10-byte chunk appended on every round
        value = '1234567890'
        for i in range(0, (100 * 1024 - 1)):
            if i % 10 == 0:
                stats = self.onenodemc.stats('allocator')
                print("%s th append" % (i))
                for line in stats['detailed'].split('\n'):
                    if "MALLOC:" in line:
                        print(line)
            self.onenodemc.append(key, value)
        try:
            self.onenodemc.append(key, value)
            a, b, c = self.onenodemc.get(key)
            print("key has %s characters" % (len(c)))
            # AssertionError is not a MemcachedError, so it propagates.
            self.fail("memcached did not raise an error")
        except mc_bin_client.MemcachedError as err:
            self.assertEqual(err.status, 5)

    def test_prepend_till_20_mb(self):
        """Prepend 20 KiB chunks 1023 times, then expect the next to fail."""
        initial_value = "12345678"
        key = str(uuid.uuid4())
        self.keys_cleanup.append(key)
        self.onenodemc.set(key, 0, 0, initial_value)
        # 20 KiB chunk prepended on every round
        value = MemcachedClientHelper.create_value("*", 1024 * 20)
        for _ in range(0, (1024 - 1)):
            self.onenodemc.prepend(key, value)
        try:
            self.onenodemc.prepend(key, value)
            self.fail("memcached did not raise an error")
        except mc_bin_client.MemcachedError as err:
            self.assertEqual(err.status, 5)
# def test_incr_till_max(self): | |
# initial_value = '0' | |
# max_value = pow(2, 64) | |
# step = max_value / 1024 | |
# self.log.info("step : {0}") | |
# key = str(uuid.uuid4()) | |
# self.keys_cleanup.append(key) | |
# self.onenodemc.set(key, 0, 0, initial_value) | |
# for i in range(0, (2 * 1024 - 1)): | |
# self.onenodemc.incr(key, amt=step) | |
# a, b, c = self.onenodemc.get(key) | |
# delta = long(c) - max_value | |
# self.log.info("delta = value - pow(2,64) = {0}".format(delta)) | |
# a, b, c = self.onenodemc.get(key) | |
# self.log.info("key : {0} value {1}".format(key, c)) | |
# try: | |
# self.onenodemc.incr(key, step) | |
# self.fail("memcached did not raise an error") | |
# except mc_bin_client.MemcachedError as err: | |
# self.assertEquals(err.status, 5) | |
# | |
# def test_decr_till_max(self): | |
# initial_value = '1' | |
# max_value = pow(2, 64) | |
# step = max_value / 1024 | |
# key = str(uuid.uuid4()) | |
# self.keys_cleanup.append(key) | |
# self.onenodemc.set(key, 0, 0, initial_value) | |
# for i in range(0, (1024 - 1)): | |
# self.onenodemc.decr(key, amt=step) | |
# a, b, c = self.onenodemc.get(key) | |
# self.log.info("key : {0} value {1}".format(key, c)) | |
# try: | |
# self.onenodemc.incr(key, step) | |
# self.fail("memcached did not raise an error") | |
# except mc_bin_client.MemcachedError as err: | |
# self.assertEquals(err.status, 5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment