-
-
Save deniszh/7986974 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
import os | |
import mmap | |
import struct | |
import signal | |
import optparse | |
import cql | |
try: | |
import whisper | |
except ImportError: | |
raise SystemExit('[ERROR] Please make sure whisper is installed properly') | |
# Ignore SIGPIPE | |
signal.signal(signal.SIGPIPE, signal.SIG_DFL) | |
option_parser = optparse.OptionParser(usage='''%prog path''') | |
(options, args) = option_parser.parse_args() | |
if len(args) != 1: | |
option_parser.error("require one input file name") | |
else: | |
path = args[0] | |
def mmap_file(filename): | |
fd = os.open(filename, os.O_RDONLY) | |
map = mmap.mmap(fd, os.fstat(fd).st_size, prot=mmap.PROT_READ) | |
os.close(fd) | |
return map | |
def read_header(map): | |
try: | |
(aggregationType,maxRetention,xFilesFactor,archiveCount) = struct.unpack(whisper.metadataFormat,map[:whisper.metadataSize]) | |
except: | |
raise CorruptWhisperFile("Unable to unpack header") | |
archives = [] | |
archiveOffset = whisper.metadataSize | |
for i in xrange(archiveCount): | |
try: | |
(offset, secondsPerPoint, points) = struct.unpack(whisper.archiveInfoFormat, map[archiveOffset:archiveOffset+whisper.archiveInfoSize]) | |
except: | |
raise CorruptWhisperFile("Unable to read archive %d metadata" % i) | |
archiveInfo = { | |
'offset' : offset, | |
'secondsPerPoint' : secondsPerPoint, | |
'points' : points, | |
'retention' : secondsPerPoint * points, | |
'size' : points * whisper.pointSize, | |
} | |
archives.append(archiveInfo) | |
archiveOffset += whisper.archiveInfoSize | |
header = { | |
'aggregationMethod' : whisper.aggregationTypeToMethod.get(aggregationType, 'average'), | |
'maxRetention' : maxRetention, | |
'xFilesFactor' : xFilesFactor, | |
'archives' : archives, | |
} | |
return header | |
def dump_header(header): | |
print 'Meta data:' | |
print ' aggregation method: %s' % header['aggregationMethod'] | |
print ' max retention: %d' % header['maxRetention'] | |
print ' xFilesFactor: %g' % header['xFilesFactor'] | |
def dump_archive_headers(archives): | |
for i,archive in enumerate(archives): | |
print 'Archive %d info:' % i | |
print ' offset: %d' % archive['offset'] | |
print ' seconds per point: %d' % archive['secondsPerPoint'] | |
print ' points: %d' % archive['points'] | |
print ' retention: %d' % archive['retention'] | |
print ' size: %d' % archive['size'] | |
def dump_archives(archives): | |
name = path.replace('/opt/graphite/storage/whisper/','',1) | |
name = name.replace('.wsp','',1) | |
name = name.replace('/', '.') | |
con = cql.connect('127.0.0.1', 9160, 'metric', cql_version='3.0.0') | |
print ("Connected to Cassandra!") | |
cursor = con.cursor() | |
for i,archive in enumerate(archives): | |
print 'Archive %d data:' %i | |
offset = archive['offset'] | |
for point in xrange(archive['points']): | |
(timestamp, value) = struct.unpack(whisper.pointFormat, map[offset:offset+whisper.pointSize]) | |
print '%d: %d, %10.35g' % (point, timestamp, value) | |
offset += whisper.pointSize | |
period = archive['retention'] | |
rollup = archive['secondsPerPoint'] | |
ttl = period | |
#CQLString = "UPDATE metric USING TTL ? SET data = data + ? WHERE tenant = '' AND rollup = ? AND period = ? AND path = ? AND time = ?;" | |
#cursor.execute(CQLString, [ttl, value, rollup, period, name, timestamp]) | |
#print CQLString, [ttl, value, rollup, period, name, timestamp] | |
if timestamp > 0: | |
CQLString = "UPDATE metric USING TTL %d SET data = data + [ %d ] WHERE tenant = '' AND rollup = %d AND period = %d AND path = '%s' AND time = %d;" % (ttl, value, rollup, period, name, timestamp) | |
#print CQLString | |
cursor.execute(CQLString) | |
if not os.path.exists(path): | |
raise SystemExit('[ERROR] File "%s" does not exist!' % path) | |
map = mmap_file(path) | |
header = read_header(map) | |
dump_header(header) | |
dump_archive_headers(header['archives']) | |
dump_archives(header['archives']) |
Yep, indeed, replication factor must be 3, even scarier... maybe we can replace Cassandra with InfluxDB? Clojure module is also present... Not sure about space on InfluxDB, and its in early stage of maturity anyway...
Sorry for digging up this old thing, but I've got to ask @deniszh: how long did this script take to run for your TB of whisper data? I've been running it for our measly 300GB of data for about 12 hours now, using 8 parallel jobs and it seems like it will take literally forever.
I know I'm just being lazy now, but is there a more updated version of this? ;)
Hi @punnie,
never tried to migrate our main storage though because of space concerns. But, even for linear scalability, if migration of 1281 file took ~ 1281 sec (very loose approximation) in 16 threads, then migration of full storage of 160K metrics will took 160000 sec = 44 hours.
I do not think that you will be able to squeeze more speed from python version, and I'm loose at clojure - maybe @pir will be able to provide clojure version of this script with built in concurrency.
@punnie
But btw - speed is not required for migration. You can spit metrics stream in two with relay, one stream will go to your old installation - new will go to cyanite. After that you can copy wsp files and migrate them, even if it will took a week - you will get all data in cyanite and then remove old installation.
as for the rep factor, i'd go for rep-factor 3, with one distant copy