Skip to content

Instantly share code, notes, and snippets.

@ajdavis
Last active December 22, 2015 20:49
Show Gist options
  • Save ajdavis/6529576 to your computer and use it in GitHub Desktop.
Save ajdavis/6529576 to your computer and use it in GitHub Desktop.
Replica set client with multithreaded monitoring.
import threading
import weakref


class Member(object):
    """One replica-set member, checked by its own background monitor thread.

    The thread repeatedly runs an ismaster round-trip and forwards each
    response to the owning client's ``process_ismaster_response``. Between
    checks it sleeps ``interval`` seconds, or less if :meth:`wake` is called.

    Attributes read or written by other threads:
        host: the address passed to the constructor.
        up: True if the most recent ismaster check succeeded.
        connecting: True until the first check after creation (or after an
            owner sets it back to True) has finished, successfully or not.
        monitoring: set to False, then call :meth:`wake`, to stop the thread.
        info: the most recent successful ismaster response, or None.
    """

    def __init__(self, host=None, client=None, call_ismaster=None,
                 interval=30):
        """Create the member and start its monitor thread.

        Args:
            host: address of the server this member represents.
            client: owner whose ``process_ismaster_response(info)`` receives
                each response. It is held through a weak reference so a
                running monitor thread does not keep its owner alive.
            call_ismaster: optional zero-argument callable that performs the
                ismaster round-trip; defaults to :meth:`call_ismaster`.
            interval: seconds to sleep between checks.
        """
        self.host = host
        self._client_ref = weakref.ref(client) if client is not None else None
        self._call_ismaster_fn = call_ismaster
        self.interval = interval
        # Initialised so the monitor never reads a missing attribute.
        self.info = None
        self.connecting = True
        self.monitoring = True
        self.up = False
        self.sleep_condition = threading.Condition()
        # Set by wake() so a wake-up that arrives while the monitor is busy
        # (not waiting on the condition) is not lost.
        self._wake_requested = False
        # All state above must exist before the thread starts reading it.
        self._thread = threading.Thread(target=self.monitor,
                                        name="Member monitor %s" % (host,))
        self._thread.daemon = True
        self._thread.start()

    @property
    def client(self):
        """The owning client, or None if none was given or it was collected."""
        if self._client_ref is None:
            return None
        return self._client_ref()

    def call_ismaster(self):
        """Run one ismaster round-trip; override, or pass ``call_ismaster``."""
        raise NotImplementedError("Member needs a call_ismaster implementation")

    def monitor(self):
        """Check the server until ``monitoring`` is cleared or the owner dies."""
        while self.monitoring:
            if self._client_ref is not None and self._client_ref() is None:
                # The owner was garbage-collected; nobody needs our results.
                self.monitoring = False
                break
            try:
                fn = self._call_ismaster_fn
                info = fn() if fn is not None else self.call_ismaster()
                self.info = info
                self.up = True
                # Inside the try, as in the original design: an error while
                # the owner processes the response marks us down rather than
                # killing the monitor thread.
                self._forward(info)
            except Exception:
                self.up = False
            self.connecting = False
            self._sleep()

    def _forward(self, info):
        """Hand ``info`` to the owner, holding a strong reference only here.

        Keeping the strong reference in this short-lived frame (not in
        monitor()'s locals) lets the owner be collected while we sleep.
        """
        client = self.client
        if client is not None:
            client.process_ismaster_response(info)

    def _sleep(self):
        """Wait up to ``interval`` seconds unless a wake() is already pending."""
        with self.sleep_condition:
            if not self._wake_requested:
                self.sleep_condition.wait(self.interval)
            self._wake_requested = False

    def wake(self):
        """Make the monitor run its next check now rather than after ``interval``."""
        with self.sleep_condition:
            self._wake_requested = True
            self.sleep_condition.notify()
import threading
import time


class ClusterError(Exception):
    """Raised when no replica-set member can satisfy a request."""


class Cluster(object):
    """Client-side view of a replica set, shared by application threads.

    Member monitor threads report ismaster responses through
    :meth:`process_ismaster_response`; application threads pick a member
    with :meth:`get_member`.

    ``members_and_ismaster`` is a ``(frozenset of members, response)`` pair
    that is always replaced wholesale and never mutated (read-copy-update).
    A reader that unpacks it once therefore sees a consistent pair without
    locking. ``response`` is the ismaster response that defined the current
    member set, or None before one has been accepted.
    """

    def __init__(self, seeds, connect_timeout=20, member_class=None):
        """Start monitoring ``seeds``.

        Args:
            seeds: iterable of member addresses to monitor initially.
            connect_timeout: seconds get_member waits for a usable member.
            member_class: factory called as ``member_class(host, cluster)``
                to create members; defaults to this module's ``Member``.
                Lets tests substitute a fake server.
        """
        # Guards topology updates; get_member waits on it for changes.
        self.rs_changed_condition = threading.Condition()
        self.connect_timeout = connect_timeout
        self._member_class = member_class
        self.hosts = []  # known seeds or, once connected, hosts + passives
        self.members_and_ismaster = (frozenset(), None)
        self.bootstrap(seeds)

    def _new_member(self, host):
        """Create a member for ``host`` reporting back to this cluster."""
        factory = self._member_class
        if factory is None:
            factory = Member  # looked up at call time, not definition time
        return factory(host, self)

    @staticmethod
    def _stop(member):
        """Ask a member's monitor to exit."""
        member.monitoring = False
        member.wake()

    def bootstrap(self, seeds):
        """Replace the member set with one new member per distinct seed."""
        unique = []
        for seed in seeds:
            if seed not in unique:
                unique.append(seed)
        with self.rs_changed_condition:
            old_members, _ = self.members_and_ismaster
            for member in old_members:
                self._stop(member)
            # Members are created while the lock is held, so their first
            # responses cannot be processed before this set is published.
            members = frozenset(self._new_member(seed) for seed in unique)
            self.members_and_ismaster = (members, None)
            self.hosts = unique
            self.rs_changed_condition.notify_all()

    def get_member(self, select_member_callback):
        """Return a member chosen by ``select_member_callback``.

        ``select_member_callback(members)`` receives the current frozenset
        of members and returns a member or a falsy value. If nothing matches
        while some member is still connecting (e.g. during the initial lazy
        connect, or after disconnect()), wait for topology changes for up to
        ``connect_timeout`` seconds. The callback runs with the cluster lock
        held, so it should not block.

        Raises:
            ClusterError: if nothing matches and no member is connecting, or
                if the timeout expires.
        """
        now = getattr(time, "monotonic", time.time)
        deadline = now() + self.connect_timeout
        with self.rs_changed_condition:
            while True:
                # Checking and waiting under the lock that
                # process_ismaster_response holds while notifying means a
                # change between the check and the wait cannot be missed.
                members, _ = self.members_and_ismaster
                member = select_member_callback(members)
                if member:
                    return member
                if not any(m.connecting for m in members):
                    raise ClusterError("No members match selector")
                remaining = deadline - now()
                if remaining <= 0:
                    raise ClusterError(
                        "I took more than %s seconds to connect"
                        % self.connect_timeout)
                # NOTE(review): a member whose check fails only clears its
                # own `connecting` flag without notifying us, so that case
                # ends in the timeout error rather than "No members match".
                self.rs_changed_condition.wait(remaining)

    def process_ismaster_response(self, response):
        """Update the member set from one member's ismaster response.

        Called on member monitor threads. The member set is rebuilt only
        when ``response.set_version`` is newer than that of the response
        that defined the current set, so a stale member cannot roll the
        topology back. Waiters in get_member are woken after every response,
        because a member that just answered may now satisfy their selector.

        Assumes ``response`` exposes ``set_version``, ``hosts`` and
        (optionally) ``passives`` attributes — TODO confirm against the
        real ismaster wrapper.
        """
        # TODO: check rs name, raise on main thread if wrong
        with self.rs_changed_condition:
            old_members, old_response = self.members_and_ismaster
            new_version = getattr(response, "set_version", None)
            old_version = getattr(old_response, "set_version", None)
            if new_version and (old_version is None
                                or new_version > old_version):
                hosts = (list(getattr(response, "hosts", None) or [])
                         + list(getattr(response, "passives", None) or []))
                wanted = set(hosts)
                # Build a new set instead of mutating the one being iterated.
                kept = set()
                for member in old_members:
                    if member.host in wanted:
                        kept.add(member)
                    else:
                        self._stop(member)
                known = set(m.host for m in kept)
                for host in hosts:
                    if host not in known:
                        kept.add(self._new_member(host))
                        known.add(host)
                # Read-copy-update: publish a new immutable pair.
                self.members_and_ismaster = (frozenset(kept), response)
                self.hosts = hosts
            self.rs_changed_condition.notify_all()

    def disconnect(self):
        """Mark every member as connecting and trigger an immediate re-check.

        Called in response to any socket error. Afterwards get_member waits
        (up to ``connect_timeout``) for members to report back instead of
        failing at once.
        """
        # A snapshot suffices: the pair is never mutated in place.
        members, _ = self.members_and_ismaster
        for member in members:
            member.connecting = True
            member.wake()
Java / .NET design notes
They have handleReplicaSetMemberChange, handleStandaloneServerChange, and
handleShardRouterChange.
Their Servers (our Members) can be subclassed. TestServer has a
sendNotification method, so the replica-set client can be tested without a real server.
Their equivalent of our "request" has a different name.
The name "session" conflicts with a server-side term.
They somewhat conflate requests with cursors' need to send getMore to
the same server.
Our new Request object has to document that it is not thread-safe.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment