Created
September 10, 2016 03:03
-
-
Save dongweiming/c001b84407871aa96fe57890718d73c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
from mapreduce import SimpleMapReduce | |
from simple import FILES, file_parser | |
def count_err_log(item):
    """Collapse one ``(key, counts)`` pair into ``(key, total)``.

    ``item`` is a two-element sequence whose second element is an iterable
    of numbers; the numbers are summed and returned alongside the key.
    This is the second callable handed to ``SimpleMapReduce`` in ``main``.
    """
    key, counts = item
    total = sum(counts)
    return key, total
def map_wrapper(*args, **kwargs):
    """Run ``file_parser`` and tag every match it returns with a count of 1.

    All positional and keyword arguments are forwarded unchanged to
    ``file_parser``.  Returns a list of ``(match, 1)`` tuples.
    This is the first callable handed to ``SimpleMapReduce`` in ``main``.
    """
    return [(hit, 1) for hit in file_parser(*args, **kwargs)]
def main():
    """Run the map-reduce job over ``FILES`` and print the top entry.

    The job output is ordered by the second element of each item, largest
    first, and only the first item of that ordering is printed.
    """
    job = SimpleMapReduce(map_wrapper, count_err_log)
    totals = job(FILES)
    ranked = sorted(totals, key=lambda pair: pair[1], reverse=True)
    # Single-argument print(...) behaves the same as the print statement.
    print(ranked[0])
if __name__ == '__main__':
    # Script entry point: run main() once and report the wall-clock time.
    import time

    began = time.time()
    main()
    elapsed = time.time() - began
    print('COST: {}'.format(elapsed))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import multiprocessing | |
from multiprocessing.queues import Empty | |
from simple import FILES, RESULT, file_parser | |
class Consumer(multiprocessing.Process):
    """Worker process that parses files pulled from a shared task queue.

    Every item taken from ``task_queue`` is either a filename, which is
    passed to ``file_parser``, or ``None``, which tells the worker to stop.
    The list returned by ``file_parser`` is put on ``result_queue`` as a
    single item, so each filename produces exactly one result.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queues the worker reads tasks from and writes to."""
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until the ``None`` sentinel is received."""
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Acknowledge the sentinel too, otherwise the producer's
                # task_queue.join() would never return.
                self.task_queue.task_done()
                break
            matches = file_parser(next_task)
            # Publish the result BEFORE acknowledging the task.  The
            # producer treats task_queue.join() returning as "all results
            # have been handed over"; acknowledging first would let join()
            # return while this result had not even been queued yet.
            self.result_queue.put(matches)
            self.task_queue.task_done()
def main(num_consumers):
    """Count matches across ``FILES`` using ``num_consumers`` worker processes.

    Starts the workers, feeds them every filename in ``FILES`` followed by
    one ``None`` sentinel per worker, merges the returned match lists into
    ``RESULT`` and prints the ``(match, count)`` pair with the highest count.

    :param num_consumers: number of ``Consumer`` processes to start.
    """
    # RESULT is module-level state shared by every call; reset it so that a
    # second run in the same process does not report accumulated counts.
    RESULT.clear()

    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for worker in consumers:
        worker.start()

    # Enqueue the tasks, then one stop sentinel per worker.
    filenames = list(FILES)
    for filename in filenames:
        tasks.put(filename)
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait until every task (and sentinel) has been acknowledged.
    tasks.join()

    # Each filename yields exactly one result list, so read exactly that
    # many items.  Polling with a timeout until the queue looks empty can
    # drop results that arrive late, and always costs one extra timeout.
    for _ in filenames:
        for match in results.get():
            RESULT[match] += 1

    # Reap the workers only after the result queue has been drained: a
    # process that still has queued data to flush cannot exit before then.
    for worker in consumers:
        worker.join()

    print(sorted(RESULT.items(), key=lambda x: x[1], reverse=True)[0])
if __name__ == '__main__':
    # Script entry point: time one run with a worker per CPU, then a second
    # run with twice as many workers.
    import time

    for factor in (1, 2):
        began = time.time()
        main(multiprocessing.cpu_count() * factor)
        elapsed = time.time() - began
        print('COST: {}'.format(elapsed))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import glob | |
from collections import defaultdict | |
# Glob pattern selecting the files to scan.
_LOG_PATTERN = '/mfs/dae/logs/applog/dae_applog_movie/nori/*'

# Paths matching the pattern at import time.
FILES = glob.glob(_LOG_PATTERN)

# Shared tally of match -> occurrence count; missing keys start at 0.
RESULT = defaultdict(int)
def file_parser(filename):
    """Return the seventh field of every ``stderr_log`` line in a file.

    Each line is split on whitespace.  A line contributes its field at
    index 6 when it has more than six fields and the field at index 5
    contains the substring ``'stderr_log'``.  Matches are returned as a
    list in file order.
    """
    with open(filename) as handle:
        rows = (line.split() for line in handle)
        return [fields[6] for fields in rows
                if len(fields) > 6 and 'stderr_log' in fields[5]]
def main():
    """Tally matches from every file in ``FILES`` and print the top one.

    Counts are accumulated in the module-level ``RESULT`` mapping; the
    ``(match, count)`` pair with the highest count is printed.
    """
    for path in FILES:
        hits = file_parser(path)
        for hit in hits:
            RESULT[hit] += 1
    ranked = sorted(RESULT.items(), key=lambda pair: pair[1], reverse=True)
    # Single-argument print(...) behaves the same as the print statement.
    print(ranked[0])
if __name__ == '__main__':
    # Script entry point: run the single-process count and report the
    # wall-clock time it took.
    import time

    began = time.time()
    main()
    elapsed = time.time() - began
    print('COST: {}'.format(elapsed))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
缺少注释