Last active
September 7, 2021 23:21
-
-
Save ShaneHarvey/26ae13af634eec04b34e818503b07bab to your computer and use it in GitHub Desktop.
Very rough python script to start MongoDB clusters using mongo-orchestration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import copy
import itertools
import os
import pprint
import ssl
import sys
import time

import pymongo
import requests
# Python 2/3 compatibility: on Python 3, provide the Python 2 builtin names
# (``raw_input`` and ``unicode``) that the rest of this script uses.
if sys.version_info[0] == 3:
    raw_input = input
    unicode = str

# Configurable hosts and ports used in the tests.
# Credentials come from the environment and are empty strings when unset.
db_user = unicode(os.environ.get("DB_USER", ""))
db_password = unicode(os.environ.get("DB_PASSWORD", ""))

# Document count for stress tests
STRESS_COUNT = 100

# Test namespace, timestamp arguments
TESTARGS = ('test.test', 1)

# host:port of the mongo-orchestration HTTP service (see _mo_url).
_mo_address = os.environ.get("MO_ADDRESS", "localhost:8889")

# Ports are handed out sequentially, starting at MONGO_PORT; every call to
# ``proc_params()`` consumes the next one.
_mongo_start_port = int(os.environ.get("MONGO_PORT", 27017))
_free_port = itertools.count(_mongo_start_port)

# Process options applied to every server this script starts.
DEFAULT_OPTIONS = {
    'logappend': True,
    'ipv6': True,
    'bind_ip': '127.0.0.1,::1',
    # 'storageEngine': 'mmapv1',
    # 'networkMessageCompressors': 'disabled',
    # 'vvvvv': '',
    'setParameter': {'enableTestCommands': 1},  # 'logicalSessionRefreshMillis': 1000000},
}

# Base body merged into every POST request; it carries the login only when
# both a user name and a password were supplied through the environment.
_post_request_template = (
    {'login': db_user, 'password': db_password}
    if db_user and db_password
    else {}
)
def _mo_url(resource, *args):
    """Build an ``http://`` URL on the mongo-orchestration service.

    ``resource`` is the first path segment after the service address; any
    further positional arguments (for example an object id) are appended as
    additional ``/``-separated segments.
    """
    segments = [_mo_address, resource]
    segments.extend(args)
    return 'http://' + '/'.join(segments)
@atexit.register
def kill_all():
    """Delete every sharded cluster, replica set and server.

    Registered with ``atexit`` so that it runs when the interpreter exits.
    Each resource listing is a JSON object keyed by the resource name, and
    every entry in it carries an ``'id'`` that is used to build the DELETE
    URL.
    """
    resources = ('sharded_clusters', 'replica_sets', 'servers')
    # Fetch all three listings before deleting anything, so the set of
    # objects to delete is decided up front.
    listings = [(res, requests.get(_mo_url(res)).json()) for res in resources]
    for res, listing in listings:
        for item in listing[res]:
            # Building the URL from the same resource name that was listed
            # avoids the old 'relica_sets' typo, which sent replica set
            # deletions to a non-existent endpoint.
            requests.delete(_mo_url(res, item['id']))
class MCTestObject(object):
    """Base class for objects created through the mongo-orchestration API.

    The methods here read these attributes, which subclasses must provide:
    ``_resource`` (first URL path segment for this kind of object),
    ``_proc_params`` (per-instance process option overrides), ``id`` and
    ``uri``.  Subclasses must also implement :meth:`get_config`.
    """

    def proc_params(self):
        """Return process options for one new server.

        Starts from a deep copy of ``DEFAULT_OPTIONS``, applies this
        object's own overrides and assigns the next free port.  Every call
        consumes a port.
        """
        params = copy.deepcopy(DEFAULT_OPTIONS)
        params.update(self._proc_params)
        params["port"] = next(_free_port)
        return params

    def get_config(self):
        """Return the JSON body describing this object (subclass hook)."""
        raise NotImplementedError

    @staticmethod
    def _decode_response(response, verb):
        """Return the decoded JSON body of ``response``.

        ``verb`` ('POST' or 'GET') is only used in the error message.

        Raises:
            RuntimeError: if the HTTP status is not OK, or if the decoded
                body is a list (the service returns a list if an error
                occurred).
        """
        if not response.ok:
            raise RuntimeError(
                "Error sending %s to cluster: %s" % (verb, response.text))
        body = response.json()
        if isinstance(body, list):  # Will be a list if an error occurred.
            raise RuntimeError(
                "Error sending %s to cluster: %s" % (verb, body))
        return body

    def _make_post_request(self):
        """POST this object's configuration and return the decoded reply.

        The configuration and the reply are both pretty-printed to stdout.
        """
        config = _post_request_template.copy()
        config.update(self.get_config())
        pprint.pprint(config)
        response = requests.post(
            _mo_url(self._resource), timeout=None, json=config)
        body = self._decode_response(response, 'POST')
        pprint.pprint(body)
        return body

    def _make_get_request(self):
        """GET the current description of this object and return it."""
        response = requests.get(
            _mo_url(self._resource, self.id), timeout=None)
        return self._decode_response(response, 'GET')

    def client(self, **kwargs):
        """Return a ``pymongo.MongoClient`` connected to ``self.uri``.

        Extra keyword arguments are passed through to ``MongoClient``.
        """
        client = pymongo.MongoClient(self.uri, **kwargs)
        if db_user:
            # NOTE(review): Database.authenticate() was removed in
            # PyMongo 4.0; on newer drivers the credentials have to be
            # passed to MongoClient instead.  Left unchanged so behavior on
            # PyMongo 3.x stays the same -- confirm the targeted version.
            client.admin.authenticate(db_user, db_password)
        return client

    def stop(self):
        """Delete this object from mongo-orchestration."""
        requests.delete(_mo_url(self._resource, self.id))
class Server(MCTestObject):
    """A single ``mongod`` server.

    ``id`` and ``uri`` may be given for a server that already exists; any
    other keyword arguments become process option overrides.
    """

    _resource = 'servers'

    def __init__(self, id=None, uri=None, **kwargs):
        self._proc_params = kwargs
        self.uri = uri
        self.id = id

    def get_config(self):
        """Return the request body used to create this server."""
        config = {'name': 'mongod'}
        config['procParams'] = self.proc_params()
        return config

    def start(self):
        """Create the server, or start it again if it already has an id.

        Returns ``self``.
        """
        if self.id is not None:
            # Already known to mongo-orchestration: just send "start".
            requests.post(
                _mo_url('servers', self.id),
                timeout=None,
                json={'action': 'start'},
            )
            return self

        created = self._make_post_request()
        self.id = created['id']
        # Prefer the URI that embeds credentials when the reply has one.
        self.uri = created.get('mongodb_auth_uri', created['mongodb_uri'])
        return self

    def stop(self, destroy=True):
        """Stop the server.

        With ``destroy`` true (the default) the server is deleted;
        otherwise only a "stop" action is sent, so it can be started again.
        """
        if not destroy:
            requests.post(
                _mo_url('servers', self.id),
                timeout=None,
                json={'action': 'stop'},
            )
            return
        super(Server, self).stop()
class ReplicaSet(MCTestObject):
    """A replica set with three members, or one when ``single`` is true.

    After :meth:`start`, ``members`` holds one :class:`Server` per member
    and ``primary`` / ``secondary`` point at members in those states.
    Extra keyword arguments become process option overrides.
    """

    _resource = 'replica_sets'

    def __init__(self, id=None, uri=None, primary=None, secondary=None,
                 single=False, **kwargs):
        self.single = single
        self.id = id
        self.uri = uri
        self.primary = primary
        self.secondary = secondary
        self._proc_params = kwargs
        self.members = []

    def proc_params(self):
        """Return process options for one member (consumes a port)."""
        params = super(ReplicaSet, self).proc_params()
        # Hook for replica-set-only options; enable as needed:
        # params.setdefault('setParameter', {}).setdefault('transactionLifetimeLimitSeconds', 3)
        # params.setdefault('setParameter', {}).setdefault('periodicNoopIntervalSecs', 1)
        return params

    def get_config(self):
        """Return the request body: one member if ``single``, else three."""
        member_count = 1 if self.single else 3
        # To turn a member into an arbiter, add
        # 'rsParams': {'arbiterOnly': True} to its entry.
        members = [{'procParams': self.proc_params()}
                   for _ in range(member_count)]
        return {'members': members}

    def _init_from_response(self, response):
        """Populate id, uri, members, primary and secondary from a reply.

        Returns ``self``.
        """
        self.id = response['id']
        self.uri = response.get('mongodb_auth_uri', response['mongodb_uri'])
        # This method runs again after restart_primary(); start from an
        # empty list so members are not duplicated on every refresh.
        self.members = []
        for member in response['members']:
            server = Server(member['server_id'], member['host'])
            self.members.append(server)
            if member['state'] == 1:
                self.primary = server
            elif member['state'] == 2:
                # With several members in state 2, the last one wins.
                self.secondary = server
        return self

    def start(self):
        """Create the replica set and return ``self``."""
        # We never need to restart a replica set, only start new ones.
        return self._init_from_response(self._make_post_request())

    def restart_primary(self):
        """Stop and restart the current primary, then refresh the members.

        Raises:
            RuntimeError: if no primary is known (for example when the
                replica set has not been started yet).
        """
        if self.primary is None:
            raise RuntimeError("Replica set has no known primary to restart")
        self.primary.stop(destroy=False)
        time.sleep(5)
        self.primary.start()
        time.sleep(1)
        self._init_from_response(self._make_get_request())
        print('New primary: %s' % self.primary.uri)
class ReplicaSetSingle(ReplicaSet):
    """Replica set whose configuration always has exactly one member."""

    def get_config(self):
        """Return a request body with a single member."""
        sole_member = {'procParams': self.proc_params()}
        return {'members': [sole_member]}
class ShardedCluster(MCTestObject):
    """A sharded cluster with two routers and one shard.

    ``_shard_type`` selects the class used both to build the shard's
    configuration and to wrap each shard of a started cluster.  Keyword
    arguments become process option overrides for the routers.
    """

    _resource = 'sharded_clusters'
    _shard_type = ReplicaSet

    def __init__(self, **kwargs):
        self._proc_params = kwargs
        self.shards = []
        self.uri = None
        self.id = None

    def get_config(self):
        """Return the request body used to create the cluster."""
        # 'configsvrs': [{'members': [DEFAULT_OPTIONS.copy()]}] could be
        # added to the returned dict to configure config servers explicitly.
        routers = [self.proc_params(), self.proc_params()]
        shards = [
            {'id': 'demo-set-0',
             'shardParams': self._shard_type().get_config()},
            # {'id': 'demo-set-1', 'shardParams':
            #  self._shard_type().get_config()}
        ]
        return {'routers': routers, 'shards': shards}

    def start(self):
        """Create the cluster, record its shards and return ``self``."""
        # We never need to restart a sharded cluster, only start new ones.
        created = self._make_post_request()
        for shard in created['shards']:
            # Each shard is looked up as a replica set by its '_id'.
            shard_json = requests.get(
                _mo_url('replica_sets', shard['_id'])).json()
            replica_set = self._shard_type()
            self.shards.append(replica_set._init_from_response(shard_json))
        self.id = created['id']
        self.uri = created.get('mongodb_auth_uri', created['mongodb_uri'])
        return self
class ShardedClusterSingle(ShardedCluster):
    """Sharded cluster whose shards use ``ReplicaSetSingle``."""

    _shard_type = ReplicaSetSingle
def argv_has(string):
    """Return True if ``string`` is a substring of any command-line argument.

    The program name (``sys.argv[0]``) is not searched.
    """
    for arg in sys.argv[1:]:
        if string in arg:
            return True
    return False
# Directory holding the TLS certificate files (server.pem, ca.pem) whose
# paths are sent to mongo-orchestration when ssl/tls is requested.  Set the
# MO_CERTS environment variable to point at your own checkout instead of
# editing this file; the default is the original author's path.
# File names are appended with plain string concatenation, so
# os.path.join(..., '') is used to guarantee a trailing path separator.
CERTS = os.path.join(
    os.environ.get(
        'MO_CERTS',
        '/Users/shane/git/mongo-python-driver/test/certificates/'),
    '')
# Windows example:
# CERTS = 'C:\\cygwin\\home\\Administrator\\mongo-python-driver\\test\\certificates\\'
# Requires mongo-orchestration running on port 8889.
# For SSL support you must modify the CERTS path.
#
# Usage:
# python launch.py <single|repl|shard> [auth] [ssl]
#
# Examples (standalone node):
# python ~/launch.py single
# python ~/launch.py single auth
# python ~/launch.py single auth ssl
#
# Sharded clusters:
# python ~/launch.py shard
# python ~/launch.py shard auth
# python ~/launch.py shard auth ssl
#
# Replica sets:
# python ~/launch.py repl
# python ~/launch.py repl single
# python ~/launch.py repl single auth
if __name__ == '__main__':
    # A purely numeric argument overrides the first port handed out to the
    # started processes.  Only a failed int() conversion is ignored; the
    # previous bare ``except`` also swallowed KeyboardInterrupt and friends.
    for arg in sys.argv[1:]:
        try:
            port = int(arg)
        except ValueError:
            continue
        _free_port = itertools.count(port)

    # Optional server version: the first known version that appears in the
    # arguments is sent with every create request.
    for version in ['2.6.12', '3.0.12', '3.2.10', '3.4.0-rc0']:
        if argv_has(version):
            _post_request_template['version'] = version
            break

    if argv_has('ssl') or argv_has('tls'):
        _post_request_template['sslParams'] = {
            "sslOnNormalPorts": True,
            "sslPEMKeyFile": CERTS + "server.pem",
            "sslCAFile": CERTS + "ca.pem",
            "sslWeakCertificateValidation": True,
        }

    if argv_has('auth'):
        # Fall back to fixed credentials when none came from the environment.
        _post_request_template['login'] = db_user or 'user'
        _post_request_template['password'] = db_password or 'password'
        _post_request_template['auth_key'] = 'secret'

    single = argv_has('single') or argv_has('standalone') or argv_has('mongod')
    if argv_has('repl'):
        # DEFAULT_OPTIONS['enableMajorityReadConcern'] = ''
        cluster = ReplicaSet(single=single)
    elif argv_has('shard') or argv_has('mongos'):
        cluster = ShardedClusterSingle()
    elif single:
        cluster = Server()
    else:
        # sys.exit rather than the ``exit`` builtin, which is only installed
        # by the ``site`` module and is meant for interactive use.
        sys.exit('Usage: %s [single|replica|shard] [ssl] [auth]' % (__file__,))

    cluster.start()
    # Only replica sets define restart_primary(); for the other deployment
    # types "r" used to crash with AttributeError.
    can_restart = hasattr(cluster, 'restart_primary')
    while True:
        data = raw_input(
            'Type "q" to quit, "r" to shutdown and restart the primary: ')
        if data == 'q':
            break
        if data == 'r':
            if can_restart:
                cluster.restart_primary()
            else:
                print('This deployment type has no primary to restart.')
    cluster.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Requires mongo-orchestration running on port 8889.
Initial set up
Add
export MONGO_ORCHESTRATION_HOME=$HOME/mongo-orchestration
to your .zshrc or .bashrc, then run:
$ mkdir -p ~/mongo-orchestration/lib
$ ln -s <mongo-python-driver-git-repo>/test/certificates/client.pem ~/mongo-orchestration/lib/client.pem
Example
For example, start mongo-orchestration:
Then in another terminal, start a 3 node replica set with ssl:
Usage:
python launch.py <single|repl|shard>
Examples (standalone node):
python launch.py single
python launch.py single auth
python launch.py single auth ssl
Sharded clusters:
python launch.py shard
python launch.py shard auth
python launch.py shard auth ssl
Replica sets:
python launch.py repl
python launch.py repl auth
python launch.py repl auth ssl
Single member replica set:
python launch.py repl single
python launch.py repl single auth
python launch.py repl single auth ssl