Created
May 17, 2016 10:55
-
-
Save abhi-bit/f93ba8bcae15cc6e512d841a1df6b827 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import datetime as dt | |
import json | |
import matplotlib.pyplot as plt | |
import matplotlib.font_manager as font_manager | |
import matplotlib.dates | |
from matplotlib.dates import MONTHLY, DateFormatter, rrulewrapper, RRuleLocator | |
import logging | |
from pylab import * | |
# Log line layout: timestamp, the fixed tag "vbucket-moves", the level name
# left-aligned in 8 columns, then the message.
FORMAT = "%(asctime)s vbucket-moves %(levelname)-8s: %(message)s"
logging.basicConfig(format=FORMAT)
# Logger used throughout this script; set_log_level() adjusts its threshold.
logger = logging.getLogger('tcpserver')
class MasterEvents(object):
    """Parse a master-events file (one JSON object per line) into rebalances.

    ``parse()`` returns a list with one dict per ``rebalanceStart`` event::

        {'meta': {'start': ts, 'end': ts},
         '<bucket name>': {'meta': {'start': ts, 'end': ts},
                           'vbuckets': {<vbucket id>: {<event type>: ts,
                                                       'move_to': node}}}}

    ``end`` and ``move_to`` are only present when the corresponding event
    (or a usable replication chain) was seen in the input.
    """

    # Same logger object as the module-level ``logger`` (loggers are
    # singletons per name); looked up here so the class is self-contained.
    _log = logging.getLogger('tcpserver')

    # Per-vbucket events, other than vbucketMoveStart, whose timestamps are
    # recorded by _bucket().
    _PHASE_EVENTS = frozenset([
        'indexingInitiated', 'backfillPhaseEnded',
        'seqnoWaitingEnded', 'waitIndexUpdatedEnded',
        'vbucketMoveDone',
    ])

    def __init__(self):
        self.file = None

    def parse(self, filename):
        """Read *filename* and return the list of parsed rebalances.

        Returns None (after logging the error) when the file cannot be
        opened. The file is closed even if parsing raises.
        """
        try:
            self.file = open(filename, 'r')
        except IOError as e:
            self._log.error(e)
            return None
        try:
            return self._rebalances()
        finally:
            self.file.close()

    def _next_line(self):
        """Return the next event dict from the file, or None at end of file.

        Blank lines are skipped silently. Lines that are not valid JSON, or
        that do not decode to an object, are logged and skipped, so a single
        damaged line no longer ends the whole parse.
        """
        while True:
            line = self.file.readline()
            if not line:
                return None
            if not line.strip():
                continue
            try:
                raw = json.loads(line)
            except ValueError:
                # json raises ValueError (or a subclass of it) on bad input.
                self._log.warning("Bad line: %s", line.rstrip())
                continue
            if isinstance(raw, dict):
                return raw
            self._log.warning("Bad line: %s", line.rstrip())

    def _rebalances(self):
        """Collect every rebalance found in the remainder of the file."""
        rebalances = []
        raw = self._next_line()
        while raw is not None:
            kind = raw.get('type')
            if kind == 'rebalanceStart':
                rebalance = {'meta': {'start': int(raw['ts'])}}
                self._rebalance(rebalance)
                rebalances.append(rebalance)
            elif kind == 'rebalanceEnd':
                # _rebalance() consumes the end event that belongs to a
                # start, so one seen here has no open rebalance to close.
                self._log.warning(
                    "Ignoring rebalanceEnd without a matching rebalanceStart")
            raw = self._next_line()
        return rebalances

    def _rebalance(self, rebalance):
        """Fill *rebalance* with its buckets until ``rebalanceEnd`` or EOF."""
        raw = self._next_line()
        while raw is not None:
            kind = raw.get('type')
            if kind == 'rebalanceEnd':
                rebalance['meta']['end'] = int(raw['ts'])
                return
            if kind == 'bucketRebalanceStarted':
                # str() keeps the key a native string on Python 2 and 3.
                name = str(raw['bucket'])
                bucket = {'meta': {'start': int(raw['ts'])}, 'vbuckets': {}}
                self._bucket(bucket)
                rebalance[name] = bucket
            raw = self._next_line()

    def _bucket(self, bucket):
        """Fill *bucket* with vbucket timings until ``bucketRebalanceEnded``
        or EOF."""
        vbuckets = bucket['vbuckets']
        raw = self._next_line()
        while raw is not None:
            kind = raw.get('type')
            if kind == 'bucketRebalanceEnded':
                bucket['meta']['end'] = int(raw['ts'])
                return
            if kind == 'vbucketMoveStart':
                # A new start replaces anything recorded for this vbucket.
                move = {kind: int(raw['ts'])}
                vbuckets[raw['vbucket']] = move
                # Experimental: record which node the vbucket moves to.
                target = self._move_target(raw.get('chainBefore'),
                                           raw.get('chainAfter'))
                if target is None:
                    self._log.warning(
                        "Cannot determine move target for vbucket %s",
                        raw['vbucket'])
                else:
                    move['move_to'] = target
            elif kind in self._PHASE_EVENTS:
                # setdefault tolerates a phase event whose move start was
                # never seen instead of raising KeyError.
                vbuckets.setdefault(raw['vbucket'], {})[kind] = int(raw['ts'])
            raw = self._next_line()

    @staticmethod
    def _move_target(before, after):
        """Pick the destination node from the before/after chains.

        If the first entry changed, the new first entry is the target;
        otherwise the second entry of the new chain is. Returns None when
        either chain is missing or empty, or the new chain has no second
        entry to fall back on.
        """
        if not before or not after:
            return None
        if before[0] != after[0]:
            return after[0]
        if len(after) > 1:
            return after[1]
        return None
