Last active
August 29, 2015 14:14
-
-
Save toni-moreno/520e991a7828c81be755 to your computer and use it in GitHub Desktop.
Whisper tools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os | |
import mmap | |
import struct | |
import time | |
import sys | |
import signal | |
import optparse | |
try: | |
import whisper | |
except ImportError: | |
raise SystemExit('[ERROR] Please make sure whisper is installed properly') | |
# Ignore SIGPIPE | |
try: | |
signal.signal(signal.SIGPIPE, signal.SIG_DFL) | |
except AttributeError: | |
#OS=windows | |
pass | |
now = int(time.time()) | |
option_parser = optparse.OptionParser( | |
usage='''%prog path timePerPoint:timeToStore [timePerPoint:timeToStore]* | |
timePerPoint and timeToStore specify lengths of time, for example: | |
60:1440 60 seconds per datapoint, 1440 datapoints = 1 day of retention | |
15m:8 15 minutes per datapoint, 8 datapoints = 2 hours of retention | |
1h:7d 1 hour per datapoint, 7 days of retention | |
12h:2y 12 hours per datapoint, 2 years of retention | |
''') | |
option_parser.add_option('--xFilesFactor', default=0.5, type='float') | |
option_parser.add_option('--aggregationMethod', default='average', | |
type='string', help="Function to use when aggregating values (%s)" % | |
', '.join(whisper.aggregationMethods)) | |
option_parser.add_option('--overwrite', default=False, action='store_true') | |
option_parser.add_option('--fillvalue', default=1.0, type='float') | |
(options, args) = option_parser.parse_args() | |
if len(args) < 2: | |
option_parser.print_help() | |
sys.exit(1) | |
path = args[0] | |
archives = [whisper.parseRetentionDef(retentionDef) | |
for retentionDef in args[1:]] | |
if os.path.exists(path) and options.overwrite: | |
print 'Overwriting existing file: %s' % path | |
os.unlink(path) | |
try: | |
whisper.create(path, archives, xFilesFactor=options.xFilesFactor, aggregationMethod=options.aggregationMethod) | |
except whisper.WhisperException, exc: | |
raise SystemExit('[ERROR] %s' % str(exc)) | |
size = os.stat(path).st_size | |
print 'Created: %s (%d bytes)' % (path,size) | |
info = whisper.info(path) | |
old_archives = info['archives'] | |
# sort by precision, lowest to highest | |
old_archives.sort(key=lambda a: a['secondsPerPoint'], reverse=True) | |
#creating a unit value from now-retention to now with | |
fromTime = now-old_archives[0]['retention'] | |
toTime = now | |
old_archives.sort(key=lambda a: a['secondsPerPoint'], reverse=False) | |
precision = old_archives[0]['secondsPerPoint'] | |
datapoints = [] | |
timedata=range(fromTime,toTime,precision) | |
datapoints = zip(timedata,[options.fillvalue] * len(timedata)) | |
whisper.update_many(path, datapoints) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import os | |
import mmap | |
import struct | |
import sys | |
import math | |
import bisect | |
import signal | |
import optparse | |
import traceback | |
try:
    import whisper
except ImportError:
    raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Ignore SIGPIPE so that piping stdout does not kill us.  Guarded with
# try/except because Windows has no SIGPIPE (same guard as the companion
# fill script in this gist).
try:
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except AttributeError:
    pass

option_parser = optparse.OptionParser(
    usage='''%prog path
''')
option_parser.add_option(
    '--xFilesFactor', default=None,
    type='float', help="Change the xFilesFactor")
option_parser.add_option(
    '--aggregationMethod', default=None,
    type='string', help="Change the aggregation function (%s)" %
    ', '.join(whisper.aggregationMethods))
option_parser.add_option(
    '--force', default=False, action='store_true',
    help="Perform a destructive change")
option_parser.add_option(
    '--newfile', default=None, action='store',
    help="Create a new database file without removing the existing one")
option_parser.add_option(
    '--nobackup', action='store_true',
    help='Delete the .bak file after successful execution')

(options, args) = option_parser.parse_args()

if len(args) < 1:
    option_parser.print_help()
    sys.exit(1)

path = args[0]
def mmap_file(filename):
    """Memory-map *filename* read-only and return the mmap object.

    The file descriptor is closed immediately after mapping; the mapping
    stays valid until the caller closes it.  Uses PROT_READ, so this is
    POSIX-only (mmap.PROT_READ does not exist on Windows).
    """
    fd = os.open(filename, os.O_RDONLY)
    try:
        # Renamed from `map`, which shadowed the builtin.
        mapped = mmap.mmap(fd, os.fstat(fd).st_size, prot=mmap.PROT_READ)
    finally:
        # The mapping keeps its own reference; the fd is no longer needed,
        # and must not leak if mmap() itself fails.
        os.close(fd)
    return mapped
def read_header(map):
    """Parse a whisper file header out of a read-only mapping.

    Returns a dict with 'aggregationMethod', 'maxRetention',
    'xFilesFactor' and a list of per-archive dicts under 'archives'.
    Raises whisper.CorruptWhisperFile when the header or any archive
    metadata cannot be unpacked.
    """
    try:
        (aggregationType, maxRetention, xFilesFactor, archiveCount) = \
            struct.unpack(whisper.metadataFormat, map[:whisper.metadataSize])
    except struct.error:
        # Narrowed from a bare except: only an unpack failure means the
        # file is corrupt; anything else should propagate.
        raise whisper.CorruptWhisperFile("Unable to unpack header")
    archives = []
    archiveOffset = whisper.metadataSize
    for i in xrange(archiveCount):
        try:
            (offset, secondsPerPoint, points) = struct.unpack(
                whisper.archiveInfoFormat,
                map[archiveOffset:archiveOffset + whisper.archiveInfoSize])
        except struct.error:
            raise whisper.CorruptWhisperFile("Unable to read archive %d metadata" % i)
        archiveInfo = {
            'offset': offset,
            'secondsPerPoint': secondsPerPoint,
            'points': points,
            'retention': secondsPerPoint * points,
            'size': points * whisper.pointSize,
        }
        archives.append(archiveInfo)
        archiveOffset += whisper.archiveInfoSize
    header = {
        # Unrecognized aggregation types fall back to 'average'.
        'aggregationMethod': whisper.aggregationTypeToMethod.get(aggregationType, 'average'),
        'maxRetention': maxRetention,
        'xFilesFactor': xFilesFactor,
        'archives': archives,
    }
    return header
def file_update_archive(fh,archive,points):
    # Write a batch of (timestamp, value) points into one archive of an open
    # whisper file.  NOTE(review): this appears adapted from whisper's own
    # internal update-many logic -- points are aligned to the archive step,
    # runs of consecutive intervals are packed into contiguous byte strings,
    # and each run is written at the offset implied by the archive's base
    # point, wrapping at the archive boundary (circular buffer).
    step = archive['secondsPerPoint']
    # Align each timestamp down to the archive's step boundary.
    alignedPoints = [ (timestamp - (timestamp % step), value)
                      for (timestamp,value) in points ]
    #Create a packed string for each contiguous sequence of points
    packedStrings = []
    previousInterval = None
    currentString = ""
    lenAlignedPoints = len(alignedPoints)
    for i in xrange(0,lenAlignedPoints):
        #take last point in run of points with duplicate intervals
        if i+1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i+1][0]:
            continue
        (interval,value) = alignedPoints[i]
        if (not previousInterval) or (interval == previousInterval + step):
            # Contiguous with the previous point: extend the current run.
            currentString += struct.pack(whisper.pointFormat,interval,value)
            previousInterval = interval
        else:
            # Gap in intervals: flush the finished run, recording the
            # interval where it starts, then begin a new run.
            numberOfPoints = len(currentString) / whisper.pointSize
            startInterval = previousInterval - (step * (numberOfPoints-1))
            packedStrings.append( (startInterval,currentString) )
            currentString = struct.pack(whisper.pointFormat,interval,value)
            previousInterval = interval
    if currentString:
        # Flush the trailing run.
        numberOfPoints = len(currentString) / whisper.pointSize
        startInterval = previousInterval - (step * (numberOfPoints-1))
        packedStrings.append( (startInterval,currentString) )
    #Read base point and determine where our writes will start
    fh.seek(archive['offset'])
    packedBasePoint = fh.read(whisper.pointSize)
    (baseInterval,baseValue) = struct.unpack(whisper.pointFormat,packedBasePoint)
    if baseInterval == 0: #This file's first update
        baseInterval = packedStrings[0][0] #use our first string as the base, so we start at the start
    #Write all of our packed strings in locations determined by the baseInterval
    for (interval,packedString) in packedStrings:
        timeDistance = interval - baseInterval
        pointDistance = timeDistance / step
        byteDistance = pointDistance * whisper.pointSize
        # Modulo wraps the write position inside the circular archive.
        myOffset = archive['offset'] + (byteDistance % archive['size'])
        fh.seek(myOffset)
        archiveEnd = archive['offset'] + archive['size']
        bytesBeyond = (myOffset + len(packedString)) - archiveEnd
        if bytesBeyond > 0:
            # The run crosses the end of the archive: write the head up to
            # the boundary, then wrap and write the tail at the start.
            fh.write( packedString[:-bytesBeyond] )
            assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString))
            fh.seek( archive['offset'] )
            fh.write( packedString[-bytesBeyond:] ) #safe because it can't exceed the archive (retention checking logic above)
        else:
            fh.write(packedString)
if not os.path.exists(path): | |
sys.stderr.write("[ERROR] File '%s' does not exist!\n\n" % path) | |
option_parser.print_help() | |
sys.exit(1) | |
else: | |
map_orig = mmap_file(path) | |
header_orig = read_header(map_orig) | |
archives_orig = header_orig['archives'] | |
new_archives = [] | |
for ar in archives_orig: | |
new_archives.append((ar['secondsPerPoint'],ar['points'])) | |
# sort by precision, lowest to highest | |
archives_orig.sort(key=lambda a: a['secondsPerPoint'], reverse=True) | |
if options.xFilesFactor is None: | |
xff = header_orig['xFilesFactor'] | |
else: | |
xff = options.xFilesFactor | |
if options.aggregationMethod is None: | |
aggregationMethod = 'sum' | |
else: | |
aggregationMethod = options.aggregationMethod | |
print 'Retrieving all data from the archives' | |
if options.newfile is None: | |
tmpfile = path + '.tmp' | |
if os.path.exists(tmpfile): | |
print 'Removing previous temporary database file: %s' % tmpfile | |
os.unlink(tmpfile) | |
newfile = tmpfile | |
else: | |
newfile = options.newfile | |
print 'Creating new whisper database: %s' % newfile | |
whisper.create(newfile, new_archives, xFilesFactor=xff, aggregationMethod=aggregationMethod) | |
size = os.stat(newfile).st_size | |
print 'Created: %s (%d bytes)' % (newfile,size) | |
fh = open(newfile,'r+b') | |
print 'Converting data...' | |
for i,archive in enumerate(archives_orig): | |
print 'Transformating Archive %d data:' %i | |
offset = archive['offset'] | |
multiply = archive['secondsPerPoint'] | |
datapoints = [] | |
for point in xrange(archive['points']): | |
(timestamp, value) = struct.unpack(whisper.pointFormat, map_orig[offset:offset+whisper.pointSize]) | |
#print '%d: %d, %10.35g' % (point, timestamp, value) | |
offset += whisper.pointSize | |
if value is not None: | |
datapoints.append((timestamp, value*multiply)); | |
file_update_archive(fh,archive,datapoints) | |
if options.newfile is not None: | |
sys.exit(0) | |
backup = path + '.bak' | |
print 'Renaming old database to: %s' % backup | |
os.rename(path, backup) | |
try: | |
print 'Renaming new database to: %s' % path | |
os.rename(tmpfile, path) | |
except: | |
traceback.print_exc() | |
print '\nOperation failed, restoring backup' | |
os.rename(backup, path) | |
sys.exit(1) | |
if options.nobackup: | |
print "Unlinking backup: %s" % backup | |
os.unlink(backup) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment