Skip to content

Instantly share code, notes, and snippets.

@windbg
Forked from gwik/zerologger.py
Created March 4, 2012 03:01
Show Gist options
  • Save windbg/1970286 to your computer and use it in GitHub Desktop.
Save windbg/1970286 to your computer and use it in GitHub Desktop.
Python logger ZeroMQ, Gevent, CouchDB
# encoding: utf-8
"""
A python logging Handler that use ZeroMQ (ØMQ).
+------+ +------+ +-------+
| app1 | | app2 | | app X |
+------+ +------+ +-------+
| PUSH | PUSH | PUSH
| | |
+------------+--------------+
|
| PULL
+-------------+
| Dispatcher |
+-------------+
| PUB
|
+--------------- + -------------------+
| | |
SUB | SUB | SUB |
+----------+ +---------------+ +-----------------+
| irc bot | | console print | | couch db client |
+----------+ +---------------+ +-----------------+
(not implemented yet) | HTTP REST
|
+------------+
| couch db |
+------------+
This project aims at providing an overview of how to use ØMQ brokerless
messaging facilities do dispach log entries to various channels.
This is a toy project developed as I learn ØMQ and gevent,
DO NOT USE IN PRODUCTION.
Any feedback appreciated : [email protected] @gwik
"app"'s can be any python process that log messages using python
stdlib logging module. Log messages are dispatched using
PUSH/PULL to a dispatcher that broadcast the messages to subscribers.
Suscribers can send the messages to a IRC channel, print them on the
console or store them in a couch database.
You'll need the following packages installed :
gevent gevent_zeromq couchdbkit
You can start the different process as follow :
python2.7 zeromqlogger.py log|dispatcher|couchsub|printersub
You can start as many `log` as you want.
Tested with python2.7 on MAC OS X 10.6.
Copyleft!
"""
from gevent_zeromq import zmq
from gevent import monkey
import gevent.queue
monkey.patch_all()
import gevent.pool
import logging
import socket
import json
from datetime import datetime
from restkit.globals import set_manager
from restkit.manager.mgevent import GeventManager
# set the gevent connection manager
set_manager(GeventManager())
from couchdbkit import Server
from couchdbkit.schema import properties, Document
class Handler(logging.Handler):
""" A logging handler for sending notifications to a ømq PUSH.
"""
def __init__(self, address,
pool_size=5, level=logging.NOTSET):
context = zmq.Context()
self.hostname = socket.gethostname()
self.socket = context.socket(zmq.PUSH)
self.context = self.socket.context
self.socket.connect(address)
# channel queue, put always blocks until delivered
self.channel = gevent.queue.Queue(0)
self._job = gevent.spawn(self.__send)
super(Handler, self).__init__(level)
def createLock(self):
pass
def acquire(self):
pass
def release(self):
pass
def flush(self):
pass
def close(self):
self._job.kill(timeout=2)
def emit(self, record):
self.channel.put(record)
def __send(self):
while True:
record = self.channel.get()
message = record.__dict__
message['hostname'] = self.hostname
self.socket.send_json(message)
class Dispatcher(gevent.Greenlet):
""" PULL for messages and PUBlish them to SUBscribers.
The pulling and publishing is happening in there own separate
greenlet.
They communicate via a channel queue.
"""
def __init__(self, pull_address, publish_address):
super(Dispatcher, self).__init__()
self.context = zmq.Context()
self.pull_socket = self.context.socket(zmq.PULL)
self.pull_socket.bind(pull_address)
context = zmq.Context()
self.publish_socket = context.socket(zmq.PUB)
self.publish_socket.bind(publish_address)
self.channel = gevent.queue.Queue(0)
def _run(self):
self._pull_job = gevent.spawn(self.__pull)
self._publish_job = gevent.spawn(self.__publish)
self._pull_job.join()
self._publish_job.join()
def __pull(self):
while True:
info = self.pull_socket.recv_json()
self.channel.put(info)
def __publish(self):
while True:
info = self.channel.get()
self.publish_socket.send_multipart([
info['name'].encode('utf-8'),
json.dumps(info)])
class PrintSubscriber(gevent.Greenlet):
""" Subscribe to dispatcher and print on console standard output
"""
def __init__(self, address, topic=''):
super(PrintSubscriber, self).__init__()
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt(zmq.SUBSCRIBE, topic)
self.topic = topic
self.socket.connect(address)
def _run(self):
while True:
topic, info = self.socket.recv_multipart()
info = json.loads(info)
print "topic %s/%s [%d] %s" % (
self.topic, topic, info['process'], info['msg'])
class LogEntry(Document):
relativeCreated = properties.FloatProperty()
msecs = properties.FloatProperty()
args = properties.StringListProperty()
name = properties.StringProperty()
thread = properties.IntegerProperty()
created = properties.DateTimeProperty()
process = properties.IntegerProperty()
threadNam = properties.StringProperty()
module = properties.StringProperty()
filename = properties.StringProperty()
levelno = properties.IntegerProperty()
processName = properties.StringProperty()
pathname = properties.StringProperty()
lineno = properties.IntegerProperty()
exc_text = properties.StringProperty()
exc_info = properties.StringProperty()
funcName = properties.StringProperty()
hostname = properties.StringProperty()
levelname = properties.StringProperty()
msg = properties.StringProperty()
class CouchSubscriber(gevent.Greenlet):
""" Subscriber that stores messages in a couch db.
"""
def __init__(self, couch, address):
super(CouchSubscriber, self).__init__()
self.server = couch
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.connect(address)
def _run(self):
while True:
topic, info = self.socket.recv_multipart()
info = json.loads(info)
info['created'] = datetime.fromtimestamp(info['created'])
doc = LogEntry(**info)
doc.save()
def run_couch_sub(address):
couch = Server()
db = couch.get_or_create_db('log')
LogEntry.set_db(db)
job = CouchSubscriber(couch, pub_address)
job.start()
return job
def run_logger(log_address):
import logging.config
config = {
'version': 1,
'handlers': {
'zmq': {
'class': '__main__.Handler',
'level': 'DEBUG',
'address': log_address
}
},
'root': {
'level': 'DEBUG',
'handlers': ['zmq']
},
}
logging.config.dictConfig(config)
def log(topic, wait=1):
logger = logging.getLogger(topic)
logger.setLevel(logging.DEBUG)
while True:
logger.info('some info behind logged')
gevent.sleep(wait)
return gevent.spawn(log, __name__, wait=1)
if __name__ == '__main__':
import sys
name = sys.argv.pop()
log_address = 'ipc:///tmp/zmqlog'
pub_address = 'ipc:///tmp/logpub'
if name == 'log':
print "starting example logging application..."
job = run_logger(log_address)
job.join()
elif name == 'dispatcher':
print "starting dispatcher..."
job = Dispatcher(log_address, pub_address)
job.start()
job.join()
elif name == 'printersub':
print "starting printer subscriber..."
job = PrintSubscriber(pub_address)
job.start()
job.join()
elif name == 'couchsub':
print "starting couchdb subscriber..."
job = run_couch_sub(pub_address)
job.join()
else:
print "invalid usage : log|dispatcher|couchsub|printersub"
exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment