# Gist by @farshidce, created March 6, 2013 23:33
import os
import random
from threading import Thread
import unittest
from TestInput import TestInputSingleton
import mc_bin_client
import time
import uuid
import logger
import datetime
from membase.api.rest_client import RestConnection, RestHelper
from membase.helper.rebalance_helper import RebalanceHelper
from membase.helper.bucket_helper import BucketOperationHelper
from membase.helper.cluster_helper import ClusterOperationHelper
from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
from remote.remote_util import RemoteMachineShellConnection, RemoteMachineHelper
from results_helper import ResultsHelper
class MemcapableTestBase(object):
log = None
keys = None
servers = None
input = None
test = None
bucket_port = None
bucket_name = None
def setUp_bucket(self, bucket_name, port, bucket_type, unittest):
self.log = logger.Logger.get_logger()
self.input = TestInputSingleton.input
unittest.assertTrue(self.input, msg="input parameters missing...")
self.test = unittest
self.master = self.input.servers[0]
self.bucket_port = port
self.bucket_name = bucket_name
ClusterOperationHelper.cleanup_cluster([self.master])
BucketOperationHelper.delete_all_buckets_or_assert([self.master], self.test)
self._create_default_bucket(unittest)
def _create_default_bucket(self, unittest):
name = "default"
master = self.master
rest = RestConnection(master)
helper = RestHelper(RestConnection(master))
if not helper.bucket_exists(name):
node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
info = rest.get_nodes_self()
available_ram = info.memoryQuota * node_ram_ratio
rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
ready = BucketOperationHelper.wait_for_memcached(master, name)
BucketOperationHelper.wait_for_vbuckets_ready_state(master, name)
unittest.assertTrue(ready, msg="wait_for_memcached failed")
unittest.assertTrue(helper.bucket_exists(name),
msg="unable to create {0} bucket".format(name))
def set_test(self, key, exp, flags, values):
serverInfo = self.master
client = MemcachedClientHelper.proxy_client(serverInfo, self.bucket_name)
# self.log.info('Waiting 15 seconds for memcached to start')
# time.sleep(15)
for v in values:
for f in flags:
client.set(key, exp, f, v)
flags_v, cas_v, get_v = client.get(key)
if get_v == v:
if flags_v == f:
self.log.info('flags were set to {0}; get returned {1}'.format(f, flags_v))
else:
self.test.fail('FAILED. flags were set to {0} but get returned {1}'.format(f, flags_v))
self.log.info('value was set to {0}; get returned {1}'.format(v, get_v))
else:
self.test.fail('FAILED. value was set to {0} but get returned {1}'.format(v, get_v))
# Instead of checking whether the key exists before incrementing,
# you can simply ADD it first: if it's already there the ADD is
# ignored, and if it's not, the ADD creates it.
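# A minimal sketch (not part of the original gist) of that add-then-incr
# pattern, assuming the mc_bin_client-style client used above, where
# add() shares set()'s (key, exp, flags, value) signature:
#
# client.add(key, 0, 0, '0')          # no-op if the key already exists
# update_value, cas = client.incr(key, incr_amt)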
def incr_test(self, key, exp, flags, value, incr_amt, decr_amt, incr_time):
update_value = 0
serverInfo = self.master
client = MemcachedClientHelper.proxy_client(serverInfo, self.bucket_name)
# self.log.info('Waiting 15 seconds for memcached to start')
# time.sleep(15)
if key != 'no_key':
client.set(key, exp, flags, value)
if exp:
self.log.info('Waiting {0} seconds for the key to expire'.format(exp + 2))
time.sleep(exp + 2)
if decr_amt:
c, d = client.decr(key, decr_amt)
self.log.info('decr amt {0}'.format(c))
try:
i = 0
while i < incr_time:
update_value, cas = client.incr(key, incr_amt)
i += 1
self.log.info('incr {0} times with value {1}'.format(incr_time, incr_amt))
return update_value
except mc_bin_client.MemcachedError as error:
self.log.info('MemcachedError: {0}'.format(error.status))
self.test.fail("unable to increment value: {0}".format(incr_amt))
def decr_test(self, key, exp, flags, value, incr_amt, decr_amt, decr_time):
update_value = 0
serverInfo = self.master
client = MemcachedClientHelper.proxy_client(serverInfo, self.bucket_name)
if key != 'no_key':
client.set(key, exp, flags, value)
if exp:
self.log.info('Waiting {0} seconds for the key to expire'.format(exp + 2))
time.sleep(exp + 2)
if incr_amt:
c, d = client.incr(key, incr_amt)
self.log.info('incr amt {0}'.format(c))
i = 0
while i < decr_time:
update_value, cas = client.decr(key, decr_amt)
i += 1
self.log.info('decr {0} times with value {1}'.format(decr_time, decr_amt))
return update_value
class SimpleSetMembaseBucketDefaultPort(unittest.TestCase):
memcapableTestBase = None
log = logger.Logger.get_logger()
def setUp(self):
self.memcapableTestBase = MemcapableTestBase()
self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)
def test_set_pos_int_value_pos_flag_key_never_expired(self):
key_test = 'has_key'
valuesList = ['0', '000', '4', '678', '6560987', '32456754', '0000000000', '00001000']
exp_time = 0
flagsList = [0, 0000, 00001, 34532, 453456, 0001000, 1100111100, 4294967295]
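# note: integer literals with a leading zero (e.g. 00001, 0001000) are
# octal in Python 2, so some entries in these flag lists are not the
# decimal values they resemble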
self.memcapableTestBase.set_test(key_test, exp_time, flagsList, valuesList)
def test_set_neg_int_value_pos_flag_key_never_expired(self):
key_test = 'has_key'
valuesList = ['-0', '-000', '-4', '-678', '-6560987', '-32456754', '-0000000000', '-00001000']
exp_time = 0
flagsList = [0, 0000, 00001, 34532, 453456, 0001000, 1100111100, 4294967295]
self.memcapableTestBase.set_test(key_test, exp_time, flagsList, valuesList)
def test_set_pos_float_value_pos_flag_key_never_expired(self):
key_test = 'has_key'
valuesList = ['0.00', '000.0', '4.6545', '678.87967', '6560987.0', '32456754.090987', '0000000000.0000001',
'00001000.008']
exp_time = 0
flagsList = [0, 0000, 00001, 34532, 453456, 0001000, 1100111100, 4294967295]
self.memcapableTestBase.set_test(key_test, exp_time, flagsList, valuesList)
def test_set_neg_float_value_pos_flag_key_never_expired(self):
key_test = 'has_key'
valuesList = ['-0.00', '-000.0', '-4.6545', '-678.87967', '-6560987.0', '-32456754.090987',
'-0000000000.0000001', '-00001000.008']
exp_time = 0
flagsList = [0, 0000, 00001, 34532, 453456, 0001000, 1100111100, 4294967295]
self.memcapableTestBase.set_test(key_test, exp_time, flagsList, valuesList)
class SimpleIncrMembaseBucketDefaultPort(unittest.TestCase):
memcapableTestBase = None
log = logger.Logger.get_logger()
def setUp(self):
self.memcapableTestBase = MemcapableTestBase()
self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)
def test_incr_an_exist_key_never_exp(self):
key_test = 'has_key'
value = '10'
decr_amt = 0
incr_amt = 5
incr_time = 10
update_v = self.memcapableTestBase.incr_test(key_test, 0, 0, value, incr_amt, decr_amt, incr_time)
if update_v == (int(value) + incr_amt * incr_time):
self.log.info('Value updated correctly. Expected value {0}. Tested value {1}'\
.format((int(value) + incr_amt * incr_time), update_v))
else:
self.fail("FAILED test_incr_an_exist_key_never_exp. Original value %s. \
Expected value %d" % (value, int(value) + incr_amt * incr_time))
def test_incr_non_exist_key(self):
key_test = 'no_key'
value = '10'
decr_amt = 0
incr_amt = 5
incr_time = 10
update_v = self.memcapableTestBase.incr_test(key_test, 0, 0, value, incr_amt, decr_amt, incr_time)
if update_v == incr_amt * (incr_time - 1):
self.log.info('Value updated correctly. Expected value {0}. Tested value {1}'\
.format(incr_amt * (incr_time - 1), update_v))
else:
self.fail("FAILED test_incr_non_exist_key")
def test_incr_with_exist_key_and_expired(self):
key_test = 'expire_key'
value = '10'
exp_time = 5
decr_amt = 0
incr_amt = 5
incr_time = 10
update_v = self.memcapableTestBase.incr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, incr_time)
if update_v == incr_amt * (incr_time - 1):
self.log.info('Value updated correctly. Expected value {0}. Tested value {1}'\
.format(incr_amt * (incr_time - 1), update_v))
else:
self.fail("FAILED test_incr_with_exist_key_and_expired")
def test_incr_with_exist_key_decr_then_incr_never_expired(self):
key_test = 'has_key'
value = '101'
exp_time = 0
decr_amt = 10
incr_amt = 5
incr_time = 10
update_v = self.memcapableTestBase.incr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, incr_time)
if update_v == (int(value) - decr_amt + incr_amt * incr_time):
self.log.info('Value updated correctly. Expected value {0}. Tested value {1}'\
.format(int(value) - decr_amt + incr_amt * incr_time, update_v))
else:
self.fail("FAILED test_incr_with_exist_key_and_expired")
## this test will fail as expected
# def test_incr_with_non_int_key(self):
# key_test = 'has_key'
# value = 'abcd'
# exp_time = 0
# decr_amt = 0
# incr_amt = 5
# incr_time = 10
# self.assertRaises(self.memcapableTestBase.incr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, incr_time))
## self.assertRaises('Expected FAILED. Can not incr with string value')
class GetlTests(unittest.TestCase):
memcapableTestBase = None
log = logger.Logger.get_logger()
def setUp(self):
self.memcapableTestBase = MemcapableTestBase()
self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)
#set an item for 5 seconds
#getl for 15 seconds and verify that setting the item again
#throws "Data exists"
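#
# A hedged sketch (not in the original gist) of that scenario, assuming
# set() on a locked key raises a MemcachedError whose status is 2
# (KEY_EEXISTS, the "Data exists" error in the binary protocol):
#
# mc.set(key, 5, 0, key)
# mc.getl(key, 15)
# try:
#     mc.set(key, 0, 0, '*')                  # rejected while locked
# except mc_bin_client.MemcachedError as err:
#     assert err.status == 2                  # "Data exists"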
def _getl_body(self, prefix, getl_timeout, expiration):
node = self.memcapableTestBase.master
mc = MemcachedClientHelper.direct_client(node, "default")
key = "{0}_{1}".format(prefix, str(uuid.uuid4()))
self.log.info("setting key {0} with expiration {1}".format(key, expiration))
mc.set(key, expiration, 0, key)
self.log.info("getl key {0} timeout {1}".format(key, getl_timeout))
try:
mc.getl(key, getl_timeout)
except Exception as ex:
if getl_timeout < 0:
print ex
else:
raise
self.log.info("get key {0} which is locked now".format(key))
flags_v, cas_v, get_v = mc.get(key)
self.assertEquals(get_v, key)
i = 0
while i < 40:
self.log.info("setting key {0} with new value {1}".format(key, '*'))
try:
mc.set(key, 0, 0, '*')
break
except Exception as ex:
print ex
time.sleep(1)
print i
i += 1
if getl_timeout > 30:
self.log.info("sleep for {0} seconds".format(30))
time.sleep(30)
elif getl_timeout > 0:
self.log.info("sleep for {0} seconds".format(getl_timeout))
time.sleep(getl_timeout)
else:
self.log.info("sleep for {0} seconds".format(15))
time.sleep(15)
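# the 30-second ceiling above appears to assume the server caps getl
# lock time at 30 seconds, so longer requested timeouts expire by then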
self.log.info("lock should have timed out by now . try to set the item again")
new_value = "*"
self.log.info("setting key {0} with new value {1}".format(key, new_value))
mc.set(key, 0, 0, new_value)
self.log.info("get key {0}".format(key))
flags_v, cas_v, get_v = mc.get(key)
self.assertEquals(get_v, "*")
def test_getl_minus_one(self):
self._getl_body("getl_-1", -1, 0)
def test_getl_zero(self):
self._getl_body("getl_0", 0, 0)
def test_getl_five(self):
self._getl_body("getl_5", 5, 0)
def test_getl_ten(self):
self._getl_body("getl_10", 10, 0)
def test_getl_fifteen(self):
self._getl_body("getl_15", 15, 0)
def test_getl_thirty(self):
self._getl_body("getl_30", 30, 0)
def test_getl_sixty(self):
self._getl_body("getl_60", 60, 0)
def test_getl_expired_item(self):
prefix = "getl_expired_item"
expiration = 5
getl_timeout = 15
node = self.memcapableTestBase.master
mc = MemcachedClientHelper.direct_client(node, "default")
key = "{0}_{1}".format(prefix, str(uuid.uuid4()))
self.log.info("setting key {0} with expiration {1}".format(key, expiration))
mc.set(key, expiration, 0, key)
self.log.info("getl key {0} timeout {1}".format(key, getl_timeout))
mc.getl(key, getl_timeout)
self.log.info("get key {0} which is locked now".format(key))
flags_v, cas_v, get_v = mc.get(key)
self.assertEquals(get_v, key)
if getl_timeout > 30:
self.log.info("sleep for {0} seconds".format(30))
time.sleep(30)
elif getl_timeout > 0:
self.log.info("sleep for {0} seconds".format(getl_timeout))
time.sleep(getl_timeout)
else:
self.log.info("sleep for {0} seconds".format(15))
time.sleep(15)
self.log.info("get key {0} which was locked when it expired. should fail".format(key))
try:
mc.get(key)
self.fail("get {0} should have raised not_found error".format(key))
except mc_bin_client.MemcachedError as error:
self.log.info("raised exception as expected : {0}".format(error))
self.log.info("item expired and lock should have timed out by now . try to set the item again")
new_value = "*"
self.log.info("setting key {0} with new value {1}".format(key, new_value))
mc.set(key, 0, 0, new_value)
self.log.info("get key {0}".format(key))
flags_v, cas_v, get_v = mc.get(key)
self.assertEquals(get_v, "*")
class GetrTests(unittest.TestCase):
memcapableTestBase = None
log = logger.Logger.get_logger()
NO_REBALANCE = 0
DURING_REBALANCE = 1
AFTER_REBALANCE = 2
def setUp(self):
self.servers = TestInputSingleton.input.servers
self.input = TestInputSingleton.input
self.master = self.servers[0]
self.bucket_name = "default"
self.memcapableTestBase = MemcapableTestBase()
self.rest = RestConnection(self.master)
ClusterOperationHelper.cleanup_cluster([self.master])
BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
def tearDown(self):
BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
ClusterOperationHelper.cleanup_cluster([self.master])
def test_getr_1(self):
# GETR_TEST_1, 1 replica, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_2(self):
# GETR_TEST_2, 2 replica, all items retrieved
self._getr_body(item_count=10000,
replica_count=2,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_3(self):
# GETR_TEST_3, 3 replica, all items retrieved
self._getr_body(item_count=10000,
replica_count=3,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_4(self):
# GETR_TEST_4, 1 replica eject items, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=True,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_5(self):
# GETR_TEST_5, 1 replica expiration, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=300,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_6(self):
# GETR_TEST_6, 1 replica during rebalance in, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.DURING_REBALANCE)
def test_getr_7(self):
# GETR_TEST_7, 1 replica after rebalance in, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.AFTER_REBALANCE)
def test_getr_8(self):
# GETR_TEST_8, 1 replica warmup, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=True,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_9(self):
# GETR_TEST_9, 1 replica mutates, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=True,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_10(self):
# GETR_TEST_10, 1 replica deletes, all items retrieved
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=True,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_1n(self):
# GETR_TESTN_1, non-existing items, ERR_NOT_FOUND
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=True,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_2n(self):
# GETR_TESTN_2, 1 replica expired, ERR_NOT_FOUND
self._getr_body(item_count=10000,
replica_count=1,
expiration=15,
delay=35,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_3n(self):
# GETR_TESTN_3, 1 replica expired rebalance in, ERR_NOT_FOUND
self._getr_body(item_count=10000,
replica_count=1,
expiration=15,
delay=35,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.AFTER_REBALANCE)
def test_getr_4n(self):
# GETR_TESTN_4, 0 replica, ENGINE_NOT_MY_VBUCKET
self._getr_body(item_count=10000,
replica_count=0,
expiration=0,
delay=0,
eject=False,
delete=False,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr_5n(self):
# GETR_TESTN_5, 1 replica delete, ERR_NOT_FOUND
self._getr_body(item_count=10000,
replica_count=1,
expiration=0,
delay=0,
eject=False,
delete=True,
mutate=False,
warmup=False,
skipload=False,
rebalance=GetrTests.NO_REBALANCE)
def test_getr(self):
self._getr_body(item_count=self.input.param("item_count", 10000),
replica_count=self.input.param("replica_count", 1),
expiration=self.input.param("expiration", 0),
delay=self.input.param("delay", 0),
eject=self.input.param("eject", 0),
delete=self.input.param("delete", 0),
mutate=self.input.param("mutate", 0),
warmup=self.input.param("warmup", 0),
skipload=self.input.param("skipload", 0),
rebalance=self.input.param("rebalance", 0))
def _getr_body(self, item_count, replica_count, expiration, delay, eject, delete, mutate, warmup, skipload, rebalance):
negative_test = False
if delay > expiration:
negative_test = True
if delete and not mutate:
negative_test = True
if skipload and not mutate:
negative_test = True
prefix = str(uuid.uuid4())[:7]
BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
BucketOperationHelper.create_bucket(self.master, name=self.bucket_name, replica=replica_count, port=11210, test_case=self, bucket_ram=-1, password="")
if rebalance == GetrTests.DURING_REBALANCE or rebalance == GetrTests.AFTER_REBALANCE:
# leave 1 node unclustered for rebalance in
ClusterOperationHelper.begin_rebalance_out(self.master, self.servers[-1:])
ClusterOperationHelper.end_rebalance(self.master)
ClusterOperationHelper.begin_rebalance_in(self.master, self.servers[:-1])
ClusterOperationHelper.end_rebalance(self.master)
else:
ClusterOperationHelper.begin_rebalance_in(self.master, self.servers)
ClusterOperationHelper.end_rebalance(self.master)
vprefix = ""
if not skipload:
self._load_items(item_count=item_count, expiration=expiration, prefix=prefix, vprefix=vprefix)
if not expiration:
RebalanceHelper.wait_for_stats_int_value(self.master, self.bucket_name, "curr_items_tot", item_count * (replica_count + 1), "<=", 600, True)
if delete:
self._delete_items(item_count=item_count, prefix=prefix)
if mutate:
vprefix = "mutated"
self._load_items(item_count=item_count, expiration=expiration, prefix=prefix, vprefix=vprefix)
self.assertTrue(RebalanceHelper.wait_for_replication(self.rest.get_nodes(), timeout=180),
msg="replication did not complete")
if eject:
self._eject_items(item_count=item_count, prefix=prefix)
if delay:
self.log.info("delaying for {0} seconds".format(delay))
time.sleep(delay)
if rebalance == GetrTests.DURING_REBALANCE:
ClusterOperationHelper.begin_rebalance_in(self.master, self.servers)
if rebalance == GetrTests.AFTER_REBALANCE:
ClusterOperationHelper.end_rebalance(self.master)
if warmup:
self.log.info("restarting memcached")
command = "[rpc:multicall(ns_port_sup, restart_port_by_name, [memcached], 20000)]."
memcached_restarted = self.rest.diag_eval(command)
#wait until memcached starts
self.assertTrue(memcached_restarted, "unable to restart memcached process through diag/eval")
RebalanceHelper.wait_for_stats(self.master, self.bucket_name, "curr_items_tot", item_count * (replica_count + 1), 600)
count = self._getr_items(item_count=item_count, replica_count=replica_count, prefix=prefix, vprefix=vprefix)
if negative_test:
self.assertTrue(count == 0, "found items, expected none")
else:
self.assertTrue(count == replica_count * item_count, "expected {0} items, got {1} items".format(replica_count * item_count, count))
if rebalance == GetrTests.DURING_REBALANCE:
ClusterOperationHelper.end_rebalance(self.master)
def _load_items(self, item_count, expiration, prefix, vprefix=""):
flags = 0
client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
time_start = time.time()
for i in range(item_count):
timeout_end = time.time() + 10
passed = False
while time.time() < timeout_end and not passed:
try:
client.set(prefix + "_key_" + str(i), expiration, flags, vprefix + "_value_" + str(i))
passed = True
except Exception as e:
self.log.error("failed to set key {0}, error: {1}".format(prefix + "_key_" + str(i), e))
time.sleep(2)
self.assertTrue(passed, "exit due to set errors after {0} seconds".format(time.time() - time_start))
self.log.info("loaded {0} items in {1} seconds".format(item_count, time.time() - time_start))
def _get_items(self, item_count, prefix, vprefix=""):
client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
time_start = time.time()
get_count = 0
last_error = ""
error_count = 0
for i in range(item_count):
try:
value = client.get(prefix + "_key_" + str(i))[2]
assert(value == vprefix + "_value_" + str(i))
get_count += 1
except Exception as e:
last_error = "failed to getr key {0}, error: {1}".format(prefix + "_key_" + str(i), e)
error_count += 1
if error_count > 0:
self.log.error("got {0} errors, last error: {1}".format(error_count, last_error))
self.log.info("got {0} replica items in {1} seconds".format(get_count, time.time() - time_start))
return get_count
def _getr_items(self, item_count, replica_count, prefix, vprefix=""):
time_start = time.time()
get_count = 0
last_error = ""
error_count = 0
awareness = VBucketAwareMemcached(self.rest, self.bucket_name)
for r in range(replica_count):
for i in range(item_count):
retry = True
key = prefix + "_key_" + str(i)
while retry:
client = awareness.memcached(key, r)
try:
value = client.getr(key)[2]
assert(value == vprefix + "_value_" + str(i))
get_count += 1
retry = False
except mc_bin_client.MemcachedError as e:
last_error = "failed to getr key {0}, error: {1}".format(prefix + "_key_" + str(i), e)
error_count += 1
if e.status == 7:
# status 7 is NOT_MY_VBUCKET in the binary protocol
self.log.info("getting new vbucket map")
awareness.reset(self.rest)
else:
retry = False
except Exception as e:
last_error = "failed to getr key {0}, error: {1}".format(prefix + "_key_" + str(i), e)
error_count += 1
retry = False
if error_count > 0:
self.log.error("got {0} errors, last error: {1}".format(error_count, last_error))
self.log.info("got {0} replica items in {1} seconds".format(get_count, time.time() - time_start))
awareness.done()
return get_count
def _delete_items(self, item_count, prefix):
client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
time_start = time.time()
for i in range(item_count):
timeout_end = time.time() + 10
passed = False
while time.time() < timeout_end and not passed:
try:
client.delete(prefix + "_key_" + str(i))
passed = True
except Exception as e:
self.log.error("failed to delete key {0}, error: {1}".format(prefix + "_key_" + str(i), e))
time.sleep(2)
self.assertTrue(passed, "exit due to delete errors after {0} seconds".format(time.time() - time_start))
self.log.info("deleted {0} items in {1} seconds".format(item_count, time.time() - time_start))
def _eject_items(self, item_count, prefix):
client = MemcachedClientHelper.proxy_client(self.master, self.bucket_name)
time_start = time.time()
for i in range(item_count):
timeout_end = time.time() + 10
passed = False
while time.time() < timeout_end and not passed:
try:
client.evict_key(prefix + "_key_" + str(i))
passed = True
except Exception as e:
self.log.error("failed to eject key {0}, error: {1}".format(prefix + "_key_" + str(i), e))
time.sleep(2)
self.assertTrue(passed, "exit due to eject errors after {0} seconds".format(time.time() - time_start))
self.log.info("ejected {0} items in {1} seconds".format(item_count, time.time() - time_start))
def _next_parameter(self, d, di):
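# advance the per-key indexes in di over the value lists in d like a
# multi-digit odometer; returns False once every combination has been
# visited (when the final carry wraps all indexes back to zero)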
carry = True
for k in d:
if carry:
di[k] += 1
if di[k] >= len(d[k]):
carry = True
di[k] = 0
else:
carry = False
return not carry
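# Hypothetical usage sketch (this helper is not invoked anywhere in this
# file): d maps parameter names to candidate values, di holds the current
# index for each parameter.
#
# d = {"replica_count": [1, 2, 3], "eject": [False, True]}
# di = dict((k, 0) for k in d)
# while True:
#     combo = dict((k, d[k][di[k]]) for k in d)
#     # ... run one configuration with combo ...
#     if not self._next_parameter(d, di):
#         break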
class SimpleDecrMembaseBucketDefaultPort(unittest.TestCase):
memcapableTestBase = None
log = logger.Logger.get_logger()
def setUp(self):
self.memcapableTestBase = MemcapableTestBase()
self.memcapableTestBase.setUp_bucket('default', 11211, 'membase', self)
def test_decr_an_exist_key_never_exp(self):
key_test = 'has_key'
value = '100'
exp_time = 0
decr_amt = 5
incr_amt = 0
decr_time = 10
update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
if update_v == (int(value) - decr_amt * decr_time):
self.log.info('Value updated correctly. Expected value {0}. Tested value {1}'\
.format((int(value) - decr_amt * decr_time), update_v))
else:
self.fail("FAILED test_decr_an_exist_key_never_exp. Original value %s. \
Expected value %d" % (value, int(value) - decr_amt * decr_time))
def test_decr_non_exist_key(self):
key_test = 'no_key'
value = '100'
exp_time = 0
decr_amt = 5
incr_amt = 0
decr_time = 10
update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
if not update_v:
self.log.info('Value updated correctly. Expected value 0. Tested value {0}'\
.format(update_v))
else:
self.fail('FAILED test_decr_non_exist_key. Expected value 0')
def test_decr_with_exist_key_and_expired(self):
key_test = 'has_key'
value = '100'
exp_time = 5
decr_amt = 5
incr_amt = 0
decr_time = 10
update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
if not update_v:
self.log.info('Value updated correctly. Expected value 0. Tested value {0}'\
.format(update_v))
else:
self.fail('FAILED test_decr_with_exist_key_and_expired. Expected value 0')
def test_decr_with_exist_key_incr_then_decr_never_expired(self):
key_test = 'has_key'
value = '100'
exp_time = 0
decr_amt = 5
incr_amt = 50
decr_time = 10
update_v = self.memcapableTestBase.decr_test(key_test, exp_time, 0, value, incr_amt, decr_amt, decr_time)
if update_v == (int(value) + incr_amt - decr_amt * decr_time):
self.log.info('Value updated correctly. Expected value {0}. Tested value {1}'\
.format(int(value) + incr_amt - decr_amt * decr_time, update_v))
else:
self.fail(
"Expected value %d. Test result %d" % (int(value) + incr_amt - decr_amt * decr_time, update_v))
class StatsAggregationDuringMemcachedOps(unittest.TestCase):
def setUp(self):
self.log = logger.Logger.get_logger()
self.params = TestInputSingleton.input.test_params
self.master = TestInputSingleton.input.servers[0]
rest = RestConnection(self.master)
rest.init_cluster(self.master.rest_username, self.master.rest_password)
info = rest.get_nodes_self()
rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
memoryQuota=info.mcdMemoryReserved)
ClusterOperationHelper.cleanup_cluster([self.master])
ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
self._create_default_bucket()
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
self.shutdown_load_data = False
if "results" in self.params:
self.results = ResultsHelper(self.params["results"])
def _create_default_bucket(self):
name = "default"
master = self.master
rest = RestConnection(master)
helper = RestHelper(RestConnection(master))
if not helper.bucket_exists(name):
node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
info = rest.get_nodes_self()
available_ram = info.memoryQuota * node_ram_ratio
rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
ready = BucketOperationHelper.wait_for_memcached(master, name)
self.assertTrue(ready, msg="wait_for_memcached failed")
self.assertTrue(helper.bucket_exists(name),
msg="unable to create {0} bucket".format(name))
self.threads = []
self.shutdown_load_data = False
def test_stats_during_load(self):
#how many items
#how many iterations
#couchdb url
#which stats
if "items" in self.params:
items = int(self.params["items"])
else:
items = 100 * 1000
if "iterations" in self.params:
iterations = int(self.params["iterations"])
else:
iterations = -1
self.onenodemc.flush(1)
time.sleep(2)
rest = RestConnection(self.master)
tasks = rest.active_tasks()
while tasks:
self.log.info("found active tasks, waiting for them to complete. {0}".format(tasks))
time.sleep(10)
tasks = rest.active_tasks()
self.log.info("set, items {0} , iterations {1}".format(items, iterations))
t = Thread(name="loader-thread", target=self._load_data, args=(items, iterations))
t.daemon = True
start = time.time()
t.start()
# shell = RemoteMachineShellConnection(self.master)
# mc_pid = RemoteMachineHelper(shell).is_process_running("memcached").pid
# beam_pid = RemoteMachineHelper(shell).is_process_running("beam.smp").pid
# m = Thread(name="cpu-stat-memcached", target=self._extract_proc_info, args=(shell,mc_pid,60))
# b = Thread(name="cpu-stat-beam", target=self._extract_proc_info, args=(shell,beam_pid,60))
# m.start()
# b.start()
# self.threads.extend([t, b, m])
# time.sleep(1000)
t.join()
end = time.time()
msg = "set took {0} seconds to iterate over {1} items {2} times"
self.log.info(msg.format((end - start), items, iterations))
result_data = {
"version":self.params["version"],
"product":"couchbase",
"nodes":1,
"operation":"set",
"items":items,
"value_size":256,
"key_size":36,
"json":False,
"active_task":None,
"ops_per_second":self.ops_per_second,
}
if self.results:
percent_change = self.results.compare_perf(result_data,
limit=1,
same_keys=["product",
"nodes",
"operation",
"value_size",
"key_size",
"json",
"active_task"],
different_keys={},
result_key="ops_per_second")
self.results.add_perf(result_data)
for change in percent_change:
self.assertTrue(change >= 0.9, msg="test performance is less than 90% of previous run")
def test_stats_during_get(self):
#how many items
#how many iterations
#couchdb url
#which stats
if "items" in self.params:
items = int(self.params["items"])
else:
items = 100 * 1000
if "iterations" in self.params:
iterations = int(self.params["iterations"])
else:
iterations = -1
self.onenodemc.flush(1)
time.sleep(2)
self.log.info("preloading {0} items".format(items))
keys = [str(uuid.uuid4()) for i in range(0, items)]
value = MemcachedClientHelper.create_value("*", 256)
for key in keys:
self.onenodemc.set(key, 0, 0, value)
RebalanceHelper.wait_for_stats(self.master, 'default', 'curr_items', len(keys))
RebalanceHelper.wait_for_stats(self.master, 'default', 'ep_queue_size', 0)
rest = RestConnection(self.master)
tasks = rest.active_tasks()
while tasks:
self.log.info("found active tasks, waiting for them to complete. {0}".format(tasks))
time.sleep(10)
tasks = rest.active_tasks()
self.log.info("get items {0} , iterations {1}".format(items, iterations))
t = Thread(name="getter-thread", target=self._get_data, args=(keys, iterations))
t.daemon = True
start = time.time()
t.start()
# shell = RemoteMachineShellConnection(self.master)
# mc_pid = RemoteMachineHelper(shell).is_process_running("memcached").pid
# beam_pid = RemoteMachineHelper(shell).is_process_running("beam.smp").pid
# m = Thread(name="cpu-stat-memcached", target=self._extract_proc_info, args=(shell,mc_pid,60))
# b = Thread(name="cpu-stat-beam", target=self._extract_proc_info, args=(shell,beam_pid,60))
# m.start()
# b.start()
# self.threads.extend([t, b, m])
# time.sleep(1000)
t.join()
end = time.time()
msg = "get took {0} seconds to iterate over {1} items {2} times"
self.log.info(msg.format((end - start), items, iterations))
result_data = {
"version":self.params["version"],
"product":"couchbase",
"nodes":1,
"operation":"get",
"items":items,
"value_size":256,
"key_size":36,
"json":False,
"active_task":None,
"ops_per_second":self.ops_per_second,
}
if self.results:
percent_change = self.results.compare_perf(result_data,
limit=1,
same_keys=["product",
"nodes",
"operation",
"value_size",
"key_size",
"json",
"active_task"],
different_keys={},
result_key="ops_per_second")
self.results.add_perf(result_data)
for change in percent_change:
self.assertTrue(change >= 0.9, msg="test performance is less than 90% of previous run")
def tearDown(self):
self.shutdown_load_data = True
if self.threads:
for t in self.threads:
t.join()
def _extract_proc_info(self, shell, pid, frequency):
while not self.shutdown_load_data:
time.sleep(frequency)
o, r = shell.execute_command("cat /proc/{0}/stat".format(pid))
# shell.log_command_output(o, r)
fields = ('pid comm state ppid pgrp session tty_nr tpgid flags minflt '
'cminflt majflt cmajflt utime stime cutime cstime priority '
'nice num_threads itrealvalue starttime vsize rss rsslim '
'startcode endcode startstack kstkesp kstkeip signal blocked '
'sigignore sigcatch wchan nswap cnswap exit_signal '
'processor rt_priority policy delayacct_blkio_ticks '
'guest_time cguest_time ').split(' ')
d = dict(zip(fields, o[0].split(' ')))
print d
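# the field names above follow the order documented in proc(5); utime and
# stime are in clock ticks (USER_HZ, typically 100/s), so approximate CPU
# seconds can be derived as (utime + stime) / 100.0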
def _load_data(self, items, iterations):
iteration = 0
self.ops_per_second = 0
ops_per_second = []
keys = [str(uuid.uuid4()) for i in range(0, items)]
value = MemcachedClientHelper.create_value("*", 256)
while iteration < iterations:
start = time.time()
for key in keys:
self.onenodemc.set(key, 0, 0, value)
iteration += 1
end = time.time()
msg = "set iteration #{0} took {1} seconds to iterate over {2} items"
self.log.info(msg.format(iteration, (end - start), items))
ops_per_second.append(len(keys) / float(end - start))
self.ops_per_second = sum(ops_per_second) / float(len(ops_per_second)) if ops_per_second else 0
def _get_data(self, keys, iterations):
iteration = 0
self.ops_per_second = 0
ops_per_second = []
while iteration < iterations:
start = time.time()
for key in keys:
self.onenodemc.get(key)
iteration += 1
end = time.time()
msg = "get iteration #{0} took {1} seconds to iterate over {2} items"
self.log.info(msg.format(iteration, (end - start), len(keys)))
ops_per_second.append(len(keys) / float(end - start))
self.ops_per_second = sum(ops_per_second) / float(len(ops_per_second)) if ops_per_second else 0
class AppendTests(unittest.TestCase):
def setUp(self):
self.log = logger.Logger.get_logger()
self.params = TestInputSingleton.input.test_params
self.master = TestInputSingleton.input.servers[0]
rest = RestConnection(self.master)
rest.init_cluster(self.master.rest_username, self.master.rest_password)
info = rest.get_nodes_self()
rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
memoryQuota=info.mcdMemoryReserved)
ClusterOperationHelper.cleanup_cluster([self.master])
ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
self._create_default_bucket()
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
def _create_default_bucket(self):
name = "default"
master = self.master
rest = RestConnection(master)
helper = RestHelper(RestConnection(master))
if not helper.bucket_exists(name):
node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
info = rest.get_nodes_self()
available_ram = info.memoryQuota * node_ram_ratio
rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
ready = BucketOperationHelper.wait_for_memcached(master, name)
self.assertTrue(ready, msg="wait_for_memcached failed")
self.assertTrue(helper.bucket_exists(name),
msg="unable to create {0} bucket".format(name))
self.load_thread = None
self.shutdown_load_data = False
def test_append_wrong_cas(self):
#monitor the memory usage: repeated failed appends (5k each) with a
#wrong CAS should not increase mem_used by more than 10 percent
#
stats = self.onenodemc.stats()
initial_mem_used = -1
if "mem_used" in stats:
initial_mem_used = int(stats["mem_used"])
self.assertTrue(initial_mem_used > 0)
key = str(uuid.uuid4())
size = 5 * 1024
value = MemcachedClientHelper.create_value("*", size)
self.onenodemc.set(key, 0, 0, value)
flags_v, cas_v, get_v = self.onenodemc.get(key)
self.onenodemc.append(key, value, cas_v)
iteration = 50000
for i in range(0, iteration):
try:
self.onenodemc.append(key, value, random.randint(0, 1000))
except:
#ignoring the error here
pass
stats = self.onenodemc.stats()
if "mem_used" in stats:
delta = int(stats["mem_used"]) - initial_mem_used
self.log.info("initial mem_used {0}, current mem_used {1}".format(initial_mem_used, stats["mem_used"]))
self.log.info(delta)
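# a hedged sketch (not in the original gist) of the 10 percent bound
# described in the comment above, assuming failed appends should not
# retain memory:
#
# if initial_mem_used > 0 and delta > 0.10 * initial_mem_used:
#     self.fail("mem_used grew by {0} bytes (>10% of {1})".format(delta, initial_mem_used))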
def test_append_with_delete(self):
#monitor the memory usage: repeated 5k appends should not grow mem_used
#by more than 10 percent over the expected delta
#
if "iteration" in self.params:
iteration = int(self.params["iteration"])
else:
iteration = 50000
if "items" in self.params:
items = int(self.params["items"])
else:
items = 10000
if "append_size" in self.params:
append_size = int(self.params["append_size"])
else:
append_size = 5 * 1024
append_iteration_before_delete = 100
keys = [str(uuid.uuid4()) for i in range(0, items)]
size = 5 * 1024
stats = self.onenodemc.stats()
initial_mem_used = -1
self.log.info("items : {0} , iteration : {1} ".format(items, iteration))
if "mem_used" in stats:
initial_mem_used = int(stats["mem_used"])
self.assertTrue(initial_mem_used > 0)
for i in range(0, iteration):
for key in keys:
self.onenodemc.set(key, 0, 0, os.urandom(size))
try:
for append_iteration in range(0, append_iteration_before_delete):
appended_value = MemcachedClientHelper.create_value("*", append_size)
for key in keys:
self.onenodemc.append(key, appended_value)
self.onenodemc.get(key)
except:
#ignoring the error here
pass
stats = None
for t in range(0, 10):
try:
stats = self.onenodemc.stats()
break
except:
pass
if stats and "mem_used" in stats:
delta = int(stats["mem_used"]) - initial_mem_used
#only worth flagging if delta is more than 20% above the expected value
# expected delta is items * (size + append_iterations * append_size)
expected_delta = items * (size + append_iteration_before_delete * append_size * 1.0)
msg = "initial mem_used {0}, current mem_used {1} , delta : {2} , expected delta : {3} , increase percentage {4}"
self.log.info(
msg.format(initial_mem_used, stats["mem_used"], delta, expected_delta, delta / expected_delta))
# if delta > (1.2 * expected_delta):
# self.fail("too much memory..")
# for key in keys:
# self.onenodemc.delete(key)
def tearDown(self):
self.shutdown_load_data = True
if self.load_thread:
self.load_thread.join()
# BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
class WarmUpMemcachedTest(unittest.TestCase):
def setUp(self):
self.log = logger.Logger.get_logger()
self.params = TestInputSingleton.input.test_params
self.master = TestInputSingleton.input.servers[0]
rest = RestConnection(self.master)
rest.init_cluster(self.master.rest_username, self.master.rest_password)
info = rest.get_nodes_self()
rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
memoryQuota=info.mcdMemoryReserved)
ClusterOperationHelper.cleanup_cluster([self.master])
ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
# recreate bucket instead of flush
self._create_default_bucket()
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
self._log_start()
def tearDown(self):
ClusterOperationHelper.cleanup_cluster([self.master])
BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
self._log_finish()
def _log_start(self):
try:
msg = "{0} : {1} started ".format(datetime.datetime.now(), self._testMethodName)
RestConnection(self.master).log_client_error(msg)
except:
pass
def _log_finish(self):
try:
msg = "{0} : {1} finished ".format(datetime.datetime.now(), self._testMethodName)
RestConnection(self.master).log_client_error(msg)
except:
pass
def _create_default_bucket(self):
name = "default"
master = self.master
rest = RestConnection(master)
node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
info = rest.get_nodes_self()
available_ram = info.memoryQuota * node_ram_ratio
rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
ready = BucketOperationHelper.wait_for_memcached(master, name)
self.assertTrue(ready, msg="wait_for_memcached failed")
self.load_thread = None
self.shutdown_load_data = False
def _insert_data(self, howmany):
# prefix = str(uuid.uuid4())
items = ["{0}-{1}".format(str(uuid.uuid4()), i) for i in range(0, howmany)]
for item in items:
self.onenodemc.set(item, 0, 0, item)
self.log.info("inserted {0} items".format(howmany))
def _do_warmup(self, howmany, timeout_in_seconds=1800):
# the warmup time stat is reported in microseconds
self._insert_data(howmany)
curr_items = int(self.onenodemc.stats()["curr_items"])
uptime = int(self.onenodemc.stats()["uptime"])
RebalanceHelper.wait_for_stats(self.master, "default", 'ep_queue_size', 0)
RebalanceHelper.wait_for_stats(self.master, "default", 'ep_flusher_todo', 0)
self.log.info(curr_items)
self.log.info("sleeping for 10 seconds")
time.sleep(10)
rest = RestConnection(self.master)
command = "[erlang:exit(element(2, X), kill) || X <- supervisor:which_children(ns_port_sup)]."
memcached_restarted = rest.diag_eval(command)
self.assertTrue(memcached_restarted, "unable to restart memcached/moxi process through diag/eval")
#wait until memcached starts
start = time.time()
memcached_restarted = False
while time.time() - start < 60:
try:
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
value = int(self.onenodemc.stats()["uptime"])
if value < uptime:
self.log.info("memcached restarted...")
memcached_restarted = True
break
self.onenodemc.close()
except Exception:
time.sleep(1)
self.assertTrue(memcached_restarted, "memcached did not restart (uptime was never reset)")
# Warmup till curr_items match
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
stats = self.onenodemc.stats()
present_count = int(stats["curr_items"])
self.log.info("ep curr_items : {0}, inserted_items {1}".format(present_count, curr_items))
start = time.time()
while present_count < curr_items:
if (time.time() - start) <= timeout_in_seconds:
stats = self.onenodemc.stats()
present_count = int(stats["curr_items"])
self.log.info("curr_items : {0}".format(present_count))
time.sleep(1)
else:
self.fail("Timed out waiting for warmup")
self.log.info("ep curr_items : {0}, inserted_items {1}".format(present_count, curr_items))
stats = self.onenodemc.stats()
warmup_time = int(stats["ep_warmup_time"])
self.log.info("ep_warmup_time is {0}".format(warmup_time))
def do_warmup_2(self):
self._do_warmup(2)
def do_warmup_10k(self):
self._do_warmup(10000)
def do_warmup_100k(self):
self._do_warmup(100000)
def do_warmup_1M(self):
self._do_warmup(1000000)
class MultiGetNegativeTest(unittest.TestCase):
def setUp(self):
self.log = logger.Logger.get_logger()
self.params = TestInputSingleton.input.test_params
self.master = TestInputSingleton.input.servers[0]
rest = RestConnection(self.master)
rest.init_cluster(self.master.rest_username, self.master.rest_password)
info = rest.get_nodes_self()
rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
memoryQuota=info.mcdMemoryReserved)
ClusterOperationHelper.cleanup_cluster([self.master])
ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
self._create_default_bucket()
self.keys_cleanup = []
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default", timeout=600)
self.onenodemoxi = MemcachedClientHelper.proxy_client(self.master, "default", timeout=600)
def tearDown(self):
if self.onenodemc:
#delete the keys
for key in self.keys_cleanup:
try:
self.onenodemc.delete(key)
except Exception:
pass
def _create_default_bucket(self):
name = "default"
master = self.master
rest = RestConnection(master)
helper = RestHelper(RestConnection(master))
if not helper.bucket_exists(name):
node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
info = rest.get_nodes_self()
available_ram = info.memoryQuota * node_ram_ratio
rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
ready = BucketOperationHelper.wait_for_memcached(master, name)
self.assertTrue(ready, msg="wait_for_memcached failed")
self.assertTrue(helper.bucket_exists(name),
msg="unable to create {0} bucket".format(name))
def _insert_data(self, client, howmany):
prefix = str(uuid.uuid4())
keys = ["{0}-{1}".format(prefix, i) for i in range(0, howmany)]
value = MemcachedClientHelper.create_value("*", 1024)
for key in keys:
client.set(key, 0, 0, value)
self.log.info("inserted {0} items".format(howmany))
return keys
def test_mc_multi_get(self):
self._test_multi_get(self.onenodemc, 10)
self._test_multi_get(self.onenodemc, 100)
self._test_multi_get(self.onenodemc, 1000)
self._test_multi_get(self.onenodemc, 10 * 1000)
self._test_multi_get(self.onenodemc, 20 * 1000)
self._test_multi_get(self.onenodemc, 30 * 1000)
self._test_multi_get(self.onenodemc, 40 * 1000)
def test_mx_multi_get(self):
self._test_multi_get(self.onenodemoxi, 10)
self._test_multi_get(self.onenodemoxi, 100)
self._test_multi_get(self.onenodemoxi, 1000)
self._test_multi_get(self.onenodemoxi, 10 * 1000)
self._test_multi_get(self.onenodemoxi, 20 * 1000)
self._test_multi_get(self.onenodemoxi, 30 * 1000)
self._test_multi_get(self.onenodemoxi, 40 * 1000)
def _test_multi_get(self, client, howmany):
mc_message = "memcached virt memory size {0} : resident memory size : {1}"
mx_message = "moxi virt memory size {0} : resident memory size : {1}"
shell = RemoteMachineShellConnection(self.master)
moxi_pid = RemoteMachineHelper(shell).is_process_running("moxi").pid
mc_pid = RemoteMachineHelper(shell).is_process_running("memcached").pid
keys = self._insert_data(client, howmany)
self.keys_cleanup.extend(keys)
self.log.info("printing moxi and memcached stats before running multi-get")
moxi_sys_stats = self._extract_proc_info(shell, moxi_pid)
memcached_sys_stats = self._extract_proc_info(shell, mc_pid)
self.log.info(mc_message.format(int(memcached_sys_stats["rss"]) * 4096,
memcached_sys_stats["vsize"]))
self.log.info(mx_message.format(int(moxi_sys_stats["rss"]) * 4096,
moxi_sys_stats["vsize"]))
self.log.info("running multiget to get {0} keys".format(howmany))
gets = client.getMulti(keys)
self.log.info("recieved {0} keys".format(len(gets)))
self.log.info(gets)
self.assertEquals(len(gets), len(keys))
self.log.info("printing moxi and memcached stats after running multi-get")
moxi_sys_stats = self._extract_proc_info(shell, moxi_pid)
memcached_sys_stats = self._extract_proc_info(shell, mc_pid)
self.log.info(mc_message.format(int(memcached_sys_stats["rss"]) * 4096,
memcached_sys_stats["vsize"]))
self.log.info(mx_message.format(int(moxi_sys_stats["rss"]) * 4096,
moxi_sys_stats["vsize"]))
def _extract_proc_info(self, shell, pid):
o, r = shell.execute_command("cat /proc/{0}/stat".format(pid))
fields = ('pid comm state ppid pgrp session tty_nr tpgid flags minflt '
'cminflt majflt cmajflt utime stime cutime cstime priority '
'nice num_threads itrealvalue starttime vsize rss rsslim '
'startcode endcode startstack kstkesp kstkeip signal blocked '
'sigignore sigcatch wchan nswap cnswap exit_signal '
'processor rt_priority policy delayacct_blkio_ticks '
'guest_time cguest_time ').split(' ')
d = dict(zip(fields, o[0].split(' ')))
return d
class MemcachedValueSizeLimitTest(unittest.TestCase):
def setUp(self):
self.log = logger.Logger.get_logger()
self.params = TestInputSingleton.input.test_params
self.master = TestInputSingleton.input.servers[0]
rest = RestConnection(self.master)
rest.init_cluster(self.master.rest_username, self.master.rest_password)
info = rest.get_nodes_self()
rest.init_cluster_memoryQuota(self.master.rest_username, self.master.rest_password,
memoryQuota=info.mcdMemoryReserved)
ClusterOperationHelper.cleanup_cluster([self.master])
ClusterOperationHelper.wait_for_ns_servers_or_assert([self.master], self)
self._create_default_bucket()
self.keys_cleanup = []
self.onenodemc = MemcachedClientHelper.direct_client(self.master, "default")
def tearDown(self):
if self.onenodemc:
#delete the keys
for key in self.keys_cleanup:
try:
self.onenodemc.delete(key)
except Exception:
pass
def _create_default_bucket(self):
name = "default"
master = self.master
rest = RestConnection(master)
helper = RestHelper(RestConnection(master))
if not helper.bucket_exists(name):
node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
info = rest.get_nodes_self()
available_ram = info.memoryQuota * node_ram_ratio
rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram))
ready = BucketOperationHelper.wait_for_memcached(master, name)
self.assertTrue(ready, msg="wait_for_memcached failed")
self.assertTrue(helper.bucket_exists(name),
msg="unable to create {0} bucket".format(name))
def test_append_till_20_mb(self):
value = MemcachedClientHelper.create_value("*", 1024 * 1024)
key = str(uuid.uuid4())
self.keys_cleanup.append(key)
self.onenodemc.set(key, 0, 0, value)
a,b,c = self.onenodemc.get(key)
print "key has %s characters" % (len(c))
#create 10 byte value
value = '1234567890'
for i in range(0, (100*1024 - 1)):
if i % 10 == 0 :
stats = self.onenodemc.stats('allocator')
lines = stats['detailed'].split('\n')
print "%s th append" % (i)
for line in lines:
if line.find("MALLOC:") >= 0:
print line
self.onenodemc.append(key, value)
try:
self.onenodemc.append(key, value)
a,b,c = self.onenodemc.get(key)
print "key has %s characters" % (len(c))
self.fail("memcached did not raise an error")
except mc_bin_client.MemcachedError as err:
self.assertEquals(err.status, 5)
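# status 5 is NOT_STORED ("item not stored") in the memcached binary
# protocol; the test expects it once the value exceeds the size limit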
def test_prepend_till_20_mb(self):
initial_value = "12345678"
key = str(uuid.uuid4())
self.keys_cleanup.append(key)
self.onenodemc.set(key, 0, 0, initial_value)
#prepend a 20 * 1024 byte value 1024 times, about 20 MB in total
value = MemcachedClientHelper.create_value("*", 1024 * 20)
for i in range(0, (1024 - 1)):
self.onenodemc.prepend(key, value)
try:
self.onenodemc.prepend(key, value)
self.fail("memcached did not raise an error")
except mc_bin_client.MemcachedError as err:
self.assertEquals(err.status, 5)
# def test_incr_till_max(self):
# initial_value = '0'
# max_value = pow(2, 64)
# step = max_value / 1024
# self.log.info("step : {0}")
# key = str(uuid.uuid4())
# self.keys_cleanup.append(key)
# self.onenodemc.set(key, 0, 0, initial_value)
# for i in range(0, (2 * 1024 - 1)):
# self.onenodemc.incr(key, amt=step)
# a, b, c = self.onenodemc.get(key)
# delta = long(c) - max_value
# self.log.info("delta = value - pow(2,64) = {0}".format(delta))
# a, b, c = self.onenodemc.get(key)
# self.log.info("key : {0} value {1}".format(key, c))
# try:
# self.onenodemc.incr(key, step)
# self.fail("memcached did not raise an error")
# except mc_bin_client.MemcachedError as err:
# self.assertEquals(err.status, 5)
#
# def test_decr_till_max(self):
# initial_value = '1'
# max_value = pow(2, 64)
# step = max_value / 1024
# key = str(uuid.uuid4())
# self.keys_cleanup.append(key)
# self.onenodemc.set(key, 0, 0, initial_value)
# for i in range(0, (1024 - 1)):
# self.onenodemc.decr(key, amt=step)
# a, b, c = self.onenodemc.get(key)
# self.log.info("key : {0} value {1}".format(key, c))
# try:
# self.onenodemc.incr(key, step)
# self.fail("memcached did not raise an error")
# except mc_bin_client.MemcachedError as err:
# self.assertEquals(err.status, 5)