Skip to content

Instantly share code, notes, and snippets.

@RainJayTsai
Created December 7, 2018 02:50
Show Gist options
  • Save RainJayTsai/b5d896fed35b3050daa218a2344367c1 to your computer and use it in GitHub Desktop.
Save RainJayTsai/b5d896fed35b3050daa218a2344367c1 to your computer and use it in GitHub Desktop.
Fast dump of a MongoDB collection into memory, using a multiprocess ProcessPoolExecutor.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bson
from typing import Callable, Optional
import concurrent.futures as cf
from pymongo.collection import Collection
def decode(data: bytes, callback: Callable[[list], None]):
    """Decode a raw BSON payload and hand the documents to ``callback``.

    Args:
        data: Bytes containing one or more concatenated BSON documents,
            passed unchanged to ``bson.decode_all``.
        callback: Invoked once with the list of decoded documents. Anything
            that is not callable (for example ``None``) is ignored, in which
            case the decoded documents are simply discarded.
    """
    documents = bson.decode_all(data)
    if not callable(callback):
        return
    callback(documents)
def fast_find(coll: Collection,
              size: int = 5000,
              limit: int = 1000000,
              executor: Optional[cf.ProcessPoolExecutor] = None,
              callback: Optional[Callable[[list], None]] = None) -> None:
    """Read every document of ``coll`` as raw batches and decode them in a pool.

    Each raw batch returned by ``coll.find_raw_batches`` is submitted to the
    executor as a ``decode(batch, callback)`` task, so both the BSON decoding
    and the callback run in the executor's workers rather than in the caller.

    Args:
        coll: Collection to read; the query filter is always ``{}``.
        size: ``batch_size`` passed to ``find_raw_batches``.
        limit: Back-pressure threshold. Once ``pending_tasks * size`` reaches
            this value, submission pauses until all pending tasks finish.
        executor: Pool to run the decode tasks on. A default
            ``ProcessPoolExecutor`` is created when omitted. The executor is
            used as a context manager and is therefore shut down on return,
            including when it was supplied by the caller.
        callback: Forwarded to ``decode``; called with each decoded batch.

    Raises:
        Exception: Any exception raised by a decode task (including one raised
            by ``callback``) is re-raised here instead of being dropped.
    """
    if executor is None:
        executor = cf.ProcessPoolExecutor()

    def drain(pending: list) -> None:
        # Future.result() re-raises a worker-side exception; merely iterating
        # as_completed() would silently lose failed batches.
        for finished in cf.as_completed(pending):
            finished.result()
        pending.clear()

    with executor:
        futures = []
        for raw_batch in coll.find_raw_batches({}, batch_size=size):
            futures.append(executor.submit(decode, raw_batch, callback))
            if len(futures) * size >= limit:
                drain(futures)
        # Wait for the tail that never reached the limit. The original
        # re-tested the limit here, which can never be true after the loop,
        # so the remaining tasks were never checked.
        drain(futures)
if __name__ == '__main__':
    from pymongo import MongoClient

    # Example usage. 'IP', 'DB_NAME' and 'COLLECTION_NAME' are placeholders
    # to be replaced with real connection details.
    # The client is used as a context manager so its connections are closed
    # once the dump has finished (or failed).
    with MongoClient('IP') as c:
        db = c['DB_NAME']
        coll = db['COLLECTION_NAME']
        # fast_find uses the executor in a ``with`` block, so the pool is
        # shut down by the time the call returns.
        executor = cf.ProcessPoolExecutor(max_workers=2)
        fast_find(coll, executor=executor)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment