Python parser for maps.stamen.com log files
mapreduce.py
# Parses maps.stamen.com log files
# 99% of the code comes from http://pymotw.com/2/multiprocessing/mapreduce.html
# Another useful article can be found here: http://effbot.org/zone/wide-finder.htm
# Per-file processing time could be improved with threads, I imagine
# Roughly 2k-3k files totaling ~150mb takes about 30min
""" | |
Usage: | |
1. Download log files from s3 into local dated directories with your weapon of choice. | |
I used < s3cmd get --skip-existing s3://aws.stamen.com/logs/fastly/tile.stamen.com/2014-08*.gz > | |
2. Run python mapreduce.py < directory name > | |
3. Go get a beer, results will be printed after files have been processed. | |
For a full month, roughly 30-40 minutes. | |
""" | |
import gzip, re, time, multiprocessing, json, sys, tldextract
from os.path import realpath, join
from multiprocessing_mapreduce import SimpleMapReduce
# NOTE: EMBED_PATTERN is defined but not used below
EMBED_PATTERN = re.compile(r"(http://maps\.stamen\.com/.+/embed)")
SPLIT_PATTERN = re.compile(r'<\d+>([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z) .+ (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) .+ GET (/([^/]+)/[\w/.]+) (\d+) "(.+)" "(.+)"').split
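# A hypothetical Fastly log line, to show what SPLIT_PATTERN captures:
#   <134>2014-08-01T00:00:00Z cache-x 1.2.3.4 ... GET /toner/12/654/1582.png 200 "http://maps.stamen.com/" "Mozilla/5.0"
# re.split() on a matching line returns 9 elements:
#   r[0] prefix, r[1] timestamp, r[2] client IP, r[3] request path,
#   r[4] style (first path segment, e.g. "toner"), r[5] HTTP status,
#   r[6] referrer, r[7] user agent, r[8] suffix
# A non-matching line comes back as a single-element list.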
def process_fn(filename):
    # print multiprocessing.current_process().name, 'reading', filename
    d = {}
    try:
        # Python 2's gzip does not support text mode ('rt'), so read binary
        with gzip.open(filename, 'rb') as f:
            for line in f:
                r = SPLIT_PATTERN(line)
                # skip if the line didn't match (a full match yields 9 elements)
                if len(r) < 9:
                    continue
                # skip if the status isn't an integer
                try:
                    status = int(r[5])
                except ValueError:
                    status = None
                # only process lines with status = 200
                if status != 200:
                    continue
                ip = r[2]
                style = r[4]
                referrer = r[6]
                # date = r[1]
                # request = r[3]
                # browser = r[7]
                try:
                    parts = tldextract.extract(referrer)
                    # join subdomain.domain.suffix, skipping empty parts
                    domain = '.'.join(p for p in parts[:3] if p)
                except Exception:
                    domain = None
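                # e.g. tldextract.extract('http://maps.stamen.com/toner/') returns
                # ExtractResult(subdomain='maps', domain='stamen', suffix='com'),
                # so domain becomes 'maps.stamen.com'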
                if not domain:
                    continue
                if domain not in d:
                    d[domain] = {
                        'total': 0,
                        'styles': {},
                        'ips': {}
                    }
                if style not in d[domain]['styles']:
                    d[domain]['styles'][style] = 0
                if ip not in d[domain]['ips']:
                    d[domain]['ips'][ip] = 0
                d[domain]['total'] += 1
                d[domain]['styles'][style] += 1
                d[domain]['ips'][ip] += 1
    except Exception:
        d = None
    if d:
        return [d]
    else:
        return []
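# process_fn returns a one-element list holding a per-file dict shaped like
# this (values are hypothetical):
#   {'maps.stamen.com': {'total': 42,
#                        'styles': {'toner': 30, 'watercolor': 12},
#                        'ips': {'1.2.3.4': 42}}}
# so SimpleMapReduce can chain the per-file results together.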
def reduce_fn(item):
    """We're not reducing here, but you could.
    """
    return item
if __name__ == '__main__':
    import glob
    directory = None
    try:
        directory = sys.argv[1]
    except IndexError:
        pass
    if not directory:
        print 'ERROR: No directory given.'
        # exit nonzero so failures are visible to the shell
        sys.exit(1)
    # e.g. 2014-06
    fpath = realpath('./' + directory)
    # glob.glob1 is undocumented; glob.glob returns the full paths directly
    input_files = glob.glob(join(fpath, '*.gz'))
print "Start to process %s files" % len(input_files) | |
start_time = int(time.time()) | |
mapper = SimpleMapReduce(process_fn, reduce_fn) | |
result = mapper(input_files) | |
count = {} | |
unique_ips = {} | |
unique_ip_count = 0 | |
totalReferrers = 0 | |
map_styles = {} | |
for item in result: | |
for key, value in item.items(): | |
if not key in count: | |
totalReferrers += 1 | |
count[key] = { | |
'total' : 0, | |
'styles' : {} | |
} | |
for style,styleCount in value['styles'].items(): | |
if not style in count[key]['styles']: | |
count[key]['styles'][style] = 0 | |
if not style in map_styles: | |
map_styles[style] = 0 | |
map_styles[style] += 1 | |
count[key]['styles'][style] += styleCount | |
for ip in value['ips']: | |
if not ip in unique_ips: | |
unique_ips[ip] = 1 | |
unique_ip_count += 1 | |
count[key]['total'] += value['total'] | |
end_time = int(time.time()) - start_time | |
# print out results | |
print "Results for %s" % directory | |
print "Processing took %s seconds" % end_time | |
print "totalReferrers: %s" % totalReferrers | |
print "Unique IP's: %s" % unique_ip_count | |
print json.dumps(map_styles, indent=4) |
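# The printed summary looks like this (the numbers are made-up placeholders):
#   Starting to process 2500 files
#   Results for 2014-08
#   Processing took 1800 seconds
#   totalReferrers: 950
#   Unique IPs: 120000
#   {
#       "toner": 4100000,
#       ...
#   }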
multiprocessing_mapreduce.py
# 99% of code from http://pymotw.com/2/multiprocessing/mapreduce.html
import collections
import itertools
import multiprocessing
class SimpleMapReduce(object):

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func
            Function to map inputs to intermediate data. Takes as
            argument one input value and returns a tuple with the key
            and a value to be reduced.
        reduce_func
            Function to reduce partitioned version of intermediate data
            to final output. Takes as argument a key as produced by
            map_func and a sequence of the values associated with that
            key.
        num_workers
            The number of workers to create in the pool. Defaults to the
            number of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)
    def partition(self, mapped_values):
        """Pass-through: process_fn already aggregates per file, so the
        mapped values are handed to reduce_func as-is. The original
        PyMOTW implementation, which groups values by key, is kept
        below for reference.
        """
        return mapped_values
        # partitioned_data = collections.defaultdict(list)
        # for key, value in mapped_values:
        #     partitioned_data[key].append(value)
        # return partitioned_data.items()
    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions given.
        inputs
            An iterable containing the input data to be processed.
        chunksize=1
            The portion of the input data to hand to each worker. This
            can be used to tune performance during the mapping phase.
        """
        map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        partitioned_data = self.partition(itertools.chain(*map_responses))
        reduced_values = self.pool.map(self.reduce_func, partitioned_data)
        return reduced_values
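A minimal sketch of driving SimpleMapReduce on its own, independent of the log parser. The word-count task and file names are hypothetical (adapted from the PyMOTW article the class comes from), but the call pattern mirrors how mapreduce.py uses the class:

# wordcount_example.py (hypothetical) -- same map/partition/reduce flow
import string
from multiprocessing_mapreduce import SimpleMapReduce

def file_to_words(filename):
    """Map: read one file, emit (word, 1) pairs."""
    output = []
    with open(filename, 'rt') as f:
        for line in f:
            for word in line.split():
                word = word.strip(string.punctuation).lower()
                if word:
                    output.append((word, 1))
    return output

def count_words(item):
    """Reduce: sum the counts for one word.

    Note: this assumes the grouping version of partition() shown above
    is re-enabled, so each item is (word, [counts]); it would not work
    with the pass-through partition() the log parser uses.
    """
    word, occurrences = item
    return (word, sum(occurrences))

if __name__ == '__main__':
    mapper = SimpleMapReduce(file_to_words, count_words)
    # a list of text files to count (hypothetical names)
    word_counts = mapper(['a.txt', 'b.txt'], chunksize=1)
    for word, total in sorted(word_counts):
        print '%s: %s' % (word, total)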