Created
May 1, 2013 03:40
-
-
Save sureshsaggar/5493610 to your computer and use it in GitHub Desktop.
Forecasting in R: Developing Python API to analyze time series data in Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Description: dashboard/* primarily powers the APIs required to support the growth dashboards.
@author: sureshsaggar | |
''' | |
#!/usr/bin/env python | |
from httpserver import * | |
from werkzeug.routing import * | |
import time | |
import redis | |
import werkzeug | |
import datetime | |
from datetime import datetime, date, timedelta | |
from rpy2.robjects import IntVector | |
from rpy2.robjects.packages import importr | |
import redis | |
import rpy2.robjects as robjects | |
# Location of the Redis instance used by StatsCache and DbConnections.
REDIS_HOST = "localhost"
REDIS_PORT = 6379

# TCP port the API server is started on by the __main__ entry point.
APISERVER_PORT = 6600
class DbConnections():
    """
    Bundle of connection settings handed to the API handlers.

    DashboardAPIServer builds one instance and passes it to
    APIRoutes.getRoutesMap(), which forwards it to every handler it
    registers in the route map.
    """

    def __init__(self, stats_cache):
        """
        Keep the supplied cache and record where Redis lives.

        stats_cache -- cache object provided by the caller
                       (DashboardAPIServer passes a StatsCache).
        """
        self.stats_cache = stats_cache
        # The Redis location always comes from the module-level constants.
        self.redis_host, self.redis_port = REDIS_HOST, REDIS_PORT
class DashboardAPIServer(object):
    """
    WSGI application that listens for API requests.

    Instantiated by the __main__ entry point. Each request is matched against
    the route map and dispatched to the method of the matched handler that is
    named after the (lower-cased) HTTP verb.
    """

    # Lower-cased HTTP verbs that may be dispatched to a handler method.
    _SUPPORTED_METHODS = ("put", "post", "patch", "delete", "get")

    def __init__(self, dbconnections=None):
        """
        Build the route map.

        dbconnections -- optional DbConnections instance; when omitted a new
                         one wrapping a fresh StatsCache is created.
        """
        if dbconnections is None:
            dbconnections = DbConnections(stats_cache=StatsCache())
        # Use the API routes to serve incoming requests.
        self.url_map = APIRoutes.getRoutesMap(dbconnections)

    def dispatch_request(self, request):
        """
        Route `request` to its handler and return the handler's response.

        Any HTTPException raised while matching or handling is returned
        instead of propagated, so the caller can invoke it as the WSGI
        response.
        """
        adapter = self.url_map.bind_to_environ(request.environ)
        try:
            endpoint, values = adapter.match()
            method = request.method.lower()
            # NOTE(review): the endpoint object comes from the route map, so the
            # request is stored on an object that may be shared between
            # requests -- confirm the server never handles requests concurrently.
            endpoint.request = request

            handler = None
            if method in self._SUPPORTED_METHODS:
                # A handler that does not implement this verb must produce a
                # 405 response, not an AttributeError.
                handler = getattr(endpoint, method, None)
            if handler is None:
                allowed = [
                    verb.upper()
                    for verb in self._SUPPORTED_METHODS
                    if callable(getattr(endpoint, verb, None))
                ]
                # The first positional argument of MethodNotAllowed is the list
                # of valid methods (used for the Allow header), so the message
                # has to be passed as `description`.
                raise MethodNotAllowed(
                    valid_methods=allowed,
                    description="Invalid method: %s" % method,
                )

            request.parse_form_data = endpoint.parse_form_data
            return handler(**values)
        except HTTPException as e:
            return e

    def __call__(self, environ, start_response):
        """WSGI entry point: wrap `environ` in a request, dispatch it, respond."""
        # GuessRequest is not defined in this file; presumably it is provided
        # by the `httpserver` star import -- TODO confirm.
        request = GuessRequest(environ)
        response = self.dispatch_request(request)
        return response(environ, start_response)
class StatsCache():
    """
    Abstraction of the cache available to API handlers.

    Offers put/get/getall, all of which are currently backed by Redis hash
    commands issued through a single redis.Redis client.
    """

    def __init__(self):
        """Create the Redis client from the module-level host/port constants."""
        self.growth_redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

    def put(self, stat, key, value):
        """Set field `key` of hash `stat` to `value`; returns the hset result."""
        client = self.growth_redis
        return client.hset(stat, key, value)

    def get(self, stat, key):
        """Return the value stored in field `key` of hash `stat` (hget result)."""
        client = self.growth_redis
        return client.hget(stat, key)

    def getall(self, stat, keys):
        """Return the values of the fields `keys` of hash `stat` (hmget result)."""
        client = self.growth_redis
        return client.hmget(stat, keys)
class RPY():
    """
    Forecasts data using R (through rpy2) for the currently supported routes.
    """

    def __init__(self, dbconnections):
        """Open a Redis client using the host/port carried by `dbconnections`."""
        self.redis = redis.Redis(dbconnections.redis_host, dbconnections.redis_port)

    def run_linear_regression(self, future_ts_list):
        """
        Fit `activations ~ timestamps` with R's lm() and extrapolate it.

        The training data is the Redis hash 'linearregression': every field is
        used as a timestamp and its value as the activation count (described
        by the original author as the global visitor count).

        future_ts_list -- values whose str() looks like
                          "YYYY-MM-DD HH:MM:SS[.ffffff]" (e.g. datetime
                          objects); one prediction is produced per value.

        Returns a dict with:
            points      -- number of entries read from Redis
            d           -- seconds spent in this call
            m, c        -- slope and intercept of the fitted line
            predictions -- {"YYYY-MM-DD": predicted integer value}
        """
        started_at = time.time()
        statistic = 'linearregression'
        ts_val_dict = self.redis.hgetall(statistic)

        # Build both lists in one pass so they stay index-aligned.
        timestamps = []
        activations = []
        for ts, count in ts_val_dict.items():
            timestamps.append(ts)
            activations.append(count)

        # Create R vectors.
        # NOTE(review): the values come straight from Redis; IntVector is
        # relied upon to coerce them to integers -- confirm for the installed
        # rpy2 version.
        ct_timestamps = IntVector(timestamps)
        ct_activations = IntVector(activations)

        # Set independent & dependent variables in R's global environment so
        # the formula below can resolve them by name.
        stats = importr('stats')
        robjects.globalenv["timestamps"] = ct_timestamps
        robjects.globalenv["activations"] = ct_activations

        # Add R formula, run lm, & find intercept & slope.
        base = importr('base')
        lm_D9 = stats.lm("activations ~ timestamps")
        summary = base.summary(lm_D9)
        # Element 3 of the summary is read as the coefficient table, whose
        # first two entries are the intercept and slope estimates
        # -- TODO confirm against R's summary.lm documentation.
        intercept = summary[3][0]
        slope = summary[3][1]

        # Find predictions for future_ts_list.
        predictions = dict()
        for f_date in future_ts_list:
            # Drop fractional seconds, then convert to epoch seconds
            # (time.mktime interprets the value as local time).
            f_date = str(f_date).split('.')[0]
            f_ts = int(time.mktime(datetime.strptime(f_date, "%Y-%m-%d %H:%M:%S").timetuple()))
            # int() instead of the Python-2-only long(): Python 2 promotes to
            # long automatically, and the call also works on Python 3.
            predictions[f_date.split(' ')[0]] = int(slope * f_ts + intercept)

        return dict(points=len(ts_val_dict), d=time.time() - started_at, m=slope, c=intercept, predictions=predictions)
class APIRoutes:
    """Declares the URL routes served by the API server."""

    @staticmethod
    def getRoutesMap(dbconnections):
        """
        Build the routing Map.

        Every handler is instantiated once here and receives `dbconnections`:
            /test                  -> APIHandler
            /rpy/linearregression  -> RPYLinearRegressionAPIHandler
        """
        test_rule = Rule("/test", endpoint=APIHandler(dbconnections=dbconnections))
        regression_rule = Rule(
            "/linearregression",
            endpoint=RPYLinearRegressionAPIHandler(dbconnections=dbconnections),
        )
        rpy_routes = Submount('/rpy', [regression_rule])
        return Map([test_rule, rpy_routes])
class APIHandler():
    """
    Base/parent API handler supporting HTTP GET and POST calls.

    `self.request` is not set here; the dispatcher assigns it to the handler
    before calling the method that matches the HTTP verb.
    """

    # Copied onto the request object by the dispatcher before the handler runs.
    parse_form_data = True

    def __init__(self, dbconnections):
        self.dbconnections = dbconnections

    def get(self):
        """Answer GET with a 'pass' status that echoes the request method."""
        return self._echo_method()

    def post(self):
        """Answer POST with a 'pass' status that echoes the request method."""
        return self._echo_method()

    def _echo_method(self):
        # Shared body of get() and post().
        return JSONResponse(stat="pass", method=self.request.method)
# -----------------------------------------------------------------------
# /rpy/linearregression
# -----------------------------------------------------------------------
class RPYLinearRegressionAPIHandler(APIHandler):
    """
    Handler for the linear-regression forecast route.

    curl -X GET http://0.0.0.0:6600/rpy/linearregression
    """

    def get(self):
        """Return the forecast for today and the six days that follow it."""
        horizon_days = 7
        today = datetime.today()

        # One datetime per day, starting with the current moment (offset 0).
        future_ts_list = []
        for offset in range(horizon_days):
            future_ts_list.append(today + timedelta(days=offset))

        forecaster = RPY(self.dbconnections)
        forecast = forecaster.run_linear_regression(future_ts_list)
        return JSONResponse(stat="pass", statistic='linearregression', future=forecast)
# Entry point of this analytics API server.
if __name__ == '__main__':
    import os
    from werkzeug.serving import run_simple

    # The Werkzeug interactive debugger lets anyone who can reach the port run
    # arbitrary code. The server binds to every interface (0.0.0.0), so the
    # debugger (and the development reloader) are opt-in:
    # start with DASHBOARD_DEBUG=1 to enable them.
    debug = os.environ.get("DASHBOARD_DEBUG", "").lower() in ("1", "true", "yes")

    application = DashboardAPIServer()
    run_simple("0.0.0.0", APISERVER_PORT, application, use_debugger=debug, use_reloader=debug)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment