Skip to content

Instantly share code, notes, and snippets.

@ntpz
Created November 28, 2017 21:33
Show Gist options
  • Save ntpz/f5c8e7629c131cdc7946709136efe163 to your computer and use it in GitHub Desktop.
Save ntpz/f5c8e7629c131cdc7946709136efe163 to your computer and use it in GitHub Desktop.
Usage stats extractor
#!/usr/bin/env python
"""
extract usage stats
requirements:
- peewee
- requests
"""
import os
from calendar import monthrange
import datetime
import json
import sys
import time
import requests
from peewee import (SqliteDatabase, Model, PrimaryKeyField, DateTimeField,
CharField, SmallIntegerField, FloatField)
# Host name of the target site, taken from the environment.
# NOTE(review): when RSTAT_HOST is not set this is None, and building the
# cookie domain below ('.' + HOST) fails with TypeError at import time.
HOST = os.getenv('RSTAT_HOST')

# Headers applied to every request made through the session.
DEFAULT_HEADERS = {'User-Agent': 'Mozilla/5.0 Chrome/55.0.2883.87'}

# Keyword arguments for ``session.cookies.set``: the "plus18" cookie,
# scoped to HOST and all of its subdomains.
OVER18COOKIE = dict(
    name='plus18',
    value='1',
    domain='.' + HOST,
    path='/',
)

# Fixed form fields sent with the login request; the credentials are
# merged in by log_in().
LOGIN_PARAMS = dict(remember='1', logon='mainform')

# On-disk SQLite database that holds the imported statistics.
db = SqliteDatabase('rstat.sqlite3')  # :memory:
class StatsRow(Model):
    """A single statistics line, stored in the ``stats`` table of ``db``."""

    id = PrimaryKeyField()

    # --- required fields ---
    dt_start = DateTimeField()
    status_code = SmallIntegerField()
    status_title = CharField()
    # positive for top-ups, negative for spendings
    delta = FloatField()

    # --- optional fields ---
    model_name = CharField(null=True)
    dt_end = DateTimeField(null=True)
    duration = FloatField(null=True)  # seconds, see parse_duration()
    price = SmallIntegerField(null=True)
    interval_line = CharField(null=True)  # raw interval text as received
    discount_line = CharField(null=True)

    class Meta:
        database = db
        db_table = 'stats'
def main(username, password):
    """Log in and rebuild the local statistics table from scratch.

    Opens a session, authenticates with the given credentials, makes sure
    the ``stats`` table exists and is empty, then imports the last
    120 months of statistics.
    """
    session = make_session()
    log_in(session, username, password)

    # Start from an empty table: create it on first run, wipe it otherwise.
    if not StatsRow.table_exists():
        StatsRow.create_table()
    else:
        StatsRow.truncate_table()

    months_back = 120
    import_stats(session, months_back)
    # create report
def import_stats(sess, depth_months):
    """Download statistics month by month and store them as StatsRow rows.

    Walks ``depth_months`` calendar months backwards from the current one.
    For each month the whole block is fetched and converted before any row
    is inserted, then a one-line summary is printed and the loop pauses
    briefly before the next request.
    """
    today = datetime.datetime.now()
    for year, month in makeranges(today, depth_months):
        last_day = monthrange(year, month)[1]
        dt_start = datetime.date(year, month, 1)
        dt_end = datetime.date(year, month, last_day)

        # Convert every line first so a malformed line aborts before
        # anything from this month has been written.
        records = list(map(makefields, statsblock(sess, dt_start, dt_end)))
        # StatsRow.insert_many(records).execute()
        for record in records:
            StatsRow.create(**record)

        print("{} - {} - {} rows".format(dt_start, dt_end, len(records)))
        time.sleep(.5)
def makeranges(now, depth_months):
    """Yield ``(year, month)`` pairs walking backwards from the month of *now*.

    The first pair is the month of *now* itself; each following pair is the
    preceding calendar month, rolling over from January to December of the
    previous year.

    :param now: any object with ``year`` and ``month`` attributes
        (e.g. ``datetime.datetime`` or ``datetime.date``).
    :param depth_months: how many months to yield. Zero, ``None`` or a
        negative value yields nothing.
    """
    year, month = now.year, now.month
    # Compare with "> 0" rather than relying on truthiness: a negative
    # count would otherwise never reach zero and the generator would
    # run forever.
    while depth_months and depth_months > 0:
        yield year, month
        depth_months -= 1
        month -= 1
        if month == 0:
            year, month = year - 1, 12
def makefields(line):
    """Translate one statistics line into keyword arguments for StatsRow.

    *line* is a mapping with the keys ``interval``, ``date``,
    ``action_code``, ``action``, ``total_kr``, ``model``, ``full_time``,
    ``kr_min``, ``discont`` and ``income``. Empty optional values become
    ``None``. ``delta`` is negated when ``income`` is falsy, so spendings
    are stored as negative amounts.
    """
    started, ended = parse_interval(line['interval'], line['date'])
    record = dict(
        dt_start=started,
        status_code=int(line['action_code']),
        status_title=line['action'],
        delta=float(line['total_kr']),
        model_name=line['model'] or None,
        dt_end=ended,
        duration=parse_duration(line['full_time']),
        price=int(line['kr_min']) if line['kr_min'] else None,
        interval_line=line['interval'] or None,
        # 'discont' is the key name as it appears in the incoming data.
        discount_line=line['discont'] or None,
    )
    if line['income']:
        return record
    # Not an income line: flip the sign so it is stored as a spending.
    record['delta'] *= -1
    return record
def parse_duration(dur_str):
    """Convert an ``H:M:S`` duration string into seconds.

    :param dur_str: text such as ``'01:02:03'``; may be empty or ``None``.
    :returns: the duration in seconds as a float, or ``None`` when
        *dur_str* is missing, is not a string, or does not consist of
        exactly three colon-separated integers.
    """
    try:
        hours, minutes, seconds = (int(part) for part in dur_str.split(':'))
    except (AttributeError, ValueError):
        # AttributeError: value is None / not a string (the field is optional).
        # ValueError: wrong number of parts or a non-numeric part.
        return None
    span = datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
    return span.total_seconds()
def parse_interval(interval_str, date_str):
    """Work out the start and end timestamps of a statistics line.

    *interval_str* is expected to look like
    ``'DD.MM.YYYY HH:MM:SS<br>DD.MM.YYYY HH:MM:SS'``. When it is missing
    or cannot be split and parsed that way, the start falls back to
    *date_str* (``'DD.MM.YYYY'``, midnight) and there is no end.

    :returns: ``(start, end)`` where *start* is a ``datetime`` and *end* is
        a ``datetime`` or ``None``.
    :raises ValueError: if neither the interval nor the date can be parsed.
    """
    stamp_format = '%d.%m.%Y %H:%M:%S'
    try:
        str_start, str_end = interval_str.split('<br>')
        start = datetime.datetime.strptime(str_start, stamp_format)
        end = datetime.datetime.strptime(str_end, stamp_format)
    except (AttributeError, ValueError):
        # Interval is None, failed to split, or a timestamp failed to parse.
        pass
    else:
        return start, end

    try:
        start = datetime.datetime.strptime(date_str, '%d.%m.%Y')
    except ValueError:
        # A datetime is always truthy, so an assert on the parsed value can
        # never report this; raise explicitly with both inputs instead.
        raise ValueError(
            'Invalid interval {}/{}'.format(interval_str, date_str))
    return start, None
def make_session():
    """Build a requests session prepared for talking to HOST.

    The session carries the default headers and the "plus18" cookie, and
    has already requested the site's front page once so that the server
    has a session for it.
    """
    session = requests.Session()
    session.headers.update(DEFAULT_HEADERS)
    session.cookies.set(**OVER18COOKIE)

    front_page = 'http://{}/'.format(HOST)
    session.get(front_page)  # create server-side session
    return session
def log_in(sess, username, password):
    """Authenticate *sess* against HOST with the given credentials.

    Posts the login form and verifies success by checking that the
    session's ``uname`` cookie equals *username*.

    :raises requests.HTTPError: if the login request returns an error status.
    :raises AssertionError: if the ``uname`` cookie does not match; the
        response body is saved to ``response.html`` for inspection first.
    """
    params = dict(LOGIN_PARAMS, username=username, password=password)
    # NOTE(review): the credentials are posted over plain http:// here,
    # while the statistics API is called over https:// - confirm whether
    # the login endpoint supports HTTPS.
    resp = sess.post('http://{}/login/'.format(HOST), data=params)
    resp.raise_for_status()

    # Explicit check instead of an ``assert`` statement: asserts are
    # removed under ``python -O``, which would let a failed login pass.
    if sess.cookies.get('uname') != username:
        # Dump the raw bytes so that a locale-dependent text encoding
        # cannot raise and hide the actual login failure.
        with open('response.html', 'wb') as f:
            f.write(resp.content)
        raise AssertionError('Login failed')
def statsblock(sess, start, end):
    """Fetch the statistics for the period *start*..*end* and decode them.

    :param sess: a logged-in requests session.
    :param start: first day of the period (anything with ``strftime``).
    :param end: last day of the period (anything with ``strftime``).
    :returns: the decoded JSON body of the response.
    :raises requests.HTTPError: if the API answers with an error status.
    """
    day_format = '%d.%m.%Y'
    data = {
        'from': start.strftime(day_format),
        'to': end.strftime(day_format),
    }
    resp = sess.post('https://cabinet.{}/api/statistics'.format(HOST), data=data)
    # Fail on HTTP errors here (as log_in does) rather than handing an
    # error page to the JSON parser.
    resp.raise_for_status()
    return json.loads(resp.text)
if __name__ == '__main__':
    # Exactly two command-line arguments are expected: login and password.
    if len(sys.argv) != 3:
        print("Usage: {} login password".format(sys.argv[0]))
    else:
        _, username, password = sys.argv
        main(username, password)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment