Skip to content

Instantly share code, notes, and snippets.

@dongweiming
Created September 10, 2016 03:03
Show Gist options
  • Save dongweiming/c001b84407871aa96fe57890718d73c6 to your computer and use it in GitHub Desktop.
Save dongweiming/c001b84407871aa96fe57890718d73c6 to your computer and use it in GitHub Desktop.
# coding=utf-8
from mapreduce import SimpleMapReduce
from simple import FILES, file_parser
def count_err_log(item):
    """Reduce step: collapse one ``(key, counts)`` pair into ``(key, total)``.

    ``item`` must unpack into exactly two values; the second is an iterable
    of numbers that gets summed.
    """
    key, counts = item
    total = sum(counts)
    return key, total
def map_wrapper(*args, **kwargs):
    """Map step: run ``file_parser`` and tag every match with a count of 1.

    All arguments are forwarded to ``file_parser`` unchanged. Returns a list
    of ``(match, 1)`` tuples, one per item yielded by the parser.
    """
    tagged = []
    for hit in file_parser(*args, **kwargs):
        tagged.append((hit, 1))
    return tagged
def main():
    """Run the map/reduce job over ``FILES`` and print the top entry.

    The entry printed is the ``(word, count)`` pair with the highest count.
    """
    job = SimpleMapReduce(map_wrapper, count_err_log)
    totals = job(FILES)
    # Highest count first; only the leading pair is reported.
    ranked = sorted(totals, key=lambda pair: pair[1], reverse=True)
    print(ranked[0])
if __name__ == '__main__':
    import time

    # Wall-clock timing of a single run of main().
    began = time.time()
    main()
    elapsed = time.time() - began
    print('COST: {}'.format(elapsed))
# coding=utf-8
import multiprocessing
from multiprocessing.queues import Empty
from simple import FILES, RESULT, file_parser
class Consumer(multiprocessing.Process):
    """Worker process that parses files named on a shared task queue.

    Every task taken from ``task_queue`` is passed to ``file_parser`` and
    whatever it returns is put on ``result_queue``. A ``None`` task is the
    shutdown sentinel: the worker acknowledges it and exits its loop.
    """

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Consume tasks until the ``None`` sentinel is received."""
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # The sentinel counts as a queued item too, so it must be
                # acknowledged or a join() on the task queue never returns.
                self.task_queue.task_done()
                break
            matches = file_parser(next_task)
            # Publish the result BEFORE acknowledging the task. With the
            # opposite order a producer could return from task_queue.join()
            # while this worker had not yet handed over its result.
            self.result_queue.put(matches)
            self.task_queue.task_done()
def main(num_consumers):
    """Count parser matches across ``FILES`` using ``num_consumers`` workers.

    Starts ``num_consumers`` ``Consumer`` processes, feeds them every filename
    in ``FILES`` followed by one ``None`` sentinel per worker, merges the
    returned match lists into ``RESULT`` and prints the most frequent
    ``(match, count)`` pair. Nothing is printed when there are no matches.
    """
    # RESULT is a shared module-level dict; reset it so that calling main()
    # more than once (as the benchmark below does) does not double-count.
    RESULT.clear()

    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for worker in consumers:
        worker.start()

    # Enqueue the work items.
    for filename in FILES:
        tasks.put(filename)
    # One shutdown sentinel per worker.
    for _ in consumers:
        tasks.put(None)

    # Wait until every task (sentinels included) has been acknowledged.
    tasks.join()

    # Each Consumer puts exactly one result per filename, so read exactly
    # that many instead of polling with a timeout: polling can miss a result
    # that arrives late and always costs an extra second at the end.
    for _ in FILES:
        for match in results.get():
            RESULT[match] += 1

    # All results are drained, so the workers can be reaped safely.
    for worker in consumers:
        worker.join()

    if RESULT:
        # max() returns the same pair as sorting descending and taking [0].
        print(max(RESULT.items(), key=lambda x: x[1]))
if __name__ == '__main__':
    import time

    # Benchmark twice: one worker per core, then two workers per core.
    cores = multiprocessing.cpu_count()
    for worker_total in (cores, cores * 2):
        began = time.time()
        main(worker_total)
        print('COST: {}'.format(time.time() - began))
# coding=utf-8
import glob
from collections import defaultdict
FILES = glob.glob('/mfs/dae/logs/applog/dae_applog_movie/nori/*')
RESULT = defaultdict(int)
def file_parser(filename):
    """Return the 7th field of every ``stderr_log`` line in ``filename``.

    Each line is split on whitespace. A line qualifies when it has more than
    six fields and the sixth field contains the text ``stderr_log``; the
    seventh field of each qualifying line is collected, in file order.
    """
    with open(filename) as log_file:
        tokenized = (line.split() for line in log_file)
        return [
            fields[6]
            for fields in tokenized
            if len(fields) > 6 and 'stderr_log' in fields[5]
        ]
def main():
    """Sequentially tally parser matches over ``FILES`` and print the top one.

    Counts are accumulated in the module-level ``RESULT`` mapping; the
    ``(match, count)`` pair with the highest count is printed.
    """
    for path in FILES:
        matches = file_parser(path)
        for token in matches:
            RESULT[token] += 1
    # Highest count first; only the leading pair is reported.
    ranking = sorted(RESULT.items(), key=lambda entry: entry[1], reverse=True)
    print(ranking[0])
if __name__ == '__main__':
    import time

    # Wall-clock timing of the single-process baseline.
    began = time.time()
    main()
    elapsed = time.time() - began
    print('COST: {}'.format(elapsed))
@banjin
Copy link

banjin commented Oct 21, 2016

缺少注释

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment