Last active
January 24, 2020 23:14
-
-
Save ShaneHarvey/abb528c578d5f7f9b9080c77747251ab to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import threading | |
from bson import SON | |
from pymongo import monitoring, MongoClient | |
from pymongo.server_selectors import writable_server_selector | |
class ServerLogger(monitoring.ServerListener):
    """Server listener that writes each server event to the root logger."""

    def opened(self, event):
        """Log at INFO level that a server was added to a topology."""
        message = "Server {0} added to topology {1}".format(
            event.server_address, event.topology_id)
        logging.info(message)

    def description_changed(self, event):
        """Log at INFO level when a server's type changes.

        Nothing is logged when the previous and new descriptions report
        the same server type.
        """
        before = event.previous_description
        after = event.new_description
        if after.server_type == before.server_type:
            return
        # server_type_name was added in PyMongo 3.4
        message = "Server {0} changed type from {1} to {2}".format(
            event.server_address,
            before.server_type_name,
            after.server_type_name)
        logging.info(message)

    def closed(self, event):
        """Log at WARNING level that a server was removed from a topology."""
        message = "Server {0} removed from topology {1}".format(
            event.server_address, event.topology_id)
        logging.warning(message)
class HeartbeatLogger(monitoring.ServerHeartbeatListener):
    """Heartbeat listener that writes each heartbeat event to the root logger."""

    def started(self, event):
        """Log at INFO level that a heartbeat was sent."""
        message = "Heartbeat sent to server {0}".format(event.connection_id)
        logging.info(message)

    def succeeded(self, event):
        """Log at INFO level the reply document of a successful heartbeat."""
        # The reply.document attribute was added in PyMongo 3.4.
        message = "Heartbeat to server {0} succeeded with reply {1}".format(
            event.connection_id, event.reply.document)
        logging.info(message)

    def failed(self, event):
        """Log at WARNING level the error of a failed heartbeat.

        When the error object exposes a ``details`` attribute, its value
        is appended to the message after a colon.
        """
        error = event.reply
        try:
            details = error.details
        except AttributeError:
            # Not every error carries details; log the error on its own.
            suffix = ""
        else:
            suffix = ": {0}".format(details)
        message = "Heartbeat to server {0} failed with error {1}{2}".format(
            event.connection_id, error, suffix)
        logging.warning(message)
class TopologyLogger(monitoring.TopologyListener):
    """Topology listener that writes each topology event to the root logger."""

    def opened(self, event):
        """Log at INFO level that a topology was opened."""
        message = "Topology with id {0} opened".format(event.topology_id)
        logging.info(message)

    def description_changed(self, event):
        """Log a topology description update.

        Always logs the update itself at INFO level, logs a second INFO
        message when the topology type changed, and logs a WARNING when
        the new description has no writable server.
        """
        topology_id = event.topology_id
        logging.info(
            "Topology description updated for topology id {0}".format(
                topology_id))

        before = event.previous_description
        after = event.new_description
        if after.topology_type != before.topology_type:
            # topology_type_name was added in PyMongo 3.4
            logging.info(
                "Topology {0} changed type from {1} to {2}".format(
                    topology_id,
                    before.topology_type_name,
                    after.topology_type_name))

        # The has_writable_server and has_readable_server methods
        # were added in PyMongo 3.4.
        writable = after.has_writable_server()
        if not writable:
            logging.warning("No writable servers available.")

    def closed(self, event):
        """Log at INFO level that a topology was closed."""
        message = "Topology with id {0} closed".format(event.topology_id)
        logging.info(message)
class CommandLogger(monitoring.CommandListener):
    """Command listener that writes each command event to the root logger.

    All three events, including failures, are logged at INFO level.
    """

    def started(self, event):
        """Log that a command was started on a server."""
        message = (
            "Command {name} with request id {request} "
            "started on server {server}"
        ).format(
            name=event.command_name,
            request=event.request_id,
            server=event.connection_id)
        logging.info(message)

    def succeeded(self, event):
        """Log a command's success together with its duration in microseconds."""
        message = (
            "Command {name} with request id {request} on server {server} "
            "succeeded in {micros} microseconds"
        ).format(
            name=event.command_name,
            request=event.request_id,
            server=event.connection_id,
            micros=event.duration_micros)
        logging.info(message)

    def failed(self, event):
        """Log a command's failure together with its duration in microseconds."""
        message = (
            "Command {name} with request id {request} on server {server} "
            "failed in {micros} microseconds"
        ).format(
            name=event.command_name,
            request=event.request_id,
            server=event.connection_id,
            micros=event.duration_micros)
        logging.info(message)
# Log record layout: timestamp, level, name of the emitting thread and the
# line number of the logging call, followed by the message itself.
LOGGING_FORMAT = (
    '%(asctime)s [%(levelname)s] '
    '%(threadName)s:%(lineno)d - %(message)s'
)
def get_pool(client):
    """Get the standalone, primary, or mongos pool.

    Selects a server with ``writable_server_selector`` from the client's
    topology and returns that server's ``pool`` attribute.

    NOTE: relies on the underscore-prefixed ``client._get_topology()``
    accessor.
    """
    writable_server = client._get_topology().select_server(
        writable_server_selector)
    return writable_server.pool
def insert(client):
    """Insert one empty document into ``test.test`` and log how long it took.

    The elapsed wall-clock time, in seconds, is logged at INFO level.
    """
    collection = client.test.test
    began = time.time()
    collection.insert_one({})
    elapsed = time.time() - began
    logging.info('INSERT took: %s', elapsed)
def _fail_point(client, command_args):
    """Run a ``configureFailPoint: failCommand`` command on the admin database.

    The command document is an ordered ``SON`` whose first key is
    ``configureFailPoint``; the entries of ``command_args`` are merged in
    afterwards.
    """
    command = SON()
    # The command name has to be inserted first so it stays the leading key.
    command['configureFailPoint'] = 'failCommand'
    command.update(command_args)
    client.admin.command(command)
def fail_insert(client, network_error=False):
    """Configure the ``failCommand`` fail point to target ``insert``.

    ``network_error`` is passed through as the fail point's
    ``closeConnection`` setting; the configured error code is 10107
    (NotMaster). The fail point mode is ``{"times": 1}``, presumably
    meaning it triggers once; confirm against the failCommand docs.
    """
    fail_point_data = {
        "failCommands": ["insert"],
        "errorCode": 10107,  # NotMaster
        "closeConnection": network_error,
    }
    fail_point = {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": fail_point_data,
    }
    _fail_point(client, fail_point)
def cause_failover_after(seconds):
    """Wait ``seconds`` seconds, then run ``replSetStepDown`` on the replica set.

    A separate client is opened for this and closed again when the
    function returns. The value 10 is sent with the command, presumably
    the step-down period in seconds; confirm against the
    replSetStepDown documentation.
    """
    with get_client() as stepdown_client:
        time.sleep(seconds)
        logging.info('Causing failover via replSetStepDown\n')
        admin_db = stepdown_client.admin
        admin_db.command('replSetStepDown', 10)
def get_client(**kwargs):
    """Return a client connected to the whole replica set.

    A temporary default ``MongoClient()`` runs ``isMaster`` to discover
    the set name and member hosts. The returned client is created from
    those with ``retryWrites=True``; any ``kwargs`` are forwarded to
    ``MongoClient``.
    """
    # Discover the replica set name.
    with MongoClient() as discovery_client:
        is_master = discovery_client.admin.command('isMaster')
        set_name, members = is_master['setName'], is_master['hosts']
    return MongoClient(
        members, replicaSet=set_name, retryWrites=True, **kwargs)
def main():
    """Demonstrate retryable writes surviving errors and a failover.

    Steps:
      1. Configure logging and connect with the server, heartbeat and
         topology listeners registered (``CommandLogger`` is defined in
         this file but not registered here).
      2. Insert once normally, once after arming a fail point that
         closes the connection, and once after arming a fail point that
         returns a NotMaster error.
      3. Insert repeatedly for 10 seconds while a background thread
         triggers ``replSetStepDown`` after 3 seconds.
    """
    logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO)
    listeners = [ServerLogger(), HeartbeatLogger(), TopologyLogger()]

    # Use the client as a context manager, as the rest of this file does,
    # so it is closed even when one of the inserts raises.
    with get_client(event_listeners=listeners) as client:
        logging.info('Primary: %s', client.admin.command('isMaster')['me'])
        insert(client)

        # Cause a network error
        logging.info('Causing a network error with failCommand\n')
        fail_insert(client, network_error=True)
        insert(client)

        # Cause a NotMaster error
        logging.info('Causing a NotMaster error with failCommand\n')
        fail_insert(client, network_error=False)
        insert(client)

        # Run retryable writes for 10 seconds.
        thread = threading.Thread(target=cause_failover_after, args=(3,))
        thread.start()
        try:
            start = time.time()
            while time.time() - start < 10:
                insert(client)
                time.sleep(0.05)
        finally:
            # Always wait for the failover thread, even if an insert failed,
            # so it is finished before the client is closed.
            thread.join()

    # NOTE: an earlier, disabled variant of this demo caused the network
    # error by closing every socket in get_pool(client) instead of using
    # the failCommand fail point.
# Run the demonstration only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment