-
-
Save windbg/1970286 to your computer and use it in GitHub Desktop.
Python logger ZeroMQ, Gevent, CouchDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
""" | |
A python logging Handler that use ZeroMQ (ØMQ). | |
+------+ +------+ +-------+ | |
| app1 | | app2 | | app X | | |
+------+ +------+ +-------+ | |
| PUSH | PUSH | PUSH | |
| | | | |
+------------+--------------+ | |
| | |
| PULL | |
+-------------+ | |
| Dispatcher | | |
+-------------+ | |
| PUB | |
| | |
+--------------- + -------------------+ | |
| | | | |
SUB | SUB | SUB | | |
+----------+ +---------------+ +-----------------+ | |
| irc bot | | console print | | couch db client | | |
+----------+ +---------------+ +-----------------+ | |
(not implemented yet) | HTTP REST | |
| | |
+------------+ | |
| couch db | | |
+------------+ | |
This project aims at providing an overview of how to use ØMQ brokerless | |
messaging facilities to dispatch log entries to various channels. | |
This is a toy project developed as I learn ØMQ and gevent, | |
DO NOT USE IN PRODUCTION. | |
Any feedback appreciated : [email protected] @gwik | |
An "app" can be any python process that logs messages using the python | |
stdlib logging module. Log messages are dispatched using | |
PUSH/PULL to a dispatcher that broadcasts the messages to subscribers. | |
Subscribers can send the messages to an IRC channel, print them on the | |
console or store them in a couch database. | |
You'll need the following packages installed : | |
gevent gevent_zeromq couchdbkit | |
You can start the different processes as follows : | |
python2.7 zeromqlogger.py log|dispatcher|couchsub|printersub | |
You can start as many `log` as you want. | |
Tested with python2.7 on MAC OS X 10.6. | |
Copyleft! | |
""" | |
from gevent_zeromq import zmq | |
from gevent import monkey | |
import gevent.queue | |
monkey.patch_all() | |
import gevent.pool | |
import logging | |
import socket | |
import json | |
from datetime import datetime | |
from restkit.globals import set_manager | |
from restkit.manager.mgevent import GeventManager | |
# set the gevent connection manager | |
set_manager(GeventManager()) | |
from couchdbkit import Server | |
from couchdbkit.schema import properties, Document | |
class Handler(logging.Handler):
    """Logging handler that forwards records to a ZeroMQ PUSH socket.

    ``emit`` only puts the record on an internal queue; a background
    greenlet takes records off that queue, turns each one into a plain
    dict and sends it with ``send_json``.
    """

    # Used only to render ``exc_info`` as text; shared by all instances.
    _exc_formatter = logging.Formatter()

    def __init__(self, address,
                 pool_size=5, level=logging.NOTSET):
        """Connect a PUSH socket to *address* and start the sender greenlet.

        address   -- ZeroMQ endpoint the PUSH socket connects to.
        pool_size -- accepted for backward compatibility; not used here.
        level     -- minimum level, forwarded to ``logging.Handler``.
        """
        context = zmq.Context()
        self.hostname = socket.gethostname()
        self.socket = context.socket(zmq.PUSH)
        self.context = self.socket.context
        self.socket.connect(address)
        # channel queue, put always blocks until delivered
        # NOTE(review): that is the original author's description of
        # Queue(0); confirm it still holds for the installed gevent.
        self.channel = gevent.queue.Queue(0)
        self._job = gevent.spawn(self.__send)
        super(Handler, self).__init__(level)

    def createLock(self):
        """Create no lock; ``acquire`` and ``release`` are no-ops here."""
        pass

    def acquire(self):
        """No-op: this handler does not use a lock."""
        pass

    def release(self):
        """No-op: this handler does not use a lock."""
        pass

    def flush(self):
        """No-op: records are sent by the background greenlet."""
        pass

    def close(self):
        """Stop the sender greenlet, close the socket, then run the base close."""
        self._job.kill(timeout=2)
        self.socket.close()
        super(Handler, self).close()

    def emit(self, record):
        """Queue *record* for the sender greenlet."""
        self.channel.put(record)

    def _build_message(self, record):
        """Return a dict describing *record* that is safe to send as JSON.

        The record's attribute dict is copied, so the record itself (which
        other handlers may also receive) is left unmodified.  ``exc_info``
        holds exception and traceback objects that JSON cannot encode, so
        it is rendered into ``exc_text`` and replaced by ``None``.
        """
        message = dict(record.__dict__)
        if record.exc_info:
            if not message.get('exc_text'):
                message['exc_text'] = self._exc_formatter.formatException(
                    record.exc_info)
            message['exc_info'] = None
        message['hostname'] = self.hostname
        return message

    def __send(self):
        """Sender loop: take queued records and push them over the socket."""
        while True:
            record = self.channel.get()
            try:
                self.socket.send_json(self._build_message(record))
            except Exception:
                # One bad record (e.g. non-serialisable args) must not end
                # the loop, otherwise every later record would be lost.
                self.handleError(record)
class Dispatcher(gevent.Greenlet):
    """PULL log messages and PUBlish them to SUBscribers.

    Pulling and publishing each run in their own greenlet; the two
    communicate through ``self.channel``.
    """

    def __init__(self, pull_address, publish_address):
        """Bind the PULL and PUB sockets.

        pull_address    -- endpoint the PULL socket binds to.
        publish_address -- endpoint the PUB socket binds to.
        """
        super(Dispatcher, self).__init__()
        # Both sockets share one context instead of creating a second,
        # otherwise unreferenced, context for the PUB socket.
        self.context = zmq.Context()
        self.pull_socket = self.context.socket(zmq.PULL)
        self.pull_socket.bind(pull_address)
        self.publish_socket = self.context.socket(zmq.PUB)
        self.publish_socket.bind(publish_address)
        self.channel = gevent.queue.Queue(0)

    def _run(self):
        """Spawn the pull and publish greenlets and wait for both."""
        self._pull_job = gevent.spawn(self.__pull)
        self._publish_job = gevent.spawn(self.__publish)
        self._pull_job.join()
        self._publish_job.join()

    def __pull(self):
        """Receive JSON messages from the PULL socket and queue them."""
        while True:
            info = self.pull_socket.recv_json()
            self.channel.put(info)

    def __publish(self):
        """Publish each queued message as a two-frame message.

        The first frame is the logger name (used as the topic), the second
        is the whole message encoded as JSON.
        """
        while True:
            info = self.channel.get()
            self.publish_socket.send_multipart([
                info['name'].encode('utf-8'),
                json.dumps(info)])
class PrintSubscriber(gevent.Greenlet):
    """Greenlet that subscribes to the dispatcher and prints each message
    on standard output.
    """

    def __init__(self, address, topic=''):
        """Create a SUB socket filtered on *topic* and connect it to *address*."""
        super(PrintSubscriber, self).__init__()
        self.topic = topic
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt(zmq.SUBSCRIBE, topic)
        self.socket.connect(address)

    def _run(self):
        """Receive (topic, JSON payload) pairs forever and print one line each."""
        line_format = "topic %s/%s [%d] %s"
        while True:
            received_topic, payload = self.socket.recv_multipart()
            entry = json.loads(payload)
            # Single parenthesised argument: prints the same text under
            # the Python 2 print statement.
            print(line_format % (
                self.topic, received_topic, entry['process'], entry['msg']))
class LogEntry(Document):
    """Couch document holding one log message.

    The field names match the keys of the message built from a
    ``logging.LogRecord`` attribute dict, plus ``hostname``.
    """
    relativeCreated = properties.FloatProperty()
    msecs = properties.FloatProperty()
    args = properties.StringListProperty()
    name = properties.StringProperty()
    thread = properties.IntegerProperty()
    created = properties.DateTimeProperty()
    process = properties.IntegerProperty()
    # Was misspelled ``threadNam``, which never matched the ``threadName``
    # key carried by the incoming message.
    threadName = properties.StringProperty()
    module = properties.StringProperty()
    filename = properties.StringProperty()
    levelno = properties.IntegerProperty()
    processName = properties.StringProperty()
    pathname = properties.StringProperty()
    lineno = properties.IntegerProperty()
    exc_text = properties.StringProperty()
    exc_info = properties.StringProperty()
    funcName = properties.StringProperty()
    hostname = properties.StringProperty()
    levelname = properties.StringProperty()
    msg = properties.StringProperty()
class CouchSubscriber(gevent.Greenlet):
    """Greenlet that subscribes to every topic and saves each message as a
    ``LogEntry`` document.
    """

    def __init__(self, couch, address):
        """Keep the couch server handle and connect a SUB socket to *address*."""
        super(CouchSubscriber, self).__init__()
        self.server = couch
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        # Empty prefix: subscribe to all topics.
        self.socket.setsockopt(zmq.SUBSCRIBE, '')
        self.socket.connect(address)

    def _run(self):
        """Receive messages forever, storing one document per message."""
        while True:
            _topic, payload = self.socket.recv_multipart()
            fields = json.loads(payload)
            # 'created' arrives as a timestamp number; convert it to a datetime.
            fields['created'] = datetime.fromtimestamp(fields['created'])
            entry = LogEntry(**fields)
            entry.save()
def run_couch_sub(address):
    """Start a ``CouchSubscriber`` connected to *address* and return it.

    Gets or creates the ``log`` database on the default couch server and
    binds ``LogEntry`` to it before starting the subscriber greenlet.
    """
    couch = Server()
    db = couch.get_or_create_db('log')
    LogEntry.set_db(db)
    # Use the caller-supplied address rather than the script-level global
    # ``pub_address``, which only exists when the file runs as __main__.
    job = CouchSubscriber(couch, address)
    job.start()
    return job
def run_logger(log_address):
    """Configure root logging to use the zmq handler and start a demo logger.

    Returns the greenlet that logs one INFO message per second under the
    current module's name.
    """
    import logging.config

    zmq_handler = {
        'class': '__main__.Handler',
        'level': 'DEBUG',
        'address': log_address,
    }
    root_logger = {
        'level': 'DEBUG',
        'handlers': ['zmq'],
    }
    logging.config.dictConfig({
        'version': 1,
        'handlers': {'zmq': zmq_handler},
        'root': root_logger,
    })

    def log(topic, wait=1):
        """Log an INFO message on *topic* every *wait* seconds, forever."""
        topic_logger = logging.getLogger(topic)
        topic_logger.setLevel(logging.DEBUG)
        while True:
            topic_logger.info('some info behind logged')
            gevent.sleep(wait)

    return gevent.spawn(log, __name__, wait=1)
if __name__ == '__main__':
    import sys

    # The last command-line argument selects which process to run.  With
    # no argument this pops the script name, which falls through to the
    # usage message below.
    name = sys.argv.pop()
    log_address = 'ipc:///tmp/zmqlog'
    pub_address = 'ipc:///tmp/logpub'
    if name == 'log':
        print("starting example logging application...")
        job = run_logger(log_address)
        job.join()
    elif name == 'dispatcher':
        print("starting dispatcher...")
        job = Dispatcher(log_address, pub_address)
        job.start()
        job.join()
    elif name == 'printersub':
        print("starting printer subscriber...")
        job = PrintSubscriber(pub_address)
        job.start()
        job.join()
    elif name == 'couchsub':
        print("starting couchdb subscriber...")
        job = run_couch_sub(pub_address)
        job.join()
    else:
        print("invalid usage : log|dispatcher|couchsub|printersub")
        # sys.exit rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment