Skip to content

Instantly share code, notes, and snippets.

@snowleung
Created December 28, 2014 15:37
Show Gist options
  • Save snowleung/b7b8e226b3e2714555d5 to your computer and use it in GitHub Desktop.
Save snowleung/b7b8e226b3e2714555d5 to your computer and use it in GitHub Desktop.
test
#coding:utf-8
import logging
import json
from datetime import datetime, timedelta
import time
import random
# Feature switch: when true, a MongoDB client is created at import time.
USE_MONGO = True
if USE_MONGO:
    import pymongo

    # Database that holds the `daily` and `hourly` collections.
    MONGO_DB = 'analytics'
    mongo_client = pymongo.MongoClient(host='localhost', port=27017)
    # Database handle used by the Mongo helper functions in this module.
    mongo_any = mongo_client[MONGO_DB]

# Emit DEBUG-level (and above) log records from the root logger.
logging.basicConfig(level=logging.DEBUG)
def mongo_daily(d):
    """Insert the document ``d`` into the ``daily`` collection of ``mongo_any``."""
    daily_collection = mongo_any.daily
    daily_collection.insert(d)
def mongo_hourly(d):
    """Insert the document ``d`` into the ``hourly`` collection of ``mongo_any``."""
    hourly_collection = mongo_any.hourly
    hourly_collection.insert(d)
def drop_mongo():
    """Drop the ``daily`` and ``hourly`` collections of ``mongo_any``."""
    for collection in (mongo_any.daily, mongo_any.hourly):
        collection.drop()
def generate_mongo_data(_macs=10, days=30):
    """Seed the Mongo collections with synthetic per-device records.

    For every synthetic device (``macc0`` .. ``macc<_macs - 1>``) and every
    one of the last ``days`` days (today included), 24 documents are handed
    to ``mongo_hourly`` -- one per hour -- followed by one document handed
    to ``mongo_daily``.

    Args:
        _macs: Number of synthetic device identifiers to generate.
        days: Number of consecutive days to cover, counting back from today.
    """
    import sys  # local import: only needed for the progress output below

    pk = 'asdffsad'
    # Snap to local midnight. Subtracting just the hour and minute parts
    # would leave the current seconds in every generated timestamp.
    midnight = datetime.now().replace(hour=0, minute=0, second=0,
                                      microsecond=0)
    day_starts = [midnight - timedelta(days=back) for back in range(days)]

    for index in range(_macs):
        mac = 'macc{mac}'.format(mac=index)
        # One progress dot per device; flushed so it shows up immediately.
        sys.stdout.write('. ')
        sys.stdout.flush()
        for day in day_starts:
            for h in range(24):
                hour_start = day + timedelta(hours=h)
                mongo_hourly({
                    'pk': pk,
                    'mac': mac,
                    'timestamp': time.mktime(hour_start.timetuple()),
                    'year': day.year,
                    'month': day.month,
                    'day': day.day,
                    'hour': h,
                    'second': 12,
                })
            # The daily record is written after that day's 24 hourly records.
            mongo_daily({
                'pk': pk,
                'mac': mac,
                'timestamp': time.mktime(day.timetuple()),
                'year': day.year,
                'month': day.month,
                'day': day.day,
            })
import unittest
class DataTest(unittest.TestCase):
    """Smoke tests that call ``devices_analytics._analytic``.

    Neither test asserts on the returned value; a test passes as long as
    ``_analytic`` does not raise on the supplied input.
    """

    def setUp(self):
        pass

    def test_run_time(self):
        """Run ``_analytic`` over the current contents of the Mongo collections.

        The collections are read as they are. To reseed them first,
        uncomment the two lines below.
        """
        d = 30
        # drop_mongo()
        # generate_mongo_data(1000, d)
        from devices_analytics import _analytic
        f4 = _analytic(d, mongo_any.daily.find(), mongo_any.hourly.find())

    def test_func(self):
        """Run ``_analytic`` over a small in-memory fixture for two devices."""
        # Snap to local midnight. Subtracting just the hour and minute parts
        # would leave the current seconds in every fixture timestamp, making
        # the fixture depend on the second at which the test starts.
        midnight = datetime.now().replace(hour=0, minute=0, second=0,
                                          microsecond=0)
        drange = 2
        _days = [midnight - timedelta(days=back) for back in range(drange)]
        test_daily = []
        test_hourly = []

        def insert_hourly(mac, day, h, pk='asdf'):
            """Append one hourly record for ``mac`` at hour ``h`` of ``day``."""
            current_datetime = day + timedelta(hours=h)
            test_hourly.append({
                'pk': pk,
                'mac': mac,
                'timestamp': time.mktime(current_datetime.timetuple()),
                'year': day.year,
                'month': day.month,
                'day': day.day,
                'hour': h,
                'second': 12,
            })

        def insert_daily(mac, day, pk='asdf'):
            """Append one daily record for ``mac`` on ``day``."""
            test_daily.append({
                'pk': pk,
                'mac': mac,
                'timestamp': time.mktime(day.timetuple()),
                'year': day.year,
                'month': day.month,
                'day': day.day,
                'second': 12,
            })

        # Two devices (mac001, mac002).
        # Day 0 (today).
        insert_hourly(mac='mac001', day=_days[0], h=0)
        insert_hourly(mac='mac001', day=_days[0], h=2)
        insert_hourly(mac='mac002', day=_days[0], h=0)
        insert_hourly(mac='mac002', day=_days[0], h=1)
        insert_daily(mac='mac001', day=_days[0])
        insert_daily(mac='mac002', day=_days[0])

        # Day 1 (yesterday).
        insert_hourly(mac='mac001', day=_days[1], h=1)
        insert_hourly(mac='mac001', day=_days[1], h=3)
        insert_hourly(mac='mac002', day=_days[1], h=2)
        insert_daily(mac='mac001', day=_days[1])
        insert_daily(mac='mac002', day=_days[1])

        from devices_analytics import _analytic
        f4 = _analytic(drange, test_daily, test_hourly)
        # f4 is expected to look like this:
        #   {0:0.5, 1:0.5, 2:0.5, 3:0.25, ...}
        # NOTE(review): the result is not asserted -- TODO add assertions
        # once the exact contract of _analytic is confirmed.
if __name__ == '__main__':
    # Executed as a script: discover and run the TestCase classes in this module.
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment