Created February 12, 2016 21:44
-
-
Save marcotc/5123d0d9fe6fad6dea6a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#AUTHOR: MARK KHAITMAN | |
from datetime import datetime, timedelta | |
from urlparse import urlparse | |
from csv import reader | |
# Column names of one log line, in file order. mapInput splits each line on
# spaces (honoring double-quoted columns) and zips the pieces with these names.
# NOTE(review): the names appear to follow the AWS ELB access-log layout;
# 'ip' holds "client_ip:port" (the pipeline keeps only the part before ':').
fields = (
    'timestamp',
    'elb',
    'ip',
    'backend:port',
    'req_pt',
    'bpt',
    'res_pt',
    'elb_status',
    'backend_status',
    'rec_bytes',
    'sent_bytes',
    'req',
    'ua',
    'ssl_cipher',
    'ssl_protocol',
)

# Minutes of inactivity after which a client's next hit starts a new session.
INACTIVITY_WINDOW = 60
def mapInput(iterator, fieldNames=None):
    '''Parse raw log lines into dicts keyed by column name.

    Meant for mapPartitions: `iterator` yields the text lines of one
    partition. Each line is split on spaces, honoring double-quoted
    columns, and its pieces are zipped with the column names.

    :param iterator: iterable of log-line strings.
    :param fieldNames: column names in file order; None means `fields`.
    :yields: one dict per line that has at least len(fieldNames) columns.
        Extra trailing columns are ignored (zip truncates).
    '''
    if fieldNames is None:
        fieldNames = fields
    minColumns = len(fieldNames)
    for row in reader(iterator, delimiter=' ', quotechar='"'):
        # Skip blank/truncated lines: a short row would produce a dict with
        # missing keys, and consumers that index it by column name would
        # fail on the missing values.
        if len(row) < minColumns:
            continue
        yield dict(zip(fieldNames, row))
def getSessions(iterator, window=None):
    '''Split one client's page hits into sessions and summarize each one.

    Meant for flatMapValues: `iterator` holds the (request datetime, URL)
    pairs grouped under a single client IP key. Hits are processed in
    chronological order. Whenever the gap between two consecutive hits is at
    least `window` minutes, the current session is closed and a new session
    starts with the later hit.

    :param iterator: iterable of (datetime, url) pairs, in any order.
    :param window: inactivity window in minutes; None means INACTIVITY_WINDOW.
    :yields: (session_minutes, unique_urls) for each session.
        session_minutes is the time from the session's first to its last hit,
        rounded to 6 decimals. unique_urls is a sorted list of the distinct
        URLs requested during the session. Nothing is yielded for an empty
        iterator.
    '''
    if window is None:
        window = INACTIVITY_WINDOW

    def summarize(first, last, pages):
        # total_seconds(), unlike timedelta.seconds, includes whole days and
        # microseconds, so long gaps/sessions are measured correctly.
        return (round((last - first).total_seconds() / 60, 6), sorted(pages))

    first = last = None
    pages = set()
    # groupByKey gives no ordering guarantee for the values of a key, so the
    # hits are ordered by time here instead of trusting any upstream sort.
    for hitTime, url in sorted(iterator):
        if last is not None and (hitTime - last).total_seconds() / 60 >= window:
            yield summarize(first, last, pages)
            first = None
            pages = set()
        if first is None:
            first = hitTime
        pages.add(url)
        last = hitTime
    if first is not None:
        yield summarize(first, last, pages)
# Build the RDD from the log file in HDFS (placed in /tmp for simplicity).
# The input is split into 4 partitions so a dual-core machine with
# hyper-threading can keep all 4 hardware threads busy.
# Pipeline:
#   1. mapInput (run per partition) names each log line's columns.
#   2. Keep only what sessionizing needs, keyed by client IP (port dropped):
#      (ip, (request datetime, requested URL)). urlparse() treats the whole
#      request line as a path, so any '?query' tail is cut off before token
#      [1] (the URL) is taken.
#   3. groupByKey collects every hit per IP. Neither groupByKey nor a prior
#      sortByKey (which orders by the IP key only, not by timestamp)
#      guarantees time order within a group, so each group is explicitly
#      sorted chronologically with mapValues(sorted).
#   4. flatMapValues(getSessions) sessionizes each IP's hits and emits one
#      (session minutes, unique pages) record per session.
#   5. Sort sessions by length, longest first, and collect to the driver.
rdd = sc.textFile("hdfs://sandbox.hortonworks.com/tmp/2015_07_22_mktplace_shop_web_log_sample.log", 4) \
    .mapPartitions(mapInput) \
    .map(lambda l: (l.get('ip').split(':')[0],
                    (datetime.strptime(l.get('timestamp'), "%Y-%m-%dT%H:%M:%S.%fZ"),
                     urlparse(l.get('req')).path.split(' ')[1]))) \
    .cache()
res = rdd.groupByKey() \
    .mapValues(sorted) \
    .flatMapValues(getSessions) \
    .sortBy(lambda x: x[1][0], ascending=False) \
    .collect()
# 100 most engaged users: (client IP, session length in minutes) for the 100
# longest sessions. `res` is sorted longest first and holds one entry per
# session, so the same IP can appear more than once.
# print() is explicit: a bare expression statement only echoes in an
# interactive shell and does nothing when the file is run as a script.
for x in res[:100]:
    print((x[0], x[1][0]))
# Average session length (minutes) across every session in `res`.
sessionCount = len(res)
total = sum(x[1][0] for x in res)
if sessionCount:
    # Single-argument print(...) works under both Python 2 and Python 3.
    print('Average Session Time: %s Minutes' % (total / sessionCount))
else:
    # Avoid ZeroDivisionError when the log produced no sessions.
    print('Average Session Time: no sessions found')
# Average Session Time: 1.34913467626 Minutes for a 15-minute inactivity window
# Average Session Time: 2.48392207583 Minutes for a 30-minute inactivity window
# Average Session Time: 2.65802297988 Minutes for a 60-minute inactivity window
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.