def convert(rebalances):
    """Flatten parsed rebalances into ``{'reb<i>-<bucket>': moves}``.

    *rebalances* is the list produced by ``MasterEvents.parse()``; ``i`` is
    the position of the rebalance in that list. The ``'meta'`` entry of each
    rebalance is skipped, and every other entry is passed through
    ``convert_bucket()``. A ``None`` input (what ``parse()`` returns when the
    file cannot be opened) yields an empty dict.
    """
    moves = {}
    for index, rebalance in enumerate(rebalances or ()):
        for name, bucket in rebalance.items():
            if name == 'meta':
                continue
            moves['reb%d-%s' % (index, name)] = convert_bucket(bucket)
    return moves
def convert_bucket(bucket):
    """Turn one parsed bucket into a list of move tuples sorted by start.

    Each tuple is ``(vbucket id, start, backfill end, seqno end, index end,
    end)``, with every time expressed relative to ``bucket['meta']['start']``.
    ``index end`` is ``-1`` when the move has no ``waitIndexUpdatedEnded``
    event. Moves missing any of the four mandatory events are logged and
    skipped rather than aborting the whole conversion with a KeyError.
    """
    # Same logger object as the module-level ``logger``.
    log = logging.getLogger('tcpserver')
    required = ('vbucketMoveStart', 'backfillPhaseEnded',
                'seqnoWaitingEnded', 'vbucketMoveDone')
    base_time = bucket['meta']['start']

    moves = []
    for vbucket_id, data in bucket['vbuckets'].items():
        missing = [key for key in required if key not in data]
        if missing:
            log.warning("Skipping vbucket %s: missing %s",
                        vbucket_id, ', '.join(missing))
            continue
        start, backfill_end, seqno_end, end = [
            data[key] - base_time for key in required
        ]
        if 'waitIndexUpdatedEnded' in data:
            index_end = data['waitIndexUpdatedEnded'] - base_time
        else:
            index_end = -1  # No indexing
        moves.append(
            (vbucket_id, start, backfill_end, seqno_end, index_end, end))

    # Sort by start time
    return sorted(moves, key=lambda move: move[1])
def plot(moves):
    """Render one chart per entry of *moves* via ``plot_single()``.

    *moves* maps a chart name to its list of move tuples.
    """
    for name in moves:
        plot_single(name, moves[name])
def plot_single(name, moves):
    """Draw a horizontal bar chart of vbucket moves and save ``<name>.png``.

    *moves* is a list of tuples in the format:
    (vbucket, move start, backfill end, seqno persist, index end, move end)
    where ``index end == -1`` means the move had no indexing phase.

    Per move, a wide bar spans start..end (blue with indexing, green
    without), with thin bars for backfill (red), indexing (yellow) and seqno
    wait (black) layered on top. Logs an error and returns without writing
    a file when *moves* is empty.
    """
    h_bar = .5
    if not moves:
        logger.error("No rebalance data for \'%s\'" % name)
        return

    # True division with a floor of one inch: integer division gave a
    # zero-height figure for fewer than four moves on Python 2.
    fig = plt.figure(figsize=(50, max(1.0, len(moves) / 4.0)))
    ax = fig.add_subplot(111)

    used_labels = set()

    def legend_label(text):
        # Label only the first bar of each kind so the legend gets one entry
        # per kind; labels starting with '_' are ignored by the legend.
        if text in used_labels:
            return '_nolegend_'
        used_labels.add(text)
        return text

    ylabels = []
    positions = []
    for i, move in enumerate(moves):
        vbucket, start, backfill_end, seqno_end, index_end, end = move[:6]
        y = h_bar * (i + 1)
        positions.append(y)
        ylabels.append('VBucket' + str(vbucket))

        indexed = index_end != -1
        ax.barh(y, end - start, left=start, height=0.3, align='center',
                color='blue' if indexed else 'green', alpha=0.75,
                label=legend_label(
                    'Move (with indexing)' if indexed else 'Move'))
        ax.barh(y - .05, backfill_end - start, left=start, height=0.1,
                align='center', color='red', alpha=0.75,
                label=legend_label('Backfill'))
        if indexed:
            ax.barh(y + .05, index_end - start, left=start, height=0.1,
                    align='center', color='yellow', alpha=0.75,
                    label=legend_label('Indexing'))
        # NOTE(review): drawn for every move on the assumption that the seqno
        # bar was not nested under the indexing check -- confirm.
        ax.barh(y + .05, seqno_end - start, left=start, height=0.1,
                align='center', color='black', alpha=0.75,
                label=legend_label('Seqno wait'))

    # Format the y-axis: ticks sit exactly on the bar centres, so labels and
    # bars always match in number.
    ax.set_yticks(positions)
    ax.set_yticklabels(ylabels, fontsize=10)

    # Format the x-axis
    ax.axis('tight')
    ax.grid(color='g', linestyle=':')
    plt.setp(ax.get_xticklabels(), rotation=30, fontsize=12)

    # Format the legend
    font = font_manager.FontProperties(size='small')
    ax.legend(loc=1, prop=font)

    # Finish up
    ax.invert_yaxis()
    plt.savefig(name + '.png', bbox_inches='tight', dpi=120)
    # Release the figure; otherwise each chart stays alive in pyplot.
    plt.close(fig)
def set_log_level(level):
    """Set the script logger's threshold from a ``-v`` count.

    ``None`` or 0 -> ERROR, 1 and 2 -> INFO, anything else -> DEBUG.
    ``None`` is what argparse's ``count`` action yields when the flag is not
    given at all; it is treated like 0 so a plain run is not the noisiest.
    """
    # Same logger object as the module-level ``logger``.
    log = logging.getLogger('tcpserver')
    if not level:
        log.setLevel(logging.ERROR)
    elif level in (1, 2):
        # NOTE(review): levels 1 and 2 map to the same threshold, as in the
        # original; one of them may have been meant to be WARNING.
        log.setLevel(logging.INFO)
    else:
        log.setLevel(logging.DEBUG)
def main():
    """Command-line entry point.

    Parses the master events file given with ``-f``, writes one PNG chart
    per rebalance/bucket pair, then prints the parsed data as indented JSON
    on stdout. Returns without output when nothing could be parsed.
    """
    parser = argparse.ArgumentParser(prog='vbucket-moves',
                                     usage='%(prog)s -f <file> [options]',
                                     add_help=True)
    parser.add_argument('-f', '--file', required=True,
                        help='The path to the master events file')
    # default=0 so the count is an int even when -v is never passed.
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity')
    args = parser.parse_args()

    set_log_level(args.verbose)

    data = MasterEvents().parse(args.file)
    if not data:
        return

    plot(convert(data))
    print(json.dumps(data, sort_keys=True, indent=2))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment