Created November 21, 2012 08:05
ForkJoin/MapReduce Demo Code
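The gist bundles three small Python files: a crontab-driven sync script (apparently channelonline_sync.py, judging from its log path) that pulls channel-online counts from several MySQL databases, merges them, and stores the result in MongoDB; forkjoin.py, a thread-based helper that imitates the fork/join programming model; and map_reduce.py, a minimal in-memory map/reduce helper used for the merge step.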
channelonline_sync.py
# -*- coding: utf-8 -*-
'''
Synchronously fetch channel-online data from the data sources and store it in MongoDB.
This script is normally run from crontab, once every 30 seconds.
@author: [email protected]
'''
import itertools
import time

import pymongo

from app_logger import Logger, set_logging_path
from db import yy_mysql
from forkjoin import ForkJoin
from map_reduce import map_reduce

# TODO: move to a configuration file
db_session_list = [
    {"host": "1.2.3.4:3306", "db": "appDaemon",
     "user": "user", "password": "password"},
    {"host": "2.3.4.5:3306", "db": "appDaemon",
     "user": "user", "password": "password"}
]

def _query(name, sql, host, database, user, password):
    def totuple(row):
        # align the timestamp to the start of its minute
        ts = int(time.mktime(row["sync_date"].timetuple()))
        ts = ts - (ts % 60)
        users = int(row["users"])
        sessions = int(row["sessions"])
        return (ts, users, sessions)

    for i in range(0, 3):  # retry up to 3 times
        try:
            conn = yy_mysql.Connection(host, database, user, password, connect_timeout=10)
            rows = conn.query(sql)
            tuples = map(totuple, rows)
            Logger().debug("%s read data ok: %s", name, tuples)
            return tuples
        except:
            Logger().exception("query database failed")

def __query_datasource():
    sql = """
        select sync_date, sum(totalUser) as users, count(totalUser) as sessions
        from tbl_session_his
        where sync_date >= (
            select date_add( (select max(sync_date) from tbl_session_his), interval -5 minute)
        )
        group by sync_date
        order by sync_date
    """
    forkjoin = ForkJoin()
    for db in db_session_list:
        forkjoin.fork(target=_query, kwargs=dict(
            name=db["host"], sql=sql,
            host=db["host"], database=db["db"], user=db["user"], password=db["password"]
        ))
    results = forkjoin.join(30)
    Logger().info("query databases, result: %d", len(results))
    return results

def __save(mongodb, records):
    records = sorted(records, key=lambda r: r[0])
    for (ts, users, sessions) in records:
        Logger().info("save channel: %d, %d, %d", ts, users, sessions)
        try:
            # only update when the stored value is smaller than the new one
            mongodb.online2["sum_channel_user"].update(
                {'_id': ts, "val": {"$lt": users}},
                {"$set": {"val": users, "sessions": sessions}},
                upsert=True, safe=True)
        except pymongo.errors.DuplicateKeyError:
            # the record already exists and needs no update
            pass
        except:
            Logger().exception("save channel: %d, %d, %d failed", ts, users, sessions)

def __removeLatest(records):
    """
    Drop records from the most recent minute: channel data arrives fairly punctually,
    but YY usually needs to wait a bit more than one minute for it to settle.
    """
    now = time.time()
    return [rec for rec in records if now - rec[0] >= 60]

def channelonline_sync():
    Logger().info("ChannelOnline Data Sync Begin...")
    mongodb = pymongo.Connection("127.0.0.1", 27017)
    # results: one list of (ts, users, sessions) tuples per database
    results = __query_datasource()
    # merge the per-database rows by timestamp, keeping the maximum counts
    merged = map_reduce(itertools.chain(*results),
                        index=lambda row: (row[0], row),
                        mapper=lambda key, row: [(key, row)],
                        reducer=lambda key, rows: (key,
                                                   max([row[1] for row in rows]),
                                                   max([row[2] for row in rows])))
    result = __removeLatest(merged.values())
    __save(mongodb, result)
    Logger().info("ChannelOnline Data Sync Completed\n")

if __name__ == "__main__":
    set_logging_path("logs/channelonline_sync.log")
    channelonline_sync()
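For reference, here is a minimal self-contained sketch (with made-up numbers, not taken from the gist) of what the merge step in channelonline_sync() computes: per-minute (ts, users, sessions) rows from two databases are grouped by timestamp, and for each minute the larger user and session counts win.

import itertools
from map_reduce import map_reduce

db_a = [(1353456000, 120, 7), (1353456060, 130, 8)]   # rows from database A
db_b = [(1353456000, 118, 9)]                          # rows from database B
merged = map_reduce(itertools.chain(db_a, db_b),
                    index=lambda row: (row[0], row),
                    mapper=lambda key, row: [(key, row)],
                    reducer=lambda key, rows: (key,
                                               max(r[1] for r in rows),
                                               max(r[2] for r in rows)))
for minute in sorted(merged):
    print merged[minute]
# (1353456000, 120, 9)   <- max users and max sessions across both databases
# (1353456060, 130, 8)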
forkjoin.py
# -*- coding: utf-8 -*-
'''
A simple emulation of the fork/join programming model on top of threads.
@author: [email protected]
'''
import itertools
import threading
import time

class ForkJoin:
    def __init__(self):
        self.condition = threading.Condition()
        # the creating thread holds the lock from construction on, so fork()
        # and join() are expected to be called from that same thread
        self.condition.acquire()
        self.threads = []
        self.results = []

    def fork(self, target, args=(), kwargs=None):
        if kwargs is None:
            kwargs = {}

        def run():
            try:
                ret = target(*args, **kwargs)
                if ret is not None:
                    self.results.append(ret)
            finally:
                self.condition.acquire()
                self.threads.remove(threading.currentThread())
                self.condition.notify()
                self.condition.release()

        thread = threading.Thread(target=run)
        # register the thread before starting it so join() cannot miss it
        self.threads.append(thread)
        thread.start()

    def join(self, seconds):
        begin = time.time()
        end = begin + seconds
        while len(self.threads) > 0:
            remain = end - time.time()
            if remain <= 0:
                break
            self.condition.wait(remain)
        self.condition.release()
        return self.results

if __name__ == "__main__":
    # pi/4 = sum( (-1)^n / (2*n+1) ), computed here in 5 parallel chunks
    def task(begin, end):
        return sum([float(pow(-1, n)) / (2 * n + 1) for n in range(begin, end)])

    begin = time.time()
    fork_join = ForkJoin()
    for i in range(0, 5):
        loop = 1000000
        fork_join.fork(task, args=(i * loop, i * loop + loop))
    results = fork_join.join(30)
    end = time.time()
    print "results = %s, pi=%f, time=%.2fs" % (results, 4 * sum(itertools.chain(results)), end - begin)
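Note that join(30) returns whatever results have been collected by the time either all forked tasks have finished or the 30-second deadline expires: a task still running at the deadline simply contributes nothing to the returned list, and a task that returns None is skipped as well.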
map_reduce.py
# -*- coding: utf-8 -*-
'''
A minimal in-memory map/reduce framework.
@author: [email protected]
'''

def map_reduce(rows, index, mapper, reducer):
    """
    rows:    [ row0, row1, .. ]
    index:   lambda row: (k1, v1)
    mapper:  lambda k, v: [(k1, v1), (k2, v2)]
    reducer: lambda k, vals: val
    """
    group = {}
    # map phase: emit (key, value) pairs and group the values by key
    for row in rows:
        (_key, _value) = index(row)
        for (k, v) in mapper(_key, _value):
            if k in group:
                group[k].append(v)
            else:
                group[k] = [v]
    # reduce phase: collapse each key's values into a single result
    reduced = {}
    for (key, vals) in group.items():
        reduced[key] = reducer(key, vals)
    return reduced

if __name__ == "__main__":
    array = [{"_id": 1, "value": 100},
             {"_id": 2, "value": 200},
             {"_id": 3, "value": 300},
             {"_id": 1, "value": 103},
             {"_id": 2, "value": 195},
             {"_id": 3, "value": 320}]
    results = map_reduce(
        array,
        index=lambda r: (r["_id"], r),
        mapper=lambda k, v: [(k, v)],
        reducer=lambda k, vals: max(vals, key=lambda val: val["value"])
    )
    print results
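With the demo data above, the reducer keeps the row with the largest value for each _id, so the printed dict maps 1 to {"_id": 1, "value": 103}, 2 to {"_id": 2, "value": 200}, and 3 to {"_id": 3, "value": 320} (dictionary key order may vary).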