Skip to content

Instantly share code, notes, and snippets.

@deniszh
Last active February 16, 2023 03:39
Show Gist options
  • Save deniszh/7986974 to your computer and use it in GitHub Desktop.
Save deniszh/7986974 to your computer and use it in GitHub Desktop.
Quick and dirty script to migrate Graphite from whisper to cyanite
#!/usr/bin/env python
import os
import mmap
import struct
import signal
import optparse
import cql
try:
import whisper
except ImportError:
raise SystemExit('[ERROR] Please make sure whisper is installed properly')
# Ignore SIGPIPE
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
option_parser = optparse.OptionParser(usage='''%prog path''')
(options, args) = option_parser.parse_args()
if len(args) != 1:
option_parser.error("require one input file name")
else:
path = args[0]
def mmap_file(filename):
fd = os.open(filename, os.O_RDONLY)
map = mmap.mmap(fd, os.fstat(fd).st_size, prot=mmap.PROT_READ)
os.close(fd)
return map
def read_header(map):
try:
(aggregationType,maxRetention,xFilesFactor,archiveCount) = struct.unpack(whisper.metadataFormat,map[:whisper.metadataSize])
except:
raise CorruptWhisperFile("Unable to unpack header")
archives = []
archiveOffset = whisper.metadataSize
for i in xrange(archiveCount):
try:
(offset, secondsPerPoint, points) = struct.unpack(whisper.archiveInfoFormat, map[archiveOffset:archiveOffset+whisper.archiveInfoSize])
except:
raise CorruptWhisperFile("Unable to read archive %d metadata" % i)
archiveInfo = {
'offset' : offset,
'secondsPerPoint' : secondsPerPoint,
'points' : points,
'retention' : secondsPerPoint * points,
'size' : points * whisper.pointSize,
}
archives.append(archiveInfo)
archiveOffset += whisper.archiveInfoSize
header = {
'aggregationMethod' : whisper.aggregationTypeToMethod.get(aggregationType, 'average'),
'maxRetention' : maxRetention,
'xFilesFactor' : xFilesFactor,
'archives' : archives,
}
return header
def dump_header(header):
print 'Meta data:'
print ' aggregation method: %s' % header['aggregationMethod']
print ' max retention: %d' % header['maxRetention']
print ' xFilesFactor: %g' % header['xFilesFactor']
print
def dump_archive_headers(archives):
for i,archive in enumerate(archives):
print 'Archive %d info:' % i
print ' offset: %d' % archive['offset']
print ' seconds per point: %d' % archive['secondsPerPoint']
print ' points: %d' % archive['points']
print ' retention: %d' % archive['retention']
print ' size: %d' % archive['size']
print
def dump_archives(archives):
name = path.replace('/opt/graphite/storage/whisper/','',1)
name = name.replace('.wsp','',1)
name = name.replace('/', '.')
con = cql.connect('127.0.0.1', 9160, 'metric', cql_version='3.0.0')
print ("Connected to Cassandra!")
cursor = con.cursor()
for i,archive in enumerate(archives):
print 'Archive %d data:' %i
offset = archive['offset']
for point in xrange(archive['points']):
(timestamp, value) = struct.unpack(whisper.pointFormat, map[offset:offset+whisper.pointSize])
print '%d: %d, %10.35g' % (point, timestamp, value)
offset += whisper.pointSize
period = archive['retention']
rollup = archive['secondsPerPoint']
ttl = period
#CQLString = "UPDATE metric USING TTL ? SET data = data + ? WHERE tenant = '' AND rollup = ? AND period = ? AND path = ? AND time = ?;"
#cursor.execute(CQLString, [ttl, value, rollup, period, name, timestamp])
#print CQLString, [ttl, value, rollup, period, name, timestamp]
if timestamp > 0:
CQLString = "UPDATE metric USING TTL %d SET data = data + [ %d ] WHERE tenant = '' AND rollup = %d AND period = %d AND path = '%s' AND time = %d;" % (ttl, value, rollup, period, name, timestamp)
#print CQLString
cursor.execute(CQLString)
print
if not os.path.exists(path):
raise SystemExit('[ERROR] File "%s" does not exist!' % path)
map = mmap_file(path)
header = read_header(map)
dump_header(header)
dump_archive_headers(header['archives'])
dump_archives(header['archives'])
@pyr
Copy link

pyr commented Dec 16, 2013

Hi denis,

This is awesome, thanks for taking the time to do it. Great to see that cassandra 1.2 works well with cyanite. Bad news about the data size, I'll see what I can do about this, are you running compression on the column family ?

@deniszh
Copy link
Author

deniszh commented Dec 16, 2013

Yep, compression is here:

cqlsh> DESCRIBE KEYSPACE metric

CREATE KEYSPACE metric WITH replication = {
  'class': 'SimpleStrategy',
  'replication_factor': '1'
};

USE metric;

CREATE TABLE metric (
  period int,
  rollup int,
  path text,
  time bigint,
  data list<double>,
  PRIMARY KEY ((period, rollup, path), time)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

And I just tried same with Cassandra 2.3 - same results, 1.3GB of data.
It not unusual though - whisper format is quite space effective.

But we have 1TB of data in whisper now, so we need at least 1 * 4 * 2 (replication factor) * 2 DC = 16TB. Geez...

@pyr
Copy link

pyr commented Dec 16, 2013

ouch indeed. I'll see what i can do to reduce storage space. I want to keep the metric as a list of doubles for now, but i can probably save space on the other fields

@pyr
Copy link

pyr commented Dec 16, 2013

as for the rep factor, i'd go for rep-factor 3, with one distant copy

@deniszh
Copy link
Author

deniszh commented Dec 17, 2013

Yep, indeed, replication factor must be 3, even scarier... maybe we can replace Cassandra with InfluxDB? Clojure module is also present... Not sure about space on InfluxDB, and its in early stage of maturity anyway...

@punnie
Copy link

punnie commented Aug 24, 2014

Sorry for digging up this old thing, but I've got to ask @deniszh: how long did this script take to run for your TB of whisper data? I've been running it for our measly 300GB of data for about 12 hours now, using 8 parallel jobs and it seems like it will take literally forever.

I know I'm just being lazy now, but is there a more updated version of this? ;)

@deniszh
Copy link
Author

deniszh commented Sep 5, 2014

Hi @punnie,
never tried to migrate our main storage though because of space concerns. But, even for linear scalability, if migration of 1281 file took ~ 1281 sec (very loose approximation) in 16 threads, then migration of full storage of 160K metrics will took 160000 sec = 44 hours.
I do not think that you will be able to squeeze more speed from python version, and I'm loose at clojure - maybe @pir will be able to provide clojure version of this script with built in concurrency.

@deniszh
Copy link
Author

deniszh commented Sep 5, 2014

@punnie
But btw - speed is not required for migration. You can spit metrics stream in two with relay, one stream will go to your old installation - new will go to cyanite. After that you can copy wsp files and migrate them, even if it will took a week - you will get all data in cyanite and then remove old installation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment