Last active
December 22, 2015 20:49
-
-
Save ajdavis/6529576 to your computer and use it in GitHub Desktop.
Replica set client with multithreaded monitoring.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Member | |
def init(): | |
self.connecting = True | |
self.sleep_condition = condition variable | |
self.monitoring = True | |
self.up = False | |
start a thread that runs monitor() | |
def monitor() | |
while self.monitoring: | |
# TODO: notice if Cluster has been GC'ed and die. | |
try: | |
info = call_ismaster() | |
changed = info != self.info | |
self.info = info | |
self.up = True | |
self.client.process_ismaster_response(info) | |
except | |
self.up = False | |
self.connecting = False | |
self.sleep_condition.wait(30) | |
def wake() | |
self.sleep_condition.notify() | |
class Cluster | |
hosts # known seeds or, once connected, hosts + passives | |
members_and_ismaster # Tuple of set, response | |
def init(seeds) | |
self.bootstrap(seeds) | |
def bootstrap(seeds) | |
members = [new Member(seed) for seed in seeds] | |
self.members_and_ismaster = (members, None) | |
# select_member_callback chooses an up member matching a read pref | |
def get_member(select_member_callback) | |
end = now() + 20 sec # configurable | |
while True: | |
members, _ = self.members_and_ismaster | |
# members can't change here | |
member = select_member_callback(members) | |
if member: | |
return member | |
if not any member.connecting: | |
raise error, "No members match selector" | |
try | |
# Wait here during initial lazy connect, or after disconnect() | |
# TODO: "phase" like in Java / .NET design to avoid race, if | |
self.rs_changed_condition.wait(end - now()) | |
except timeout: | |
raise error, "I took more than 20 seconds to connect" | |
# called on a monitor thread | |
def process_ismaster_response(response) | |
lock(process_ismaster_response) | |
# TODO: check rs name, raise on main thread if wrong | |
# TODO: | |
old_members, old_response = self.members_and_ismaster | |
if old_response == response: | |
unlock(process_ismaster_response) | |
return | |
if ( | |
response.set_version | |
and response.set_version > old_members.set_version): | |
members = old_members.copy() | |
for member in members: | |
if member not in response: | |
member.monitoring = False | |
members.remove(member) | |
for host in response.hosts + response.passives: | |
if host not in members: | |
members.add(new Member(host)) | |
members.set_version = response.set_version | |
# Read-copy-update pattern. | |
self.members_and_ismaster = (members, response) | |
self.hosts = response.hosts + response.passives | |
unlock(process_ismaster_response) | |
self.rs_changed_condition.notify() | |
# Called in response to any socket err. | |
def disconnect(): | |
lock(self.members) | |
for member in self.members: | |
member.connecting = True | |
member.wake() | |
unlock(self.members) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Java / .NET design notes | |
they have handleReplicaSetMemberChange, handleStandaloneServerChange, | |
handleShardRouterChange | |
their Servers (our Members) can be subclassed. TestServer has a | |
sendNotification method so RS client can be tested w/ o a real server. | |
Their "request" has a different name | |
Name "session" conflicts with a server-side term | |
they somewhat conflate requests w/ cursors' need to send getMore to | |
same server | |
Our new Request object has to document it's not thread-safe |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment