Created April 30, 2018 12:20 by Sklavit (gist 747e292fc17f6c9b400470006ff1c567)
Mongo Dask Bag
import logging

import dask.bag
import pymongo


class MongoDaskBag:
    """Expose a MongoDB collection as a dask bag, one bag element per _id range."""

    def __init__(self, db_name, collection_name):
        self.db_name = db_name
        self.collection_name = collection_name

    def bag(self, partition_size: int = 1000, partitions_num: int = None):
        with pymongo.MongoClient() as mongo_client:
            collection = mongo_client[self.db_name][self.collection_name]
            # Fetch only the _id field, sorted, so consecutive slices form contiguous ranges.
            cursor = collection.find({}, {'_id': 1}).sort('_id', pymongo.ASCENDING)
            all_ids = [x['_id'] for x in cursor]
        size = len(all_ids)
        # adjust_partition_size is not defined in this gist (see the note below);
        # it fills in whichever of partition_size / partitions_num is missing.
        partition_size, partitions_num = adjust_partition_size(size, partitions_num, partition_size)
        start_indexes = list(range(0, size, partition_size))
        start_ids = [all_ids[i] for i in start_indexes]
        # Each range ends just before the next one starts; the last ends at the final _id.
        end_ids = [all_ids[i - 1] for i in start_indexes[1:]] + [all_ids[-1]]
        partitions_requests = list(zip(start_ids, end_ids))
        logging.info(f"Data partitioned: {partition_size}x{len(partitions_requests)}")
        # Each (start, end) pair is loaded lazily and expanded into its documents.
        b = (dask.bag.from_sequence(partitions_requests)
             .map(self.read_datetime_interval_from_collection)
             .flatten())
        return b

    def read_datetime_interval_from_collection(self, args):
        start_id, end_id = args
        # Open a client inside the task so that each dask worker gets its own connection.
        with pymongo.MongoClient() as mongo_client:
            collection = mongo_client[self.db_name][self.collection_name]
            # The boundaries are _id values computed in bag(), so filter on _id (inclusive).
            return list(collection.find({'_id': {'$gte': start_id, '$lte': end_id}}))
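The boundary arithmetic in bag() does not need a database, so it can be checked on its own. The sketch below pulls it into a standalone function; the name partition_bounds is hypothetical and only mirrors the slicing done in MongoDaskBag.bag() on an already sorted list of ids.

```python
def partition_bounds(ids, partition_size):
    """Return inclusive (start, end) pairs covering a sorted list of ids.

    Hypothetical helper that mirrors the slicing in MongoDaskBag.bag().
    """
    if not ids:
        return []
    start_indexes = list(range(0, len(ids), partition_size))
    start_ids = [ids[i] for i in start_indexes]
    # Each range ends one element before the next range starts;
    # the last range ends at the final id.
    end_ids = [ids[i - 1] for i in start_indexes[1:]] + [ids[-1]]
    return list(zip(start_ids, end_ids))


print(partition_bounds(list(range(10, 17)), 3))
# [(10, 12), (13, 15), (16, 16)]
```

Because both ends are inclusive and every id belongs to exactly one range, the $gte/$lte query on a unique field such as _id neither skips nor duplicates documents.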
The signature declares partitions_num and partition_size as optional, but the code needs concrete values for both. adjust_partition_size computes whichever of the two is omitted. Unfortunately, I can no longer recover the code of this function.
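Since the original is lost, here is a minimal sketch of what adjust_partition_size might look like, assuming it only has to derive the missing value from the total number of documents. The argument order and the (partition_size, partitions_num) return order are taken from the call in bag(); the rule that an explicit partitions_num overrides partition_size is an assumption (partition_size defaults to 1000, so otherwise partitions_num could never take effect).

```python
import math
from typing import Optional, Tuple


def adjust_partition_size(size: int,
                          partitions_num: Optional[int],
                          partition_size: Optional[int]) -> Tuple[int, int]:
    """Hypothetical reconstruction: fill in whichever value is missing.

    Returns (partition_size, partitions_num), matching the call in bag().
    Assumption: an explicit partitions_num takes precedence over partition_size.
    """
    if partitions_num is None and partition_size is None:
        raise ValueError("give partition_size or partitions_num")
    if partitions_num is not None:
        if partitions_num < 1:
            raise ValueError("partitions_num must be >= 1")
        # Round up so that partitions_num slices cover every item;
        # keep at least 1 because range() in bag() rejects a zero step.
        partition_size = max(1, math.ceil(size / partitions_num))
    elif partition_size < 1:
        raise ValueError("partition_size must be >= 1")
    # range(0, size, partition_size) in bag() yields this many start indexes.
    partitions_num = math.ceil(size / partition_size)
    return partition_size, partitions_num
```

For example, adjust_partition_size(2500, None, 1000) gives (1000, 3). When a requested partition count does not divide the data evenly, the returned partitions_num is recomputed from the rounded-up size, so adjust_partition_size(10, 6, None) gives (2, 5): that is the number of ranges bag() will actually create.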
Hi, what is adjust_partition_size in the code?