Created
October 4, 2013 19:46
-
-
Save cocagne/6831642 to your computer and use it in GitHub Desktop.
Trivial key-value store for configuration content
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import zmq | |
import json | |
import datetime | |
import argparse | |
# ZeroMQ endpoints the CLI connects to: one for update publications and one
# for request/reply traffic.
PUB_ADDR = 'ipc:///tmp/config_daemon_pub'
REP_ADDR = 'ipc:///tmp/config_daemon_rep'

# One shared context; the sockets are created here and connected lazily by
# whichever sub-command needs them.
context = zmq.Context()

req_sock = context.socket(zmq.REQ)  # request/reply commands
sub_sock = context.socket(zmq.SUB)  # 'watch' subscription
def pretty_print_json(j):
    """Print the JSON document ``j`` with sorted keys and a 4-space indent.

    ``j`` is a JSON-encoded string; it is decoded and re-encoded so the
    printed form is normalized regardless of how the input was formatted.
    """
    decoded = json.loads(j)
    rendered = json.dumps(decoded, sort_keys=True, indent=4,
                          separators=(',', ': '))
    print(rendered)
def err_exit( msg ):
    """Report ``msg`` on standard error and terminate with exit status 1.

    The message goes to stderr (not stdout) so that error text never mixes
    with command output that a caller may be capturing or piping.

    Raises:
        SystemExit: always, with code 1.
    """
    sys.stderr.write('%s\n' % (msg,))
    sys.exit(1)
def check_error( parts ):
    """Exit through err_exit() when a reply is an error reply.

    ``parts`` is the list of message parts of a reply: part 0 is the status
    and, when the status is 'ERROR', part 1 is the message handed to
    err_exit(). Any other status returns normally.
    """
    status = parts[0]
    if status != 'ERROR':
        return
    err_exit( parts[1] )
def get_cmd( args ):
    """Handle the 'get' sub-command: fetch a configuration and print it.

    Sends a GET request, adding ``args.label`` as a second message part when
    a label was given, then pretty-prints part 1 of the reply.
    """
    req_sock.connect(REP_ADDR)

    request = ['GET', args.label] if args.label else ['GET']
    req_sock.send_multipart(request)

    reply = req_sock.recv_multipart()
    check_error(reply)
    pretty_print_json( reply[1] )
def get_key_cmd( args ):
    """Handle the 'get-value' sub-command: print one key's value.

    Requests the current configuration and prints the value stored under
    ``args.key``; an empty string is printed when the key is absent.
    """
    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['GET'])

    reply = req_sock.recv_multipart()
    check_error(reply)

    config = json.loads(reply[1])
    print(config.get(args.key, ''))
def set_cmd( args ):
    """Handle the 'set' sub-command: send key/value updates to the daemon.

    ``args.pairs`` is a flat list ``[key1, value1, key2, value2, ...]``.
    A value of the literal string 'NULL' is sent as JSON null. The pairs
    are sent as a single JSON dictionary in one SET request.

    Exits through err_exit() when the argument count is odd or fewer than
    two arguments were supplied.
    """
    pairs = args.pairs

    # The previous test, `not len % 2 == 0 and len >= 2`, let a single
    # argument through because of operator precedence, which then crashed
    # with an IndexError while reading the missing value.
    if len(pairs) < 2 or len(pairs) % 2 != 0:
        err_exit('Arguments must be an even number of key, value pairs')

    req_sock.connect(REP_ADDR)

    d = dict()
    # Even positions are keys, odd positions are their values.
    for key, value in zip(pairs[0::2], pairs[1::2]):
        d[key] = None if value == 'NULL' else value

    req_sock.send_multipart(['SET', json.dumps(d)])

    parts = req_sock.recv_multipart()
    check_error(parts)
def watch_cmd( args ):
    """Handle the 'watch' sub-command: print every published update.

    Subscribes to 'UPDATE' messages and loops forever. For each message it
    prints part 1 and, when ``args.show_current`` is set, part 2 as well.
    """
    sub_sock.connect(PUB_ADDR)
    sub_sock.setsockopt(zmq.SUBSCRIBE, 'UPDATE')

    separator = '-' * 80
    while True:
        message = sub_sock.recv_multipart()

        print(separator)
        print('Updated: ')
        pretty_print_json( message[1] )

        if not args.show_current:
            continue

        print('Current: ')
        pretty_print_json( message[2] )
def list_cmd( args ):
    """Handle the 'list' sub-command: print the labeled configurations.

    Part 1 of the LIST reply is a JSON list whose entries hold a timestamp
    sequence at index 0 and the label at index 1. Each entry is printed as
    an ISO-formatted timestamp followed by the label.
    """
    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['LIST'])

    reply = req_sock.recv_multipart()
    check_error(reply)

    entries = json.loads(reply[1])
    if entries:
        for entry in entries:
            # The timestamp sequence is expanded into datetime's positional
            # arguments.
            created = datetime.datetime(*entry[0])
            print('%s %s' % (created.isoformat(), entry[1]))
    else:
        print('No labels defined')
def label_cmd( args ):
    """Handle the 'label' sub-command: label the current configuration.

    Exits through err_exit() when ``args.label`` does not begin with a
    letter; an empty label is rejected the same way instead of raising an
    IndexError.
    """
    # Slicing (rather than indexing) yields '' for an empty label, and
    # ''.isalpha() is False, so the empty case falls into the error path.
    if not args.label[:1].isalpha():
        err_exit('Label must begin with a letter')

    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['LABEL', args.label])

    parts = req_sock.recv_multipart()
    check_error(parts)
def restore_cmd( args ):
    """Handle the 'restore' sub-command: restore a labeled configuration.

    Exits through err_exit() when ``args.label`` does not begin with a
    letter; an empty label is rejected the same way instead of raising an
    IndexError.
    """
    # Slicing (rather than indexing) yields '' for an empty label, and
    # ''.isalpha() is False, so the empty case falls into the error path.
    if not args.label[:1].isalpha():
        err_exit('Label must begin with a letter')

    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['RESTORE', args.label])

    parts = req_sock.recv_multipart()
    check_error(parts)
# ---------------------------------------------------------------------------
# Command-line interface: one sub-parser per command. Each sub-parser stores
# its handler in `sub_command`, which is invoked after parsing.
# ---------------------------------------------------------------------------
description = '''Configuration daemon CLI'''

top = argparse.ArgumentParser(description=description)
sub = top.add_subparsers()

# get [label]
sub_get = sub.add_parser('get', help='Gets the current configuration')
sub_get.add_argument(
    "label",
    nargs='?',
    help="Label of the configuration. If omitted, the current configuration is obtained")
sub_get.set_defaults(sub_command=get_cmd)

# get-value key
sub_get = sub.add_parser('get-value', help='Prints the value of the requested key')
sub_get.add_argument("key", help="Key to print")
sub_get.set_defaults(sub_command=get_key_cmd)

# set key value [key value ...]
sub_set = sub.add_parser(
    'set',
    help='Sets configuration key-value pairs. Values set to NULL will be removed from the configuration database')
sub_set.add_argument(
    "pairs",
    nargs='+',
    help="List of key, value pairs. The number of arguments must be even.")
sub_set.set_defaults(sub_command=set_cmd)

# watch [-s]
sub_watch = sub.add_parser('watch', help='Watches for configuration updates')
sub_watch.add_argument(
    "-s", '--show-current',
    action='store_true',
    help="Shows the full current configuration in addition to the changed values on each update.")
sub_watch.set_defaults(sub_command=watch_cmd)

# list
sub_list = sub.add_parser('list', help='Lists the labled configurations')
sub_list.set_defaults(sub_command=list_cmd)

# label name
sub_label = sub.add_parser('label', help='Labels the current configuration')
sub_label.add_argument(
    "label",
    help="Label for the current configuration. Must begin with a letter")
sub_label.set_defaults(sub_command=label_cmd)

# restore name
sub_restore = sub.add_parser('restore', help='Restores a labeled configuration')
sub_restore.add_argument("label", help="Labled configuration")
sub_restore.set_defaults(sub_command=restore_cmd)

args = top.parse_args()

# Ctrl-C (e.g. while 'watch' is blocking) ends the program quietly.
try:
    args.sub_command(args)
except KeyboardInterrupt:
    pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
= Simple Configuration Daemon = | |
This module implements a low-performance but very simple configuration daemon | |
that is built on top of three core technologies: | |
* ZeroMQ for Inter-Process Communication | |
* JSON for message encoding | |
* Zipfiles for configuration archival and labeling | |
Configuration content is small (less than 10k on average) and updates are rare | |
(10s of operations during a busy week). Consequently, performance and scalability | |
are effectively non-issues. Robust operation and easy client use are the | |
primary goals. | |
One aspect of this daemon that will not be immediately apparent to those not | |
familiar with ZeroMQ is that client applications do not need to worry about | |
bootstrapping considerations. For example, a client application may send a GET | |
request message before the configuration daemon starts and it will be correctly | |
serviced once the configuration daemon starts up. Similarly, if the | |
configuration daemon is restarted on-the-fly, client applications will notice | |
nothing more than delayed responses to their queries. The built-in reconnection | |
and retry logic in ZeroMQ insulates the clients from having to deal with these | |
issues themselves. | |
== Design == | |
Single-file database:: | |
A single JSON file is used to store the current configuration. Updates | |
are done by way of a full re-write to a temp file followed by an | |
os.rename() | |
Atomic updates:: | |
Multiple key-value pairs may be updated in a single request. Requests | |
are single-threaded so no intermediate values are possible. | |
No partial reads:: | |
Requests for configuration return the entire configuration as a | |
single JSON object. | |
Zip-file archival of old configurations:: | |
Each time the configuration is modified, the previous configuration | |
is appended, in its entirety, to an +archive.zip+ file in the | |
same directory as the JSON file. | |
Configuration Label/Restore:: | |
Clients can assign labels to configurations and later request that | |
the configuration be restored to the value associated with that | |
label. | |
Completely passive:: | |
The daemon does nothing but respond to external requests. | |
ZeroMQ is used for communication:: | |
* Easy to use | |
* Language bindings exist for all programming languages | |
* zmq's built-in queueing and retry mechanisms simplify client code | |
JSON encoding used for all complex data structures:: | |
Every language has an easy-to-use JSON library | |
Publish-Subscribe used for notifying clients of configuration updates:: | |
* All changes to the current configuration result in an UPDATE message | |
being published | |
* The UPDATE message contains two JSON encoded objects: | |
.. A dictionary of the updates where the keys are the modified attributes | |
and the values are tuples of (previous_value, new_value) | |
.. The complete current configuration | |
== Bootstrapping == | |
On initial start, the daemon will ensure that the database directory | |
exists and will create an empty initial configuration "{}" as well | |
as an empty archive. The initial content must be inserted by an | |
external entity. | |
== Protocol == | |
=== Message Encoding === | |
ZeroMQ multi-part messages are used to separate the parts of messages. | |
=== Requests === | |
==== Request Encoding ==== | |
Client requests place the request type in part 0 and follow it with any | |
request-specific parts. | |
All requests will receive a reply. If the requested operation was successful, | |
the reply will contain 'OK' in part 0 and the subsequent parts (if any) will be | |
formatted on a per-request basis. If an error occurred, the reply will consist | |
of two message parts with 'ERROR' contained in part 0 and a human-readable | |
error message in part 1. | |
==== GET ==== | |
Additional Request Parts: [label] | |
Additional Response parts: json-encoded-configuration | |
Obtains the configuration for the requested label or the current configuration | |
if +[label]+ is omitted | |
==== SET ==== | |
Additional Request Parts: json-encoded-dictionary | |
Additional Response parts: | |
Sets one or more values in the current configuration. The encoded dictionary | |
must contain string keys mapping to primitive values (numbers and strings | |
only). If the value is encoded as a JSON null, the corresponding entry in the | |
config database will be deleted. | |
==== LABEL ==== | |
Additional Request Parts: label | |
Additional Response parts: | |
Sets the supplied label to reference the current configuration | |
==== LIST ==== | |
Additional Request Parts: | |
Additional Response parts: json-encoded-list | |
Obtains a list of all available configuration labels. The entries of | |
the encoded list are tuples of: ( (year,month,day,hours,minutes,seconds), label ) | |
The first element of the tuple is the time at which the label was created. | |
Each entry in the timestamp tuple is an integer. | |
==== RESTORE ==== | |
Additional Request Parts: label | |
Additional Response parts: | |
Restores the configuration to the labeled value | |
=== Publishes === | |
=== UPDATE === | |
Part 0: 'UPDATE' | |
Part 1: json-encoded-deltas-dictionary | |
Part 2: json-encoded-current-configuration-dictionary | |
The deltas dictionary maps the modified keys to tuples of (previous_value, current_value) | |
''' | |
import os | |
import zmq | |
import json | |
import time | |
import os.path | |
import zipfile | |
import tempfile | |
# Filesystem locations and ZeroMQ endpoints used by the daemon.
DB_DIR = '/tmp/db_dir'
PUB_ADDR = 'ipc:///tmp/config_daemon_pub'
REP_ADDR = 'ipc:///tmp/config_daemon_rep'

CURRENT_FN = DB_DIR + '/current'
ARCHIVE_FN = DB_DIR + '/archive.zip'

# JSON-serialized version of "current", kept as a cache to prevent continual
# re-serialization.
jcurrent = '{}'
current = dict()

# Make sure the database directory exists before anything is written to it.
if not os.path.exists(DB_DIR):
    os.makedirs(DB_DIR, mode=0o755)

# Load the configuration left by a previous run, if there is one.
if os.path.exists(CURRENT_FN):
    with open(CURRENT_FN) as f:
        jcurrent = f.read()
    current = json.loads( jcurrent )

context = zmq.Context()

pub_sock = context.socket(zmq.PUB)  # publishes UPDATE messages
rep_sock = context.socket(zmq.REP)  # answers client requests

pub_sock.bind(PUB_ADDR)
rep_sock.bind(REP_ADDR)
def archive_and_update( new_values, save_name=None, restore=False ):
    """Archive the current configuration, then apply ``new_values`` to it.

    Args:
        new_values: dictionary of key -> new value. A value of None deletes
            the key. When ``restore`` is true this dictionary is modified
            in place (see below).
        save_name: archive entry name for the snapshot of the configuration
            as it was before this call. When None, the current time is used
            as the name.
        restore: when true, ``new_values`` is treated as a complete
            configuration: every key currently present but missing from
            ``new_values`` is marked for deletion.

    Side effects, only when at least one value actually changes: the
    in-memory configuration and its JSON cache are updated, the new
    configuration is written to a temp file and renamed over CURRENT_FN,
    and an UPDATE message is published with the deltas and the full
    configuration.

    Raises:
        Exception: when ``save_name`` was given and the archive could not
            be written. An explicit label whose snapshot was not stored
            must not be reported as a success. Failures of the automatic,
            time-named snapshots remain best-effort and are ignored.
    """
    global jcurrent

    entry_name = str(time.time()) if save_name is None else save_name
    try:
        archive = zipfile.ZipFile(ARCHIVE_FN, 'a', zipfile.ZIP_DEFLATED)
        try:
            archive.writestr(entry_name, jcurrent)
        finally:
            # Always release the file handle, even if the write failed.
            archive.close()
    except Exception:
        # `except Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        if save_name is not None:
            raise
        # Automatic history snapshot: non-essential, carry on.

    if restore:
        # Keys absent from the restored configuration must be removed.
        for k in set(current.keys()) - set(new_values.keys()):
            new_values[ k ] = None

    # Map each key whose value really changes to (previous, new).
    updated = dict()
    for k, v in new_values.items():
        old = current.get(k, None)
        if old != v:
            updated[ k ] = (old, v)

    if not updated:
        return

    current.update( new_values )

    # None marks a deletion; collect the keys first so the dictionary is
    # not modified while it is being iterated.
    for k in [key for key, value in current.items() if value is None]:
        del current[k]

    jcurrent = json.dumps(current)

    # Write the new configuration to a temp file in the same directory and
    # rename it into place, so CURRENT_FN is replaced in a single step.
    fd, abs_fn = tempfile.mkstemp( dir=DB_DIR )
    with os.fdopen(fd, 'w') as f:
        f.write(jcurrent)
        f.flush()
        os.fsync(fd)

    os.chmod(abs_fn, 0o644)
    os.rename(abs_fn, CURRENT_FN)

    pub_sock.send_multipart( ['UPDATE', json.dumps(updated), jcurrent] )
def get_labeled_config( label ):
    """Return the archived configuration stored under ``label``.

    Reads the entry named ``label`` from the archive zip file and decodes
    it from JSON.

    Raises:
        Exception: when the archive cannot be opened, the entry does not
            exist, or its content is not valid JSON. The message includes
            the label and the underlying error.
    """
    try:
        archive = zipfile.ZipFile(ARCHIVE_FN)
        try:
            return json.loads(archive.read( label ))
        finally:
            # Close the archive on the failure paths too; previously the
            # handle leaked whenever the read or the JSON decode raised.
            archive.close()
    except Exception as e:
        raise Exception('Failed to load labled configuration {0}: {1}'.format(label, str(e)))
def restore( label ):
    """Replace the current configuration with the one archived as ``label``.

    The archived configuration is loaded and applied in restore mode.
    Errors raised while loading it propagate to the caller.
    """
    archived_values = get_labeled_config( label )
    archive_and_update( archived_values, restore=True )
def list_labels():
    """Return a JSON-encoded list of the labeled configurations.

    Each element is a pair of the archive entry's ``date_time`` tuple and
    its name. Only entries whose name begins with a letter are included.

    Raises:
        Exception: when the archive cannot be opened or read. The message
            includes the underlying error.
    """
    try:
        archive = zipfile.ZipFile(ARCHIVE_FN)
        try:
            # filename[:1] rather than filename[0]: an entry with an empty
            # name is skipped instead of raising IndexError and making
            # every LIST request fail.
            ret = [ (zi.date_time, zi.filename)
                    for zi in archive.infolist()
                    if zi.filename[:1].isalpha() ]
        finally:
            # Release the file handle on the failure paths too.
            archive.close()
        return json.dumps(ret)
    except Exception as e:
        raise Exception('Failed to obtain archived configurations: {0}'.format(str(e)))
# Main request loop. Every request receives exactly one reply: 'OK' plus
# request-specific parts on success, or 'ERROR' plus a message on failure.
while True:
    try:
        parts = rep_sock.recv_multipart()
    except KeyboardInterrupt:
        # Ctrl-C while waiting for a request shuts the daemon down.
        break

    # Most requests answer with the current configuration. GET <label> and
    # LIST send their own reply and clear this flag.
    reply_current = True

    try:
        if parts[0] == 'GET':
            if len(parts) > 1:
                reply_current = False
                rep_sock.send_multipart(['OK', json.dumps(get_labeled_config(parts[1]))])

        elif parts[0] == 'SET':
            try:
                new_values = json.loads(parts[1])
            except Exception:
                # `except Exception` instead of a bare except, so that
                # KeyboardInterrupt/SystemExit are not turned into a
                # parse-error reply. A missing part 1 is still reported
                # through this path.
                raise Exception('Failed to parse json value')

            if not isinstance(new_values, dict):
                raise Exception('JSON value must be a dictionary')

            for k, v in new_values.items():
                if not isinstance(k,basestring):
                    raise Exception('Keys must be strings')
                if isinstance(v,list) or isinstance(v,dict):
                    raise Exception('Values must be integers or strings')

            archive_and_update( new_values )

        elif parts[0] == 'LIST':
            reply_current = False
            rep_sock.send_multipart(['OK', list_labels()])

        elif parts[0] == 'LABEL':
            if len(parts) != 2:
                raise Exception('LABEL requires two message parts')
            if len(parts[1]) > 64:
                raise Exception('Label name exceeds 64 characters')

            # No value changes; only the snapshot is stored under the label.
            archive_and_update( dict(), parts[1] )

        elif parts[0] == 'RESTORE':
            if len(parts) != 2:
                raise Exception('RESTORE requires two message parts')

            restore( parts[1] )

        else:
            raise Exception('Unsupported request type')

        if reply_current:
            rep_sock.send_multipart(['OK', jcurrent])

    except Exception as e:
        import traceback
        traceback.print_exc()
        rep_sock.send_multipart(['ERROR', str(e)])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment