Gist reidmorrison/1498297 (last active September 28, 2015 21:19)
Mongo Reconnect logic for Connection Failures. Deprecated, moved to: https://github.com/reidmorrison/mongo_ha
#
# NOTE: This gist is now deprecated and has been moved into a gem: https://github.com/reidmorrison/mongo_ha
#
require 'mongo/collection'

module Mongo
  class Collection
    alias_method :find_one_original, :find_one
    def find_one(*args)
      @connection.retry_on_connection_failure do
        find_one_original(*args)
      end
    end

    alias_method :find_and_modify_original, :find_and_modify
    def find_and_modify(*args)
      @connection.retry_on_connection_failure do
        find_and_modify_original(*args)
      end
    end

    alias_method :aggregate_original, :aggregate
    def aggregate(*args)
      @connection.retry_on_connection_failure do
        aggregate_original(*args)
      end
    end

    alias_method :map_reduce_original, :map_reduce
    def map_reduce(*args)
      @connection.retry_on_connection_failure do
        map_reduce_original(*args)
      end
    end

    alias_method :group_original, :group
    def group(*args)
      @connection.retry_on_connection_failure do
        group_original(*args)
      end
    end
  end
end
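Every wrapper above follows the same alias-and-wrap pattern: keep the original method under a new name, then redefine the method so it calls the original inside retry_on_connection_failure. The sketch below reproduces that pattern without the mongo gem. FakeConnection, FlakyCollection and the top-level ConnectionFailure are hypothetical stand-ins, and the fake connection simply retries once instead of reconnecting first.

```ruby
# Hypothetical stand-ins for Mongo::ConnectionFailure and Mongo::MongoClient,
# so the alias-and-wrap pattern can run without the mongo gem.
class ConnectionFailure < StandardError; end

class FakeConnection
  attr_reader :attempts

  def initialize
    @attempts = 0
  end

  # Simplified retry: call the block, and on ConnectionFailure retry exactly once.
  # (The gist's version also reconnects before retrying.)
  def retry_on_connection_failure
    retried = false
    begin
      @attempts += 1
      yield(retried)
    rescue ConnectionFailure
      raise if retried
      retried = true
      retry
    end
  end
end

class FlakyCollection
  def initialize(connection, failures)
    @connection = connection
    @failures   = failures # number of calls that fail before one succeeds
  end

  def find_one(query)
    if @failures > 0
      @failures -= 1
      raise ConnectionFailure, 'simulated socket error'
    end
    { 'query' => query }
  end
end

# Same shape as the Mongo::Collection patch: keep the original, then wrap it.
class FlakyCollection
  alias_method :find_one_original, :find_one
  def find_one(*args)
    @connection.retry_on_connection_failure do
      find_one_original(*args)
    end
  end
end

conn = FakeConnection.new
coll = FlakyCollection.new(conn, 1)
puts coll.find_one({ 'name' => 'x' }).inspect # succeeds on the second attempt
puts conn.attempts                            # => 2
```

Because only one retry is allowed, a collection that fails twice in a row still raises to the caller, which matches the "only once to prevent an infinite loop" intent in retry_on_connection_failure.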
require 'mongo/collection_writer'

module Mongo
  class CollectionOperationWriter
    alias_method :send_write_operation_original, :send_write_operation
    def send_write_operation(*args)
      @connection.retry_on_connection_failure do
        send_write_operation_original(*args)
      end
    end

    alias_method :batch_message_send_original, :batch_message_send
    def batch_message_send(*args)
      @connection.retry_on_connection_failure do
        batch_message_send_original(*args)
      end
    end
  end
end
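Since every wrapper in these patches has the same shape, they could also be generated from a list of method names. This is a sketch only: RetryWrapping and wrap_with_retry are hypothetical names (not part of the mongo driver or this gist), and CountingConnection and FakeWriter are stand-ins. It relies only on the standard Ruby methods alias_method, define_method and send.

```ruby
# Hypothetical helper (not part of the mongo driver): wraps each named method
# so that it runs inside @connection.retry_on_connection_failure.
module RetryWrapping
  def wrap_with_retry(*method_names)
    method_names.each do |name|
      original = :"#{name}_original"
      alias_method original, name
      define_method(name) do |*args, &block|
        @connection.retry_on_connection_failure do
          send(original, *args, &block)
        end
      end
    end
  end
end

# Minimal fake connection that counts how many calls went through the wrapper.
class CountingConnection
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def retry_on_connection_failure
    @calls += 1
    yield
  end
end

class FakeWriter
  extend RetryWrapping

  def initialize(connection)
    @connection = connection
  end

  def send_write_operation(op)
    "sent #{op}"
  end

  def batch_message_send(ops)
    "batched #{ops.size}"
  end

  # Must come after the methods are defined, since alias_method needs them.
  wrap_with_retry :send_write_operation, :batch_message_send
end

conn   = CountingConnection.new
writer = FakeWriter.new(conn)
puts writer.send_write_operation(:insert) # => sent insert
puts writer.batch_message_send([1, 2, 3]) # => batched 3
puts conn.calls                           # => 2
```

The explicit per-method version in the gist is more verbose but easier to grep; the loop trades that for less repetition.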
require 'mongo'
require 'semantic_logger'
# Add methods to the Mongo Ruby driver to support retries on connection failure.
# In the event of a connection failure, only one thread will attempt to re-establish
# connectivity to the Mongo server(s). This prevents swamping the Mongo
# servers with reconnect attempts.
#
# Retries are initially performed quickly in case it is a brief network issue,
# and then back off to give the replica set time to elect a new master.
#
# Supports Ruby Mongo driver v1.x, as well as transparent support for MongoMapper.
#
# Mongo router (mongos) processes will often return a connection failure on their side
# as an OperationFailure. This code will also retry automatically when the router
# has errors talking to a sharded cluster.
module Mongo
  class MongoClient
    include SemanticLogger::Loggable

    # The following options can be specified in the Mongo configuration options
    # to tune the retry intervals during a connection failure
    #
    # :reconnect_attempts
    #   Number of times to attempt to reconnect.
    #   Default: 53
    #   reconnect_attempts = 120 seconds / reconnect_max_retry_seconds + (number of retries before reaching reconnect_max_retry_seconds)
    #
    # :reconnect_retry_seconds
    #   Initial delay before retrying
    #   Default: 0.1
    #
    # :reconnect_retry_multiplier
    #   Multiply delay by this number with each retry to prevent overwhelming the server
    #   Default: 2
    #
    # :reconnect_max_retry_seconds
    #   Maximum number of seconds to wait before retrying again
    #   Default: 5
    #
    # Using the default values results in reconnect attempts at the following intervals:
    #   0.1 0.2 0.4 0.8 1.6 3.2 5 5 5 5 ....
    CONNECTION_RETRY_OPTS = [:reconnect_attempts, :reconnect_retry_seconds, :reconnect_retry_multiplier, :reconnect_max_retry_seconds]
    attr_accessor(*CONNECTION_RETRY_OPTS)

    # The following errors occur when mongos cannot connect to the shard.
    # They require a retry to resolve them.
    # This list was created through painful experience. Add any new ones as they are discovered:
    #   9001: socket exception
    #   Operation failed with the following exception: Unknown error - Connection reset by peer:Unknown error - Connection reset by peer
    #   DBClientBase::findOne: transport error
    #   : db assertion failure
    #   8002: 8002 all servers down!
    #   Operation failed with the following exception: stream closed
    #   Operation failed with the following exception: Bad file descriptor - Bad file descriptor:Bad file descriptor - Bad file descriptor
    #   Failed to connect to primary node.
    #   10009: ReplicaSetMonitor no master found for set: mdbb
    MONGOS_CONNECTION_ERRORS = [
      'socket exception',
      'Connection reset by peer',
      'transport error',
      'db assertion failure',
      '8002',
      'stream closed',
      'Bad file descriptor',
      'Failed to connect',
      '10009',
      'no master found',
      'not master',
      'Timed out waiting on socket',
      "didn't get writeback",
    ]

    # Retry the supplied block when a Mongo::ConnectionFailure occurs
    #
    # Note: When an insert is retried, check for a Duplicate Key error,
    # since the first attempt may already have succeeded
    #
    # Returns the result of the block
    #
    # Example:
    #   connection.retry_on_connection_failure { |retried| connection.ping }
    def retry_on_connection_failure(&block)
      raise "Missing mandatory block parameter on call to Mongo::MongoClient#retry_on_connection_failure" unless block
      retried        = false
      mongos_retries = 0
      begin
        result  = block.call(retried)
        retried = false
        result
      rescue ConnectionFailure => exc
        # Retry if reconnected, but only once to prevent an infinite loop
        logger.warn "Connection Failure: '#{exc.message}' [#{exc.error_code}]"
        if !retried && reconnect
          retried = true
          # TODO There has to be a way to flush the connection pool of all inactive connections
          retry
        end
        raise exc
      rescue OperationFailure => exc
        # Work around the "not master" issue: disconnect when we get a "not master"
        # error message. The master check looks for an exact match on "not master",
        # whereas the server sometimes returns "not master and slaveok=false".
        if exc.result
          error = exc.result['err'] || exc.result['errmsg']
          close if error && error.include?("not master")
        end
        # These errors are returned when connected to a local mongos router that in turn
        # has connection failures talking to the remote shards. All we do is retry the same
        # operation, since its connections to multiple remote shards may have failed.
        # Disconnecting the current connection will not help since it is just to the mongos router.
        # First make sure it is connected to the mongos router.
        raise exc unless (MONGOS_CONNECTION_ERRORS.any? { |err| exc.message.include?(err) }) || (exc.message.strip == ':')
        mongos_retries += 1
        if mongos_retries <= 60
          retried = true
          Kernel.sleep(0.5)
          logger.warn "[#{primary.inspect}] Router Connection Failure. Retry ##{mongos_retries}. Exc: '#{exc.message}' [#{exc.error_code}]"
          # TODO There has to be a way to flush the connection pool of all inactive connections
          retry
        end
        raise exc
      end
    end

    # Call this method whenever a Mongo::ConnectionFailure exception
    # has been raised to re-establish the connection
    #
    # This method is thread-safe and ensures that only one thread at a time
    # will attempt to re-establish the connection. Since @@failover_mutex is a
    # class variable, this applies across all MongoClient instances.
    #
    # Returns whether the connection is connected again
    def reconnect
      logger.debug "Going to reconnect"
      # Prevent other threads from invoking reconnect logic at the same time
      @@failover_mutex.synchronize do
        # Another thread may have already failed over the connection by the
        # time this thread gets in
        if active?
          logger.info "Connected to: #{primary.inspect}"
          return true
        end
        # Close all sockets that are not checked out so that other threads not
        # currently waiting on Mongo don't get bad connections and have to
        # retry each one in turn
        @primary_pool.close if @primary_pool
        if reconnect_attempts > 0
          # Wait for other threads to finish working on their sockets
          retries       = 1
          retry_seconds = reconnect_retry_seconds
          begin
            logger.warn "Connection unavailable. Waiting: #{retry_seconds} seconds before retrying"
            sleep retry_seconds
            # Call original connect method since it is already within a retry block
            connect_original
          rescue ConnectionFailure => exc
            if retries < reconnect_attempts
              retries       += 1
              retry_seconds *= reconnect_retry_multiplier
              retry_seconds = reconnect_max_retry_seconds if retry_seconds > reconnect_max_retry_seconds
              retry
            end
            logger.error "Auto-reconnect giving up after #{retries} reconnect attempts"
            raise exc
          end
          logger.info "Successfully reconnected to: #{primary.inspect}"
        end
        connected?
      end
    end

    # Replace built-in logger with SemanticLogger
    def logger
      super
    end

    # Wrap internal networking calls with retry logic
    # Do not stub out :send_message_with_gle or :send_message,
    # since they modify the message; see CollectionWriter#send_write_operation
    alias_method :receive_message_original, :receive_message
    def receive_message(*args)
      retry_on_connection_failure do
        receive_message_original(*args)
      end
    end

    alias_method :connect_original, :connect
    def connect(*args)
      retry_on_connection_failure do
        connect_original(*args)
      end
    end

    protected

    # Prevent multiple threads from trying to reconnect at the same time during
    # a connection failure
    @@failover_mutex = Mutex.new

    alias_method :valid_opts_original, :valid_opts
    def valid_opts(*args)
      valid_opts_original(*args) + CONNECTION_RETRY_OPTS
    end

    alias_method :setup_original, :setup
    def setup(opts)
      self.reconnect_attempts          = (opts.delete(:reconnect_attempts) || 53).to_i
      self.reconnect_retry_seconds     = (opts.delete(:reconnect_retry_seconds) || 0.1).to_f
      self.reconnect_retry_multiplier  = (opts.delete(:reconnect_retry_multiplier) || 2).to_f
      self.reconnect_max_retry_seconds = (opts.delete(:reconnect_max_retry_seconds) || 5).to_f
      setup_original(opts)
    end
  end
end
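The delays documented in the option comments can be checked by reproducing the arithmetic of the begin/rescue loop in reconnect on its own: start at reconnect_retry_seconds, multiply by reconnect_retry_multiplier after each failed attempt, and cap at reconnect_max_retry_seconds. A minimal sketch; reconnect_schedule is a hypothetical helper, with defaults copied from setup.

```ruby
# Hypothetical helper: returns the sleep before each reconnect attempt,
# mirroring the arithmetic in MongoClient#reconnect (defaults from #setup).
def reconnect_schedule(attempts: 53, retry_seconds: 0.1, multiplier: 2.0, max_seconds: 5.0)
  delays = []
  delay  = retry_seconds
  attempts.times do
    delays << delay
    delay *= multiplier
    delay = max_seconds if delay > max_seconds
  end
  delays
end

schedule = reconnect_schedule
puts schedule.first(8).map { |d| d.round(2) }.inspect # => [0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 5.0, 5.0]
puts "Total wait before giving up: #{schedule.sum.round(1)} seconds" # => 241.3
```

With the defaults this comes to about 241 seconds of sleeping (roughly four minutes) across 53 attempts, not counting the time each connect attempt itself takes.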
# NOTE: The fix below is not needed if running mongo V1.12.0 or higher
require 'mongo/networking'

module Mongo
  module Networking
    # Fix a problem where a Timeout exception does not check the socket back into the pool
    # Based on code from Gem V1.11.1
    # The only change is the ensure block
    def send_message_with_gle(operation, message, db_name, log_message=nil, write_concern=false)
      docs = num_received = cursor_id = ''
      add_message_headers(message, operation)

      last_error_message = build_get_last_error_message(db_name, write_concern)
      last_error_id      = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)

      packed_message = message.append!(last_error_message).to_s
      sock           = nil
      begin
        sock = checkout_writer
        send_message_on_socket(packed_message, sock)
        docs, num_received, cursor_id = receive(sock, last_error_id)
        # Removed checkin
        # checkin(sock)
      rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
        # Removed checkin
        # checkin(sock)
        raise ex
      rescue SystemStackError, NoMemoryError, SystemCallError => ex
        close
        raise ex
      # Added ensure block to always check sock back in
      ensure
        checkin(sock) if sock
      end

      if num_received == 1
        error = docs[0]['err'] || docs[0]['errmsg']
        if error && error.include?("not master")
          close
          raise ConnectionFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
        elsif (!error.nil? && note = docs[0]['jnote'] || docs[0]['wnote']) # assignment
          code = docs[0]['code'] || Mongo::ErrorCode::BAD_VALUE # as of server version 2.5.5
          raise WriteConcernError.new(code.to_s + ': ' + note, code, docs[0])
        elsif error
          code = docs[0]['code'] || Mongo::ErrorCode::UNKNOWN_ERROR
          error = "wtimeout" if error == "timeout"
          raise WriteConcernError.new(code.to_s + ': ' + error, code, docs[0]) if error == "wtimeout"
          raise OperationFailure.new(code.to_s + ': ' + error, code, docs[0])
        end
      end
      docs[0]
    end
  end
end
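The behavioral change in this patch is moving checkin(sock) into an ensure block, so the socket goes back to the pool on every exit path, including exceptions the rescue clauses do not list, such as Timeout::Error. A minimal sketch of that pattern, using a hypothetical TinyPool class in place of the driver's socket pool:

```ruby
require 'timeout'

# Hypothetical pool standing in for the driver's socket pool.
class TinyPool
  attr_reader :available

  def initialize(size)
    @available = Array.new(size) { |i| "socket-#{i}" }
  end

  def checkout
    @available.pop or raise 'pool exhausted'
  end

  def checkin(sock)
    @available.push(sock)
  end

  # Check a socket out, yield it, and always check it back in,
  # whether the block returns normally or raises.
  def with_socket
    sock = nil
    begin
      sock = checkout
      yield sock
    ensure
      checkin(sock) if sock
    end
  end
end

pool = TinyPool.new(1)
begin
  pool.with_socket { |_sock| raise Timeout::Error, 'simulated timeout' }
rescue Timeout::Error
  # The error still propagates to the caller...
end
puts pool.available.size # => 1 (...but the socket was returned to the pool)
```

Without the ensure, a Timeout::Error raised between checkout and checkin would skip both rescue clauses and leave the socket checked out permanently.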
Hi Reid, just curious: what version of the Mongo Ruby driver are you using? I was going to try this with 1.9.2, but it looks like methods such as send_message, send_message_with_gle, and receive_message are now defined in the Mongo::Networking module and not in Mongo::MongoClient.
Yes, we ran into the same issue when upgrading the mongo driver. This new version has been tested with the Ruby driver mongo version 1.11.1.
I needed the code in this gist for another gem that uses mongo, so I improved it and moved the code into its own gem: https://github.com/reidmorrison/mongo_ha.
Check it out and let me know if you have any issues using it.
The above gist is deprecated in favor of the new mongo_ha gem.
This updated version supports the Mongo Ruby driver V1.x, as well as MongoMapper.
It transparently handles any ReplicaSet reconfigurations or other network failures. It also detects and handles connection failures between routers (mongos) and replica-sets in the shard.
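Router failures are detected in retry_on_connection_failure by treating an OperationFailure as retryable when its message contains any substring from MONGOS_CONNECTION_ERRORS, or when the stripped message is just ":". A standalone sketch of that predicate; mongos_connection_error? is a hypothetical name, and the list is copied from the gist.

```ruby
# Copied from Mongo::MongoClient::MONGOS_CONNECTION_ERRORS in the gist above.
MONGOS_CONNECTION_ERRORS = [
  'socket exception',
  'Connection reset by peer',
  'transport error',
  'db assertion failure',
  '8002',
  'stream closed',
  'Bad file descriptor',
  'Failed to connect',
  '10009',
  'no master found',
  'not master',
  'Timed out waiting on socket',
  "didn't get writeback",
].freeze

# Hypothetical predicate mirroring the check in retry_on_connection_failure:
# retry when the message contains a known router error, or is just ':'.
def mongos_connection_error?(message)
  MONGOS_CONNECTION_ERRORS.any? { |err| message.include?(err) } || message.strip == ':'
end

puts mongos_connection_error?('10009: ReplicaSetMonitor no master found for set: mdbb') # => true
puts mongos_connection_error?(' : ')                                                 # => true
puts mongos_connection_error?('11000: E11000 duplicate key error')                   # => false
```

Because the match is a plain substring test, numeric entries such as '8002' will also match any message that merely contains those digits.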
Failures are handled both while communicating with a MongoDB server and while connecting to MongoDB.
Handling connect failures is especially important during a ReplicaSet reconfiguration: while some threads are blocked waiting for the new master to be elected, other threads will request new connections from the connection pool.
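When many threads hit a connection failure at once, reconnect serializes them on a mutex and re-checks active? inside it, so only the first thread actually reconnects and the rest return as soon as they get the lock. A minimal sketch of that pattern; FakeClient is hypothetical, its connected flag and reconnect counter stand in for active? and connect_original, and it uses a per-instance Mutex, whereas the gist's @@failover_mutex is shared by all MongoClient instances.

```ruby
# Hypothetical client illustrating the double-checked reconnect in MongoClient#reconnect.
class FakeClient
  attr_reader :reconnects

  def initialize
    @mutex      = Mutex.new
    @connected  = false
    @reconnects = 0
  end

  def active?
    @connected
  end

  # Only one thread at a time enters the block; later threads see that the
  # connection was already re-established and return without reconnecting.
  def reconnect
    @mutex.synchronize do
      return true if active?

      sleep 0.05       # stands in for the backoff delay and connect time
      @reconnects += 1 # stands in for connect_original
      @connected = true
    end
  end
end

client  = FakeClient.new
threads = Array.new(10) { Thread.new { client.reconnect } }
threads.each(&:join)
puts client.reconnects # => 1
```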