Created
November 28, 2017 21:33
-
-
Save ntpz/f5c8e7629c131cdc7946709136efe163 to your computer and use it in GitHub Desktop.
Usage stats extractor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
extract usage stats | |
requirements: | |
- peewee | |
- requests | |
""" | |
import os | |
from calendar import monthrange | |
import datetime | |
import json | |
import sys | |
import time | |
import requests | |
from peewee import (SqliteDatabase, Model, PrimaryKeyField, DateTimeField, | |
CharField, SmallIntegerField, FloatField) | |
HOST = os.environ.get('RSTAT_HOST') | |
DEFAULT_HEADERS = { | |
'User-Agent': 'Mozilla/5.0 Chrome/55.0.2883.87' | |
} | |
OVER18COOKIE = { | |
"name": 'plus18', | |
"value": '1', | |
"domain": '.' + HOST, | |
"path": '/', | |
} | |
LOGIN_PARAMS = { | |
'remember': '1', | |
'logon': 'mainform' | |
} | |
db = SqliteDatabase('rstat.sqlite3') # :memory: | |
class StatsRow(Model): | |
class Meta: | |
database = db | |
db_table = 'stats' | |
id = PrimaryKeyField() | |
# required fields | |
dt_start = DateTimeField() | |
status_code = SmallIntegerField() | |
status_title = CharField() | |
delta = FloatField() # positive for top-ups, negative for spendings | |
# optional fields | |
model_name = CharField(null=True) | |
dt_end = DateTimeField(null=True) | |
duration = FloatField(null=True) | |
price = SmallIntegerField(null=True) | |
interval_line = CharField(null=True) | |
discount_line = CharField(null=True) | |
def main(username, password): | |
sess = make_session() | |
log_in(sess, username, password) | |
if StatsRow.table_exists(): | |
StatsRow.truncate_table() | |
else: | |
StatsRow.create_table() | |
import_stats(sess, 120) | |
# create report | |
def import_stats(sess, depth_months): | |
for year, month in makeranges(datetime.datetime.now(), depth_months): | |
dt_start = datetime.date(year, month, 1) | |
dt_end = datetime.date(year, month, monthrange(year, month)[1]) | |
blk = statsblock(sess, dt_start, dt_end) | |
rows = [makefields(line) for line in blk] | |
# StatsRow.insert_many(rows).execute() | |
for row in rows: | |
StatsRow.create(**row) | |
print("{} - {} - {} rows".format(dt_start, dt_end, len(rows))) | |
time.sleep(.5) | |
def makeranges(now, depth_months): | |
cy, cm = now.year, now.month | |
while depth_months: | |
yield cy, cm | |
depth_months -= 1 | |
cm -= 1 | |
if cm == 0: | |
cm = 12 | |
cy -= 1 | |
def makefields(line): | |
dt_start, dt_end = parse_interval(line['interval'], line['date']) | |
fields = { | |
'dt_start': dt_start, | |
'status_code': int(line['action_code']), | |
'status_title': line['action'], | |
'delta': float(line['total_kr']), | |
'model_name': line['model'] or None, | |
'dt_end': dt_end, | |
'duration': parse_duration(line['full_time']), | |
'price': int(line['kr_min']) if line['kr_min'] else None, | |
'interval_line': line['interval'] or None, | |
'discount_line': line['discont'] or None | |
} | |
if not line['income']: | |
fields['delta'] = fields['delta'] * -1 | |
return fields | |
def parse_duration(dur_str): | |
try: | |
h, m, s = dur_str.split(':') | |
delta = datetime.timedelta(0, int(s), 0, 0, int(m), int(h)) | |
except ValueError: | |
return None | |
return delta.total_seconds() | |
def parse_interval(interval_str, date_str): | |
try: | |
str_start, str_end = interval_str.split('<br>') | |
start = datetime.datetime.strptime(str_start, '%d.%m.%Y %H:%M:%S') | |
end = datetime.datetime.strptime(str_end, '%d.%m.%Y %H:%M:%S') | |
except ValueError: # failed to split or parse date | |
start = datetime.datetime.strptime(date_str, '%d.%m.%Y') | |
end = None | |
assert start, 'Invalid interval {}/{}'.format(interval_str, date_str) | |
return start, end | |
def make_session(): | |
sess = requests.Session() | |
sess.headers.update(DEFAULT_HEADERS) | |
sess.cookies.set(**OVER18COOKIE) | |
sess.get('http://{}/'.format(HOST)) # create server-side session | |
return sess | |
def log_in(sess, username, password): | |
params = LOGIN_PARAMS.copy() | |
params.update({'username': username, 'password': password}) | |
resp = sess.post('http://{}/login/'.format(HOST), data=params) | |
resp.raise_for_status() | |
try: | |
assert sess.cookies.get('uname') == username, 'Login failed' | |
except AssertionError: | |
with open('response.html', 'w') as f: | |
f.write(resp.text) | |
raise | |
def statsblock(sess, start, end): | |
data = { | |
'from': start.strftime('%d.%m.%Y'), | |
'to': end.strftime('%d.%m.%Y') | |
} | |
resp = sess.post('https://cabinet.{}/api/statistics'.format(HOST), data=data) | |
return json.loads(resp.text) | |
if __name__ == '__main__': | |
if len(sys.argv) == 3: | |
_, username, password = sys.argv | |
main(username, password) | |
else: | |
print("Usage: {} login password".format(sys.argv[0])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment