Skip to content

Instantly share code, notes, and snippets.

@ronfe
Created November 5, 2015 06:13
Show Gist options
  • Save ronfe/6ab36df37c77fd2a3d5c to your computer and use it in GitHub Desktop.
Save ronfe/6ab36df37c77fd2a3d5c to your computer and use it in GitHub Desktop.
__author__ = 'ronfe'
from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime
import threading
# --- Data source -----------------------------------------------------------
# Collection that the worker threads run their aggregation pipelines against.
remoteClient = MongoClient('mongodb://10.8.3.182:27017')
points = remoteClient['yangcong-prod25']['points']

# --- Time window -----------------------------------------------------------
# The scan covers startTime..endTime and is cut into slots of `timeRange`
# (one hour each).
startTime = datetime.datetime(year=2015, month=6, day=19)
endTime = datetime.datetime(year=2015, month=10, day=25)
timeRange = datetime.timedelta(hours=1)

# Shared cursor advanced by assignTasks() as slots are handed out.
curTimePointer = startTime

# --- Shared results --------------------------------------------------------
# totalAmount: running total that worker() updates and prints.
# totalList:   set of distinct values collected when worker() runs in
#              list mode.
totalAmount = 0
totalList = set()
# Serializes access to the shared time cursor so that concurrently running
# workers never receive the same slot twice and never skip one.
_taskLock = threading.Lock()


def assignTasks(returnList=False):
    """Hand out the start of the next unprocessed time slot.

    The module-level cursor ``curTimePointer`` is read and advanced by
    ``timeRange`` under a lock, so the function is safe to call from many
    worker threads at once. The slot that begins at the cursor's initial
    value is handed out first (the previous implementation advanced the
    cursor before returning it and therefore never processed that slot).

    Args:
        returnList: Unused; kept so existing callers keep working.

    Returns:
        The slot start as a ``datetime`` while it lies before ``endTime``;
        otherwise the string ``'done'``.
    """
    global curTimePointer
    with _taskLock:
        slotStart = curTimePointer
        if slotStart < endTime:
            # Claim this slot and move the cursor for the next caller.
            curTimePointer = slotStart + timeRange
            return slotStart
    # Every slot has been handed out: report the running total.
    print("Done!")
    print("total: " + str(totalAmount))
    return 'done'
# def handleList(returnList):
#     global totalList, totalAmount
#     totalList.union(set(returnList))
#     totalAmount = len(totalList)
#     print 'totalAmount: ' + str(totalAmount)
# Guards the shared accumulators (totalAmount / totalList): every worker
# thread performs a read-modify-write on them, which loses updates when it
# is not serialized.
_resultLock = threading.Lock()


def worker(workerNum, pipeLine, isList=False):
    """Process time slots until assignTasks() reports that none are left.

    For each slot the worker prepends an ``_id`` range ``$match`` stage to
    ``pipeLine``, runs the aggregation on ``points`` and merges the outcome
    into the module-level totals.

    Args:
        workerNum: Identifier used only in the progress output.
        pipeLine: List of aggregation stages appended after the ``_id``
            range filter.
        isList: Falsy to count the documents returned by the pipeline and
            add that count to ``totalAmount``. Otherwise the name of a field
            in the first result document whose values are merged into
            ``totalList``; ``totalAmount`` then becomes the size of that set.
    """
    global totalAmount
    while True:
        start = assignTasks()
        if start == 'done':
            print('No work, done')
            break

        print("[Worker " + str(workerNum) + "] Start " + str(start))
        end = start + timeRange
        # The slot is selected through an _id range built from its start and
        # end datetimes.
        startId = ObjectId.from_datetime(start)
        endId = ObjectId.from_datetime(end)
        usedPipe = [{
            "$match": {"_id": {"$gte": startId, "$lt": endId}}
        }] + pipeLine
        data = list(points.aggregate(usedPipe))

        # Merge under the lock; the query above stays outside of it so the
        # workers still overlap their database round trips.
        with _resultLock:
            if not isList:
                totalAmount += len(data)
            else:
                results = data[0][isList] if data else []
                # In-place update avoids copying the whole set per slot.
                totalList.update(results)
                totalAmount = len(totalList)
            currentTotal = totalAmount
        print('totalAmount: ' + str(currentTotal))
# Aggregation stages applied to every time slot: keep the documents whose
# "from" field is "ios" and collect the distinct "user" values of the slot
# into a single result document under "users".
pipeLine = [
    {"$match": {"from": "ios"}},
    {"$group": {"_id": None, "users": {"$addToSet": "$user"}}}
]

# Start 50 workers in list mode; each merges the "users" field of its
# results into the shared set.
threads = []
for i in range(50):
    t = threading.Thread(target=worker, args=(i, pipeLine, "users"))
    threads.append(t)
    t.start()

# Wait for every worker before reporting: the "total" printed when the first
# worker runs out of slots can be emitted while other workers are still
# merging their last results.
for t in threads:
    t.join()
print('Final total: ' + str(totalAmount))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment