Skip to content

Instantly share code, notes, and snippets.

@sureshsaggar
Created May 1, 2013 03:40
Show Gist options
  • Save sureshsaggar/5493610 to your computer and use it in GitHub Desktop.
Save sureshsaggar/5493610 to your computer and use it in GitHub Desktop.
Forecasting in R: Developing Python API to analyze time series data in Redis
'''
Description: dashboard/* primarily powers the APIs required to support the growth dashboards.
@author: sureshsaggar
'''
#!/usr/bin/env python
from httpserver import *
from werkzeug.routing import *
import time
import redis
import werkzeug
import datetime
from datetime import datetime, date, timedelta
from rpy2.robjects import IntVector
from rpy2.robjects.packages import importr
import redis
import rpy2.robjects as robjects
# TCP port the API server listens on (passed to run_simple in the entry point).
APISERVER_PORT = 6600
# Host and port of the Redis instance used by DbConnections and StatsCache.
REDIS_HOST = "localhost"
REDIS_PORT = 6379
"""
An object of this class is initialized as part of the DashboardAPIServer application
object and passed to the respective API handler depending on the route (as defined by
the map).
"""
class DbConnections(object):
    """Bundle of backend handles that is passed to every API handler.

    Carries the ``stats_cache`` object supplied by the caller together with
    the Redis host and port that consumers should connect to.
    """

    def __init__(self, stats_cache, redis_host=None, redis_port=None):
        """Store the cache handle and the Redis endpoint.

        Args:
            stats_cache: cache object exposed as ``self.stats_cache``.
            redis_host: Redis hostname; defaults to the module-level
                ``REDIS_HOST`` when omitted.
            redis_port: Redis port; defaults to the module-level
                ``REDIS_PORT`` when omitted.
        """
        self.stats_cache = stats_cache
        # Fall back to the module constants only when no override is given, so
        # existing ``DbConnections(stats_cache=...)`` callers are unaffected.
        self.redis_host = REDIS_HOST if redis_host is None else redis_host
        self.redis_port = REDIS_PORT if redis_port is None else redis_port
"""
Instantiated in the main entry point; this is the server application that
listens for and dispatches incoming API requests.
"""
class DashboardAPIServer(object):
    """WSGI application that routes each request to its API handler.

    The URL map is built once at construction time; every request is matched
    against it and forwarded to the handler method named after the HTTP verb.
    """

    # Lower-cased HTTP verbs that may be forwarded to a handler method.
    _ALLOWED_METHODS = ("put", "post", "patch", "delete", "get")

    def __init__(self, dbconnections=None):
        """Build the routing table.

        Args:
            dbconnections: shared connection bundle handed to the handlers.
                When omitted, a default ``DbConnections`` wrapping a fresh
                ``StatsCache`` is created.
        """
        if dbconnections is None:
            dbconnections = DbConnections(stats_cache=StatsCache())
        # Use the API routes to serve incoming requests.
        self.url_map = APIRoutes.getRoutesMap(dbconnections)

    def dispatch_request(self, request):
        """Match ``request`` to a handler and return that handler's response.

        Returns:
            Whatever the handler method returns, or the ``HTTPException``
            instance itself when routing or dispatch raises one (the caller
            then invokes it as the response).
        """
        adapter = self.url_map.bind_to_environ(request.environ)
        try:
            endpoint, values = adapter.match()
            method = request.method.lower()
            # NOTE(review): handler instances are created once in the route
            # map, so the request is stored on a shared object -- confirm this
            # is acceptable if the server ever handles requests concurrently.
            endpoint.request = request
            if method not in self._ALLOWED_METHODS:
                raise MethodNotAllowed("Invalid method: %s" % method)
            # A handler need not implement every verb; report that as the same
            # HTTP error instead of letting AttributeError escape as a 500.
            handler = getattr(endpoint, method, None)
            if handler is None:
                raise MethodNotAllowed("Invalid method: %s" % method)
            request.parse_form_data = endpoint.parse_form_data
            return handler(**values)
        except HTTPException as e:
            # ``as`` form is valid on Python 2.6+ and Python 3 alike.
            return e

    def __call__(self, environ, start_response):
        """WSGI entry point: wrap ``environ`` in a request and dispatch it."""
        request = GuessRequest(environ)
        response = self.dispatch_request(request)
        return response(environ, start_response)
"""
Abstraction of any API handler cache providing support for functions
like get, put etc. which are currently backed by Redis.
"""
class StatsCache(object):
    """Small cache facade over Redis hashes.

    Every ``stat`` names a Redis hash and every ``key`` a field inside it;
    the methods forward directly to the corresponding Redis hash commands.
    """

    def __init__(self, host=None, port=None):
        """Open the Redis client.

        Args:
            host: Redis hostname; defaults to the module-level ``REDIS_HOST``.
            port: Redis port; defaults to the module-level ``REDIS_PORT``.
        """
        self.growth_redis = redis.Redis(
            REDIS_HOST if host is None else host,
            REDIS_PORT if port is None else port,
        )

    def put(self, stat, key, value):
        """Set field ``key`` of hash ``stat`` to ``value`` (HSET)."""
        return self.growth_redis.hset(stat, key, value)

    def get(self, stat, key):
        """Return field ``key`` of hash ``stat`` (HGET)."""
        return self.growth_redis.hget(stat, key)

    def getall(self, stat, keys):
        """Return the values of the given ``keys`` of hash ``stat`` (HMGET)."""
        return self.growth_redis.hmget(stat, keys)
"""
Module meant to forecast data using R for the currently supported routes.
"""
class RPY(object):
    """Forecast data held in Redis by delegating the model fit to R (rpy2)."""

    def __init__(self, dbconnections):
        """Open a Redis client from ``dbconnections.redis_host`` / ``redis_port``."""
        self.redis = redis.Redis(dbconnections.redis_host, dbconnections.redis_port)

    def run_linear_regression(self, future_ts_list):
        """Fit ``activations ~ timestamps`` in R and extrapolate to future dates.

        Training data is the Redis hash ``linearregression``: each field is
        used as a timestamp and each value as the observed count.

        Args:
            future_ts_list: iterable of ``datetime`` objects (or any value
                whose ``str()`` has the form ``YYYY-MM-DD HH:MM:SS[.ffffff]``).

        Returns:
            dict with ``points`` (number of training samples), ``d`` (elapsed
            wall-clock seconds), ``m`` (slope), ``c`` (intercept) and
            ``predictions`` (``'YYYY-MM-DD'`` -> predicted integer value).
        """
        started = time.time()
        statistic = 'linearregression'
        ts_val_dict = self.redis.hgetall(statistic)

        # Redis hands back fields and values as strings; convert explicitly
        # instead of relying on IntVector to coerce them.
        timestamps = []
        activations = []
        for ts, count in ts_val_dict.items():
            timestamps.append(int(ts))
            activations.append(int(count))

        # Publish the independent and dependent variables in R's global
        # environment, where the formula string below resolves its names.
        stats = importr('stats')
        robjects.globalenv["timestamps"] = IntVector(timestamps)
        robjects.globalenv["activations"] = IntVector(activations)

        # Run lm and read the fitted line out of its summary.
        base = importr('base')
        lm_D9 = stats.lm("activations ~ timestamps")
        summary = base.summary(lm_D9)
        # NOTE(review): index 3 is taken to be summary.lm's coefficient
        # matrix, whose first two entries are the intercept and slope
        # estimates -- confirm against the R version in use.
        intercept = summary[3][0]
        slope = summary[3][1]

        # Evaluate y = m*x + c at each requested date.
        predictions = dict()
        for f_date in future_ts_list:
            # Drop fractional seconds so the value parses with a fixed format.
            f_date = str(f_date).split('.')[0]
            f_ts = int(time.mktime(datetime.strptime(f_date, "%Y-%m-%d %H:%M:%S").timetuple()))
            # int() replaces the Python-2-only long(); on Python 2 it promotes
            # to long automatically when the value is large.
            predictions[f_date.split(' ')[0]] = int(slope * f_ts + intercept)

        return dict(points=len(ts_val_dict), d=time.time() - started, m=slope, c=intercept, predictions=predictions)
class APIRoutes:
    """Namespace holding the URL routing table of the dashboard API."""

    @staticmethod
    def getRoutesMap(dbconnections):
        """Return the routing map, with one handler instance bound per rule.

        ``/test`` is served by the base ``APIHandler``;
        ``/rpy/linearregression`` by ``RPYLinearRegressionAPIHandler``. Both
        handlers receive the same ``dbconnections`` object.
        """
        test_rule = Rule("/test", endpoint=APIHandler(dbconnections=dbconnections))
        regression_rule = Rule(
            "/linearregression",
            endpoint=RPYLinearRegressionAPIHandler(dbconnections=dbconnections),
        )
        rpy_mount = Submount('/rpy', [regression_rule])
        return Map([test_rule, rpy_mount])
""" Base/ parent API handler supporting HTTP get & post calls. """
class APIHandler(object):
    """Base API handler answering GET and POST with a simple "pass" payload.

    Subclasses override the verb methods (``get``, ``post``, ...) they support.
    """

    # Copied onto the incoming request by the dispatcher before a verb method
    # runs.
    parse_form_data = True
    # Assigned by the dispatcher for each request; None until the first one.
    request = None

    def __init__(self, dbconnections):
        """Keep a reference to the shared connection bundle."""
        self.dbconnections = dbconnections

    def get(self):
        """Acknowledge a GET request, echoing the HTTP method."""
        return JSONResponse(stat="pass", method=self.request.method)

    def post(self):
        """Acknowledge a POST request, echoing the HTTP method."""
        return JSONResponse(stat="pass", method=self.request.method)
""" -----------------------------------------------------------------------
/rpy/linearregression
----------------------------------------------------------------------- """
class RPYLinearRegressionAPIHandler(APIHandler):
    """Handler for ``/rpy/linearregression``.

    Example: curl -X GET http://0.0.0.0:6600/rpy/linearregression
    """

    # Number of consecutive days to forecast; override on a subclass or
    # instance to change the horizon.
    forecast_days = 7

    def get(self):
        """Return the linear-regression forecast for the coming days."""
        start = datetime.today()
        # Offsets begin at 0, so the first forecast date is today itself.
        future_ts_list = [start + timedelta(days=offset) for offset in range(self.forecast_days)]
        forecast = RPY(self.dbconnections).run_linear_regression(future_ts_list)
        return JSONResponse(stat="pass", statistic='linearregression', future=forecast)
""" Entry point of this analytics api server. """
if __name__ == '__main__':
    import os

    from werkzeug.serving import run_simple

    application = DashboardAPIServer()
    # The Werkzeug interactive debugger lets anyone who can reach the server
    # execute arbitrary Python, and this server binds every interface
    # ("0.0.0.0"). Keep it off unless explicitly requested for local work
    # with DASHBOARD_DEBUG=1.
    enable_debugger = os.environ.get("DASHBOARD_DEBUG") == "1"
    run_simple("0.0.0.0", APISERVER_PORT, application,
               use_debugger=enable_debugger, use_reloader=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment