Quick and dirty script to migrate Graphite from whisper to cyanite
#!/usr/bin/env python

import os
import mmap
import struct
import signal
import optparse

import cql

try:
  import whisper
except ImportError:
  raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Ignore SIGPIPE
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

option_parser = optparse.OptionParser(usage='''%prog path''')
(options, args) = option_parser.parse_args()

if len(args) != 1:
  option_parser.error("require one input file name")
else:
  path = args[0]

def mmap_file(filename):
  # Memory-map the whisper file read-only.
  fd = os.open(filename, os.O_RDONLY)
  map = mmap.mmap(fd, os.fstat(fd).st_size, prot=mmap.PROT_READ)
  os.close(fd)
  return map

def read_header(map):
  # Parse the whisper metadata header and the per-archive info blocks.
  try:
    (aggregationType, maxRetention, xFilesFactor, archiveCount) = struct.unpack(whisper.metadataFormat, map[:whisper.metadataSize])
  except struct.error:
    raise whisper.CorruptWhisperFile("Unable to unpack header")

  archives = []
  archiveOffset = whisper.metadataSize

  for i in xrange(archiveCount):
    try:
      (offset, secondsPerPoint, points) = struct.unpack(whisper.archiveInfoFormat, map[archiveOffset:archiveOffset + whisper.archiveInfoSize])
    except struct.error:
      raise whisper.CorruptWhisperFile("Unable to read archive %d metadata" % i)

    archiveInfo = {
      'offset': offset,
      'secondsPerPoint': secondsPerPoint,
      'points': points,
      'retention': secondsPerPoint * points,
      'size': points * whisper.pointSize,
    }
    archives.append(archiveInfo)
    archiveOffset += whisper.archiveInfoSize

  header = {
    'aggregationMethod': whisper.aggregationTypeToMethod.get(aggregationType, 'average'),
    'maxRetention': maxRetention,
    'xFilesFactor': xFilesFactor,
    'archives': archives,
  }
  return header

def dump_header(header):
  print 'Meta data:'
  print '  aggregation method: %s' % header['aggregationMethod']
  print '  max retention: %d' % header['maxRetention']
  print '  xFilesFactor: %g' % header['xFilesFactor']


def dump_archive_headers(archives):
  for i, archive in enumerate(archives):
    print 'Archive %d info:' % i
    print '  offset: %d' % archive['offset']
    print '  seconds per point: %d' % archive['secondsPerPoint']
    print '  points: %d' % archive['points']
    print '  retention: %d' % archive['retention']
    print '  size: %d' % archive['size']

def dump_archives(archives):
  # Derive the metric path from the file location, e.g.
  # /opt/graphite/storage/whisper/foo/bar.wsp -> foo.bar
  name = path.replace('/opt/graphite/storage/whisper/', '', 1)
  name = name.replace('.wsp', '', 1)
  name = name.replace('/', '.')

  con = cql.connect('127.0.0.1', 9160, 'metric', cql_version='3.0.0')
  print 'Connected to Cassandra!'
  cursor = con.cursor()

  for i, archive in enumerate(archives):
    print 'Archive %d data:' % i
    offset = archive['offset']
    rollup = archive['secondsPerPoint']
    period = archive['retention']
    ttl = period
    for point in xrange(archive['points']):
      (timestamp, value) = struct.unpack(whisper.pointFormat, map[offset:offset + whisper.pointSize])
      print '%d: %d, %10.35g' % (point, timestamp, value)
      offset += whisper.pointSize
      # A parameterized version of the statement would look like this:
      #CQLString = "UPDATE metric USING TTL ? SET data = data + ? WHERE tenant = '' AND rollup = ? AND period = ? AND path = ? AND time = ?;"
      #cursor.execute(CQLString, [ttl, value, rollup, period, name, timestamp])
      # Unwritten whisper slots have timestamp 0, so skip them.
      # Note: %d truncates the value to an integer before it is appended to the data list.
      if timestamp > 0:
        CQLString = "UPDATE metric USING TTL %d SET data = data + [ %d ] WHERE tenant = '' AND rollup = %d AND period = %d AND path = '%s' AND time = %d;" % (ttl, value, rollup, period, name, timestamp)
        cursor.execute(CQLString)

if not os.path.exists(path):
  raise SystemExit('[ERROR] File "%s" does not exist!' % path)

map = mmap_file(path)
header = read_header(map)
dump_header(header)
dump_archive_headers(header['archives'])
dump_archives(header['archives'])
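The UPDATE statement above assumes a cyanite-style metric table keyed by tenant, rollup, period, path and time, with each point appended to a data list. As a rough sketch only, inferred from that query rather than from cyanite itself (check the schema shipped with your cyanite version for the authoritative definition), the table would look something like:

CREATE TABLE metric (
  period int,
  rollup int,
  tenant text,
  path text,
  time bigint,
  data list<double>,
  PRIMARY KEY ((tenant, period, rollup, path), time)
);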
@punnie
But by the way, raw speed is not really required for the migration. You can split the metric stream in two with a relay: one stream keeps feeding your old installation, and the other goes to cyanite. After that you can copy the .wsp files and migrate them with this script; even if it takes a week, you will end up with all the data in cyanite and can then remove the old installation.
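For the record, a minimal sketch of how the stream could be split with stock carbon-relay in rules mode; the host names and ports are placeholders, other relays (carbon-c-relay and friends) work just as well:

# carbon.conf
[relay]
RELAY_METHOD = rules
DESTINATIONS = old-graphite.example.com:2004, cyanite.example.com:2003

# relay-rules.conf
[default]
default = true
destinations = old-graphite.example.com:2004, cyanite.example.com:2003

Note that carbon-relay speaks the pickle protocol to its destinations, so check that whatever is listening on the cyanite side accepts it, or put another relay in between.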
Hi @punnie,
I never tried to migrate our main storage, because of space concerns. But even assuming linear scaling: if migrating 1281 files took roughly 1281 seconds (a very loose approximation) with 16 threads, then migrating a full storage of 160K metrics would take about 160,000 seconds, i.e. roughly 44 hours.
I do not think you will be able to squeeze much more speed out of the Python version, and my Clojure is weak; maybe @pir will be able to provide a Clojure version of this script with built-in concurrency.
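For reference, one way to get that kind of parallelism without rewriting the script is to fan it out over the whisper tree from a small driver. A minimal sketch, where the script filename, storage root and worker count are assumptions to adjust for your setup:

#!/usr/bin/env python
# Hypothetical driver: walk the whisper tree and run the migration
# script above over each file with a pool of worker processes.
import os
import subprocess
from multiprocessing import Pool

WHISPER_ROOT = '/opt/graphite/storage/whisper'  # assumed storage root
WORKERS = 16                                    # matches the 16 threads mentioned above

def migrate(wsp_path):
    # Each worker runs one copy of the gist script per file;
    # 'whisper-to-cyanite.py' is a placeholder for however you saved it.
    return subprocess.call(['python', 'whisper-to-cyanite.py', wsp_path])

if __name__ == '__main__':
    files = []
    for dirpath, dirnames, filenames in os.walk(WHISPER_ROOT):
        for f in filenames:
            if f.endswith('.wsp'):
                files.append(os.path.join(dirpath, f))
    pool = Pool(WORKERS)
    pool.map(migrate, files)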