

@reidmorrison
Last active September 28, 2015 21:19
Mongo Reconnect logic for Connection Failures. Deprecated, moved to: https://github.com/reidmorrison/mongo_ha
#
# NOTE: This gist is now deprecated and has been moved into a gem: https://github.com/reidmorrison/mongo_ha
#
require 'mongo/collection'
module Mongo
  class Collection
    alias_method :find_one_original, :find_one
    def find_one(*args)
      @connection.retry_on_connection_failure do
        find_one_original(*args)
      end
    end

    alias_method :find_and_modify_original, :find_and_modify
    def find_and_modify(*args)
      @connection.retry_on_connection_failure do
        find_and_modify_original(*args)
      end
    end

    alias_method :aggregate_original, :aggregate
    def aggregate(*args)
      @connection.retry_on_connection_failure do
        aggregate_original(*args)
      end
    end

    alias_method :map_reduce_original, :map_reduce
    def map_reduce(*args)
      @connection.retry_on_connection_failure do
        map_reduce_original(*args)
      end
    end

    alias_method :group_original, :group
    def group(*args)
      @connection.retry_on_connection_failure do
        group_original(*args)
      end
    end
  end
end
require 'mongo/collection_writer'
module Mongo
  class CollectionOperationWriter
    alias_method :send_write_operation_original, :send_write_operation
    def send_write_operation(*args)
      @connection.retry_on_connection_failure do
        send_write_operation_original(*args)
      end
    end

    alias_method :batch_message_send_original, :batch_message_send
    def batch_message_send(*args)
      @connection.retry_on_connection_failure do
        batch_message_send_original(*args)
      end
    end
  end
end
require 'mongo'
# Add methods to the Mongo Ruby driver to support retries on connection failure.
# In the event of a connection failure, only one thread will attempt to re-establish
# connectivity to the Mongo server(s). This prevents swamping the Mongo
# servers with reconnect attempts.
#
# Retries are initially performed quickly in case it is a brief network issue,
# and then back off to give the replica set time to elect a new master.
#
# Supports the Ruby Mongo driver v1.x and works transparently with MongoMapper.
#
# Mongo router (mongos) processes will often report a connection failure on their side
# as an OperationFailure. This code will also retry automatically when the router
# has errors talking to a sharded cluster.
module Mongo
  class MongoClient
    include SemanticLogger::Loggable

    # The following options can be specified in the Mongo configuration options
    # to tune the retry intervals during a connection failure:
    #
    # :reconnect_attempts
    #   Number of times to attempt to reconnect.
    #   Default: 53
    #   reconnect_attempts = 120 seconds / reconnect_max_retry_seconds + (number of retries before reaching reconnect_max_retry_seconds)
    #
    # :reconnect_retry_seconds
    #   Initial delay before retrying.
    #   Default: 0.1
    #
    # :reconnect_retry_multiplier
    #   Multiply the delay by this number with each retry to prevent overwhelming the server.
    #   Default: 2
    #
    # :reconnect_max_retry_seconds
    #   Maximum number of seconds to wait before retrying again.
    #   Default: 5
    #
    # Using the default values results in reconnect attempts at the following intervals (in seconds):
    #   0.1 0.2 0.4 0.8 1.6 3.2 5 5 5 5 ....
    CONNECTION_RETRY_OPTS = [:reconnect_attempts, :reconnect_retry_seconds, :reconnect_retry_multiplier, :reconnect_max_retry_seconds]
    attr_accessor(*CONNECTION_RETRY_OPTS)

    # The following errors occur when mongos cannot connect to the shard.
    # They require a retry to resolve them.
    # This list was created through painful experience. Add any new ones as they are discovered.
    #   9001: socket exception
    #   Operation failed with the following exception: Unknown error - Connection reset by peer:Unknown error - Connection reset by peer
    #   DBClientBase::findOne: transport error
    #   : db assertion failure
    #   8002: 8002 all servers down!
    #   Operation failed with the following exception: stream closed
    #   Operation failed with the following exception: Bad file descriptor - Bad file descriptor:Bad file descriptor - Bad file descriptor
    #   Failed to connect to primary node.
    #   10009: ReplicaSetMonitor no master found for set: mdbb
    MONGOS_CONNECTION_ERRORS = [
      'socket exception',
      'Connection reset by peer',
      'transport error',
      'db assertion failure',
      '8002',
      'stream closed',
      'Bad file descriptor',
      'Failed to connect',
      '10009',
      'no master found',
      'not master',
      'Timed out waiting on socket',
      "didn't get writeback",
    ]

    # Retry the supplied block when a Mongo::ConnectionFailure occurs.
    #
    # Note: A retried insert may already have succeeded on the first attempt,
    # so check for Duplicate Key errors on inserts.
    #
    # Returns the result of the block.
    #
    # Example:
    #   connection.retry_on_connection_failure { |retried| connection.ping }
    def retry_on_connection_failure(&block)
      raise "Missing mandatory block parameter on call to Mongo::MongoClient#retry_on_connection_failure" unless block
      retried = false
      mongos_retries = 0
      begin
        result = block.call(retried)
        retried = false
        result
      rescue ConnectionFailure => exc
        # Retry if reconnected, but only once to prevent an infinite loop
        logger.warn "Connection Failure: '#{exc.message}' [#{exc.error_code}]"
        if !retried && reconnect
          retried = true
          # TODO There has to be a way to flush the connection pool of all inactive connections
          retry
        end
        raise exc
      rescue OperationFailure => exc
        # Work around the "not master" issue: close the connection when we get a
        # "not master" error message. The master check expects an exact match on
        # "not master", whereas the server sometimes returns "not master and slaveok=false".
        if exc.result
          error = exc.result['err'] || exc.result['errmsg']
          close if error && error.include?("not master")
        end
        # These errors are returned by a local mongos router when it, in turn,
        # has connection failures talking to the remote shards. All we do is retry the
        # same operation, since its connections to multiple remote shards may have failed.
        # Disconnecting the current connection will not help, since it only goes to the mongos router.
        # Only retry when the message matches one of the known mongos connection errors.
        raise exc unless (MONGOS_CONNECTION_ERRORS.any? { |err| exc.message.include?(err) }) || (exc.message.strip == ':')
        mongos_retries += 1
        if mongos_retries <= 60
          retried = true
          Kernel.sleep(0.5)
          logger.warn "[#{primary.inspect}] Router Connection Failure. Retry ##{mongos_retries}. Exc: '#{exc.message}' [#{exc.error_code}]"
          # TODO There has to be a way to flush the connection pool of all inactive connections
          retry
        end
        raise exc
      end
    end

    # Call this method whenever a Mongo::ConnectionFailure exception
    # has been raised to re-establish the connection.
    #
    # This method is thread-safe and ensures that only one thread at a time
    # will attempt to re-establish the connection. Because @@failover_mutex is a
    # class variable, reconnects are serialized across all MongoClient instances.
    #
    # Returns whether the client is connected after the reconnect attempt.
    def reconnect
      logger.debug "Going to reconnect"
      # Prevent other threads from invoking reconnect logic at the same time
      @@failover_mutex.synchronize do
        # Another thread may have already failed over the connection by the
        # time this thread gets in
        if active?
          logger.info "Connected to: #{primary.inspect}"
          return true
        end
        # Close all sockets that are not checked out so that other threads not
        # currently waiting on Mongo don't get bad connections and have to
        # retry each one in turn
        @primary_pool.close if @primary_pool
        if reconnect_attempts > 0
          # Wait for other threads to finish working on their sockets
          retries = 1
          retry_seconds = reconnect_retry_seconds
          begin
            logger.warn "Connection unavailable. Waiting: #{retry_seconds} seconds before retrying"
            sleep retry_seconds
            # Call original connect method since it is already within a retry block
            connect_original
          rescue ConnectionFailure => exc
            if retries < reconnect_attempts
              retries += 1
              retry_seconds *= reconnect_retry_multiplier
              retry_seconds = reconnect_max_retry_seconds if retry_seconds > reconnect_max_retry_seconds
              retry
            end
            logger.error "Auto-reconnect giving up after #{retries} reconnect attempts"
            raise exc
          end
          logger.info "Successfully reconnected to: #{primary.inspect}"
        end
        connected?
      end
    end

    # Replace built-in logger with SemanticLogger
    def logger
      super
    end

    # Wrap internal networking calls with retry logic.
    # Do not wrap :send_message_with_gle or :send_message: they modify the message
    # in place. Writes are retried in CollectionOperationWriter#send_write_operation instead.
    alias_method :receive_message_original, :receive_message
    def receive_message(*args)
      retry_on_connection_failure do
        receive_message_original(*args)
      end
    end

    alias_method :connect_original, :connect
    def connect(*args)
      retry_on_connection_failure do
        connect_original(*args)
      end
    end

    protected

    # Prevent multiple threads from trying to reconnect at the same time during
    # a connection failure
    @@failover_mutex = Mutex.new

    alias_method :valid_opts_original, :valid_opts
    def valid_opts(*args)
      valid_opts_original(*args) + CONNECTION_RETRY_OPTS
    end

    alias_method :setup_original, :setup
    def setup(opts)
      self.reconnect_attempts = (opts.delete(:reconnect_attempts) || 53).to_i
      self.reconnect_retry_seconds = (opts.delete(:reconnect_retry_seconds) || 0.1).to_f
      self.reconnect_retry_multiplier = (opts.delete(:reconnect_retry_multiplier) || 2).to_f
      self.reconnect_max_retry_seconds = (opts.delete(:reconnect_max_retry_seconds) || 5).to_f
      setup_original(opts)
    end
  end
end

# NOTE: The fix below is not needed if running mongo V1.12.0 or higher
require 'mongo/networking'
module Mongo
  module Networking
    # Fix a problem where a Timeout exception does not check the socket back into the pool.
    # Based on code from gem V1.11.1.
    # The only changes are the removed checkin calls and the added ensure block.
    def send_message_with_gle(operation, message, db_name, log_message=nil, write_concern=false)
      docs = num_received = cursor_id = ''
      add_message_headers(message, operation)

      last_error_message = build_get_last_error_message(db_name, write_concern)
      last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)

      packed_message = message.append!(last_error_message).to_s
      sock = nil
      begin
        sock = checkout_writer
        send_message_on_socket(packed_message, sock)
        docs, num_received, cursor_id = receive(sock, last_error_id)
        # Removed checkin
        # checkin(sock)
      rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
        # Removed checkin
        # checkin(sock)
        raise ex
      rescue SystemStackError, NoMemoryError, SystemCallError => ex
        close
        raise ex
      # Added ensure block to always check sock back in
      ensure
        checkin(sock) if sock
      end

      if num_received == 1
        error = docs[0]['err'] || docs[0]['errmsg']
        if error && error.include?("not master")
          close
          raise ConnectionFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
        elsif (!error.nil? && note = docs[0]['jnote'] || docs[0]['wnote']) # assignment
          code = docs[0]['code'] || Mongo::ErrorCode::BAD_VALUE # as of server version 2.5.5
          raise WriteConcernError.new(code.to_s + ': ' + note, code, docs[0])
        elsif error
          code = docs[0]['code'] || Mongo::ErrorCode::UNKNOWN_ERROR
          error = "wtimeout" if error == "timeout"
          raise WriteConcernError.new(code.to_s + ': ' + error, code, docs[0]) if error == "wtimeout"
          raise OperationFailure.new(code.to_s + ': ' + error, code, docs[0])
        end
      end

      docs[0]
    end
  end
end
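
Every patch in the gist uses the same alias-and-wrap idiom: `alias_method` keeps the driver's original method under an `_original` name, and the redefined method calls it inside `retry_on_connection_failure`. The self-contained sketch below shows the idiom on toy classes; `FlakyConnection`, `DemoCollection`, and the simplified retry helper are hypothetical stand-ins, not the mongo driver's classes.

```ruby
# Hypothetical stand-in for a connection; only the wrapping idiom mirrors the gist.
class FlakyConnection
  attr_reader :calls

  def initialize
    @calls = 0
  end

  # Simulates a query that fails once with a dropped connection, then succeeds.
  def query(id)
    @calls += 1
    raise IOError, "connection dropped" if @calls == 1
    { "_id" => id }
  end

  # Simplified retry helper: retries the block once on IOError, then re-raises.
  def retry_on_connection_failure
    retried = false
    begin
      yield
    rescue IOError
      raise if retried
      retried = true
      retry
    end
  end
end

class DemoCollection
  def initialize(connection)
    @connection = connection
  end

  def find_one(id)
    @connection.query(id)
  end
end

# Reopen the class: keep the original under a new name, then wrap it.
class DemoCollection
  alias_method :find_one_original, :find_one

  def find_one(*args)
    @connection.retry_on_connection_failure do
      find_one_original(*args)
    end
  end
end

conn = FlakyConnection.new
doc = DemoCollection.new(conn).find_one(42)
puts doc["_id"]   # 42
puts conn.calls   # 2 (one failure, one successful retry)
```

One caveat of this idiom: if the patch file is loaded twice, the second `alias_method` captures the already-wrapped method, so the wrapper ends up calling itself recursively.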
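
The `:reconnect_*` options described in the `MongoClient` comments drive the sleep schedule in `MongoClient#reconnect`: it sleeps once before each of the `reconnect_attempts` connect attempts, multiplying the delay after each failure and capping it at `reconnect_max_retry_seconds`. The hypothetical helper `reconnect_delays` below replays that arithmetic with the gist's defaults so the schedule and total wait can be checked without a server.

```ruby
# Hypothetical helper (not part of the gist): replays the sleep schedule of
# MongoClient#reconnect, one sleep per connect attempt, growing the delay
# by the multiplier and capping it at the maximum.
def reconnect_delays(attempts: 53, initial: 0.1, multiplier: 2.0, max: 5.0)
  delay = initial
  Array.new(attempts) do
    current = delay
    delay = [delay * multiplier, max].min
    current
  end
end

delays = reconnect_delays
puts delays.first(8).inspect   # [0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 5.0, 5.0]
puts delays.sum.round(1)       # 241.3
```

By this count the defaults spend roughly 241 seconds (about four minutes) sleeping before giving up, which is worth comparing with the 120-second figure in the `:reconnect_attempts` comment.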
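
For an `OperationFailure`, the mongos retry path only kicks in when the message contains one of the `MONGOS_CONNECTION_ERRORS` substrings or is a bare `:`. The sketch below extracts that test into a hypothetical `mongos_retryable?` helper so it can be exercised against sample messages; the list is copied from the gist.

```ruby
# Copy of the substrings listed in MONGOS_CONNECTION_ERRORS in the gist.
MONGOS_CONNECTION_ERRORS = [
  'socket exception', 'Connection reset by peer', 'transport error',
  'db assertion failure', '8002', 'stream closed', 'Bad file descriptor',
  'Failed to connect', '10009', 'no master found', 'not master',
  'Timed out waiting on socket', "didn't get writeback"
].freeze

# Hypothetical helper: the same condition as the `raise exc unless ...` line
# in retry_on_connection_failure, extracted so it can be called directly.
def mongos_retryable?(message)
  MONGOS_CONNECTION_ERRORS.any? { |err| message.include?(err) } || message.strip == ':'
end

puts mongos_retryable?("9001: socket exception")              # true
puts mongos_retryable?(" : ")                                 # true
puts mongos_retryable?("11000: E11000 duplicate key error")   # false
```

Matching bare codes such as `'8002'` and `'10009'` as substrings is permissive: any message that happens to contain those digits is also treated as retryable.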
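
The `ConnectionFailure` branch of `retry_on_connection_failure` retries at most once, and only when `reconnect` reports success; the `retried` flag is passed to the block so callers can tell a first attempt from a retry. The following sketch isolates that control flow with a local `ConnectionFailure` class and a hypothetical `retry_once_after_reconnect` method that takes the reconnect step as a lambda.

```ruby
# Local stand-in for Mongo::ConnectionFailure.
class ConnectionFailure < StandardError; end

# Hypothetical sketch: on ConnectionFailure, call `reconnect` and, if it
# returns true, retry the block exactly once; otherwise re-raise.
def retry_once_after_reconnect(reconnect)
  retried = false
  begin
    yield retried
  rescue ConnectionFailure
    if !retried && reconnect.call
      retried = true
      retry
    end
    raise
  end
end

# A block that fails on its first call only.
calls = 0
result = retry_once_after_reconnect(-> { true }) do |retried|
  calls += 1
  raise ConnectionFailure, "socket closed" if calls == 1
  "ok, retried=#{retried}"
end
puts result   # ok, retried=true

# A block that always fails is attempted twice, then the error propagates.
attempts = 0
begin
  retry_once_after_reconnect(-> { true }) { attempts += 1; raise ConnectionFailure, "down" }
rescue ConnectionFailure => e
  puts "gave up after #{attempts} attempts: #{e.message}"   # gave up after 2 attempts: down
end
```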
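
`MongoClient#reconnect` takes a shared mutex and then checks `active?` first, so threads that queued up behind the one doing the reconnect return immediately instead of reconnecting again. The sketch below reproduces that double-check with a hypothetical `FakeClient`; like the gist's class variable `@@failover_mutex`, its mutex is shared by every instance (here as a constant), and the `sleep` merely simulates a slow connect.

```ruby
# Hypothetical stand-in for MongoClient#reconnect's locking pattern.
class FakeClient
  # One mutex for all instances, mirroring the gist's @@failover_mutex.
  FAILOVER_MUTEX = Mutex.new

  attr_reader :connect_count

  def initialize
    @active = false
    @connect_count = 0
  end

  def active?
    @active
  end

  def reconnect
    FAILOVER_MUTEX.synchronize do
      # Another thread may already have reconnected while we waited for the lock.
      return true if active?
      sleep 0.05 # simulate a slow connect
      @connect_count += 1
      @active = true
    end
  end
end

client = FakeClient.new
threads = Array.new(10) { Thread.new { client.reconnect } }
threads.each(&:join)
puts client.connect_count   # 1
```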
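
The patched `MongoClient#setup` removes the four retry options from the options hash, applying the gist's defaults with `||` and coercing with `to_i`/`to_f`, before handing the remaining options to the original `setup`. The hypothetical `extract_retry_opts` below repeats those four expressions outside the driver; the `:pool_size` key is only sample data.

```ruby
# Hypothetical sketch of the option handling in the patched MongoClient#setup:
# pull the retry options out of `opts` (mutating it) and apply the defaults.
def extract_retry_opts(opts)
  {
    reconnect_attempts:          (opts.delete(:reconnect_attempts) || 53).to_i,
    reconnect_retry_seconds:     (opts.delete(:reconnect_retry_seconds) || 0.1).to_f,
    reconnect_retry_multiplier:  (opts.delete(:reconnect_retry_multiplier) || 2).to_f,
    reconnect_max_retry_seconds: (opts.delete(:reconnect_max_retry_seconds) || 5).to_f
  }
end

opts = { pool_size: 5, reconnect_attempts: '10' }
retry_opts = extract_retry_opts(opts)
p retry_opts[:reconnect_attempts]   # 10
p opts.keys                         # [:pool_size]
```

Because `0` is truthy in Ruby, `reconnect_attempts: 0` survives the `||` default, and the `if reconnect_attempts > 0` guard in `reconnect` then skips the reconnect loop entirely.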
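
The `send_message_with_gle` fix moves `checkin(sock)` into an `ensure` clause. Judging by the "Removed checkin" comments, the 1.11.1 code returned the socket only on the success path and in the `ConnectionFailure, OperationFailure, OperationTimeout` rescue, so any other exception escaped with the socket still checked out. The sketch below shows the `ensure` version with a hypothetical `TinyPool` built on Ruby's thread-safe `Queue`, using `Timeout.timeout` to raise an exception that no rescue clause lists.

```ruby
require 'timeout'

# Hypothetical minimal socket pool backed by a thread-safe Queue.
class TinyPool
  def initialize(size)
    @sockets = Queue.new
    size.times { |i| @sockets << "socket-#{i}" }
  end

  def checkout
    @sockets.pop
  end

  def checkin(sock)
    @sockets << sock
  end

  def available
    @sockets.size
  end
end

# Same shape as the patched method: the socket is returned in `ensure`,
# so it goes back to the pool for every exception, listed or not.
def with_socket(pool)
  sock = nil
  begin
    sock = pool.checkout
    yield sock
  ensure
    pool.checkin(sock) if sock
  end
end

pool = TinyPool.new(2)
begin
  Timeout.timeout(0.1) { with_socket(pool) { |_sock| sleep 1 } }
rescue Timeout::Error
  puts "timed out"
end
puts pool.available   # 2
```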
@reidmorrison
Author

Yes, we ran into the same issue when upgrading the mongo driver. This new version has been tested with version 1.11.1 of the mongo Ruby driver.

@reidmorrison
Author

I needed the code in this gist for another gem that uses mongo, so I improved it and moved the code into its own gem: https://github.com/reidmorrison/mongo_ha.

Check it out and let me know if you have any issues using it.

The above gist is deprecated in favor of the new mongo_ha gem.
