-
-
Save tahajahangir/8059155 to your computer and use it in GitHub Desktop.
The file is a util for dumping mongo commands/queries. Can be used in any python environment, and included a django middleware.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import logging | |
from pymongo import MongoClient | |
import bson | |
from bson.errors import InvalidBSON | |
from pymongo.mongo_replica_set_client import MongoReplicaSetClient | |
logger = logging.getLogger('mongo_dump') | |
# noinspection PyProtectedMember | |
class MongoDumper(object): | |
""" | |
@see: https://gist.github.com/8059155 | |
""" | |
def __init__(self, cls): | |
self.cls = cls | |
def install(self): | |
self._used_msg_ids = [] | |
# save old methods | |
orig_simple_command = [(k, v) for k, v in self.cls.__dict__.iteritems() if k.endswith('__simple_command')] | |
self.orig_simple_command_func_name = orig_simple_command[0][0] | |
self.orig_simple_command = orig_simple_command[0][1] | |
self.orig_send_message = self.cls._send_message | |
self.orig_send_message_with_response = self.cls._send_message_with_response | |
# instrument methods to record messages | |
self.cls._send_message = self._instrument(self.cls._send_message) | |
self.cls._send_message_with_response = self._instrument(self.cls._send_message_with_response) | |
setattr(self.cls, self.orig_simple_command_func_name, | |
lambda *args, **kwargs: self._simple_command(*args, **kwargs)) | |
def uninstall(self): | |
# remove instrumentation from pymongo | |
self.cls._send_message = self.orig_send_message | |
self.cls._send_message_with_response = self.orig_send_message_with_response | |
def _simple_command(self, obj, sock_info, dbname, spec): | |
response, time = self.orig_simple_command(obj, sock_info, dbname, spec) | |
msg_text = 'command on {}.$cmd {}, result: {} on {} ms'.format(dbname, spec, response, time * 1000) | |
logger.debug(msg_text) | |
return response, time | |
def _instrument(self, original_method): | |
def instrumented_method(*args, **kwargs): | |
message = _mongodb_decode_wire_protocol(args[1][1]) | |
if message['msg_id'] in self._used_msg_ids: | |
return original_method(*args, **kwargs) | |
self._used_msg_ids.append(message['msg_id']) | |
result = original_method(*args, **kwargs) | |
msg_text = '{op} {collection}: {query} [skip:{skip} limit:{limit} id:{msg_id}]'.format(**message) | |
logger.debug(msg_text) | |
return result | |
return instrumented_method | |
MONGO_OPS = { | |
2001: 'msg', | |
2002: 'insert', | |
2003: 'reserved', | |
2004: 'query', | |
2005: 'get_more', | |
2006: 'delete', | |
2007: 'kill_cursors', | |
} | |
def _mongodb_decode_wire_protocol(message): | |
""" http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol """ | |
_, msg_id, _, opcode, _ = struct.unpack('<iiiii', message[:20]) | |
op = MONGO_OPS.get(opcode, 'unknown') | |
zidx = 20 | |
collection_name_size = message[zidx:].find('\0') | |
collection_name = message[zidx:zidx + collection_name_size] | |
zidx += collection_name_size + 1 | |
skip, limit = struct.unpack('<ii', message[zidx:zidx + 8]) | |
zidx += 8 | |
try: | |
msg = bson.decode_all(message[zidx:]) | |
except InvalidBSON: | |
msg = 'invalid bson' | |
return {'op': op, 'collection': collection_name, | |
'msg_id': msg_id, 'skip': skip, 'limit': limit, | |
'query': msg} | |
def install_dumper(): | |
""" Handy helper for installing dumper """ | |
if not logging.getLogger().handlers: # if not logging already configed | |
logging.basicConfig(level=logging.DEBUG) | |
MongoDumper(MongoClient).install() | |
MongoDumper(MongoReplicaSetClient).install() | |
class MongoDumpMiddleware(object): | |
def __init__(self): | |
from django.conf import settings | |
from django.core.exceptions import MiddlewareNotUsed | |
if not settings.DEBUG: | |
raise MiddlewareNotUsed() | |
self.dumpers = [MongoDumper(MongoClient), MongoDumper(MongoReplicaSetClient)] | |
def process_request(self, request): | |
for dumper in self.dumpers: | |
dumper.install() | |
def process_response(self, request, response): | |
for dumper in self.dumpers: | |
dumper.uninstall() | |
return response |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment