Skip to content

Instantly share code, notes, and snippets.

@ionrock
Created January 16, 2013 07:41
Show Gist options
  • Save ionrock/4545327 to your computer and use it in GitHub Desktop.
Save ionrock/4545327 to your computer and use it in GitHub Desktop.
Some code to turn logging to the pub/sub socket in circus on and off, as well as to log to the stdout of the circusd process. I've included an example config with details on how to turn sending stdout/stderr to the pub/sub socket on or off, as well as a plugin that reads a watcher's stdout/stderr from the pub/sub socket. The config format is just a f…
Changes in HEAD
Modified circus/config.py
diff --git a/circus/config.py b/circus/config.py
index abce19a..d789545 100644
--- a/circus/config.py
+++ b/circus/config.py
@@ -28,8 +28,7 @@ def watcher_defaults():
'max_retry': 5,
'graceful_timeout': 30,
'rlimits': dict(),
- 'stderr_stream': dict(),
- 'stdout_stream': dict(),
+ 'publish': dict(),
'priority': 0,
'use_sockets': False,
'singleton': False,
@@ -181,10 +180,9 @@ def get_config(config_file):
elif opt == 'graceful_timout':
watcher['graceful_timeout'] = dget(
section, "graceful_timeout", 30, int)
- elif opt.startswith('stderr_stream') or \
- opt.startswith('stdout_stream'):
+ elif opt.startswith('publish'):
stream_name, stream_opt = opt.split(".", 1)
- watcher[stream_name][stream_opt] = val
+ watcher['publish'][stream_opt] = val
elif opt.startswith('rlimit_'):
limit = opt[7:]
watcher['rlimits'][limit] = int(val)
@@ -221,7 +219,8 @@ def get_config(config_file):
watcher['env'] = parse_env_str(val)
elif opt == 'autostart':
- watcher['autostart'] = dget(section, "autostart", True, bool)
+ watcher['autostart'] = dget(section, "autostart",
+ True, bool)
else:
# freeform
Modified circus/stream/__init__.py
diff --git a/circus/stream/__init__.py b/circus/stream/__init__.py
index 188172b..9cf5d8e 100644
--- a/circus/stream/__init__.py
+++ b/circus/stream/__init__.py
@@ -33,6 +33,28 @@ class StdoutStream(object):
pass
+class Publisher(object):
+    """Callable sink that forwards stream data to stdout and/or a notifier.
+
+    Instances are called with a mapping that has a ``'data'`` key; the
+    value under that key is what gets written and/or published.
+
+    Configuration (``**conf``), all optional:
+
+    - ``<topic>`` (for example ``stdout=True``): also write the data to
+      ``sys.stdout`` of the current process.  Textual values such as
+      ``"False"`` or ``"0"`` count as disabled.
+    - ``streams``: the topics that are handed to ``notify``; either an
+      iterable of names or a comma-separated string such as
+      ``"stdout, stderr"``.
+
+    Any other key is kept in ``self.conf`` and not interpreted here.
+    """
+
+    # Spellings treated as "off" when a flag arrives as text.
+    _FALSE_WORDS = ('', '0', 'false', 'no', 'off')
+
+    # Matches byte and unicode strings on Python 2, plain str on Python 3.
+    _TEXT_TYPES = (str, type(u''))
+
+    def __init__(self, notify=None, topic=None, **conf):
+        """Store the routing decisions for *topic*.
+
+        - **notify**: callable invoked as ``notify(topic, data)`` when the
+          topic is listed in ``streams``; publishing is disabled when
+          it is ``None``
+        - **topic**: name of the stream, defaults to ``'output'``
+        - **conf**: the options described on the class
+        """
+        self.topic = topic or 'output'
+        self.notify = notify
+        self.conf = conf
+
+        # Look the flag up under self.topic (not the raw argument) so the
+        # 'output' default is honoured and a None topic is never compared
+        # against a string.
+        self.to_stdout = self._as_bool(conf.get(self.topic))
+
+        # Publishing needs both a callback and the topic being listed.
+        self.to_socket = (notify is not None and
+                          self.topic in self._as_names(conf.get('streams')))
+
+    @classmethod
+    def _as_bool(cls, value):
+        """Interpret a flag that may be a real boolean or config text."""
+        if isinstance(value, cls._TEXT_TYPES):
+            # Values read from a config file are plain text, so "False"
+            # would otherwise be truthy.
+            return value.strip().lower() not in cls._FALSE_WORDS
+        return bool(value)
+
+    @classmethod
+    def _as_names(cls, value):
+        """Return *value* as a list of stream names."""
+        if not value:
+            return []
+        if isinstance(value, cls._TEXT_TYPES):
+            # Split "stdout, stderr" into exact names so that a partial
+            # name such as "std" does not match by substring.
+            return [part.strip() for part in value.split(',')
+                    if part.strip()]
+        return list(value)
+
+    def __call__(self, data):
+        """Write and/or publish ``data['data']`` according to the config."""
+        payload = data['data']
+
+        if self.to_stdout:
+            sys.stdout.write(payload)
+            # Flush right away so the output is not held in a buffer.
+            sys.stdout.flush()
+
+        if self.to_socket:
+            self.notify(self.topic, payload)
+
+
+
class FancyStdoutStream(StdoutStream):
"""
Write output from watchers using different colors along with a
@@ -108,53 +130,3 @@ class FancyStdoutStream(StdoutStream):
# stop coloring
self.out.write('\033[0m\n')
self.out.flush()
-
-
-def get_stream(conf):
- if not conf:
- return conf
-
- # we can have 'stream' or 'class' or 'filename'
- if 'filename' in conf:
- inst = FileStream(**conf)
- elif 'stream' in conf:
- inst = conf['stream']
- elif 'class' in conf:
- class_name = conf.pop('class')
- if not "." in class_name:
- class_name = "circus.stream.%s" % class_name
- inst = resolve_name(class_name)(**conf)
- else:
- raise ValueError("stream configuration invalid")
-
- # default refresh_time
- refresh_time = float(conf.get('refresh_time', 0.3))
-
- return {'stream': inst, 'refresh_time': refresh_time}
-
-
-def get_pipe_redirector(redirect, extra_info=None, buffer=1024, loop=None):
- """Redirects data received in pipes to the redirect callable.
-
- The data is a mapping with a **data** key containing the data
- received from the pipe, extended with all values passed in
- **extra_info**
-
- Options:
- - **redirect**: the callable to send data to
- - **extra_info**: a mapping of values to add to each call
- - **buffer**: the size of the buffer when reading data
- - **loop**: the ioloop to use. If not provided will use the
- global IOLoop
- """
- # XXX backend is deprecated
-
- # get stream infos
- if 'stream' not in redirect:
- return
-
- stream = redirect.get('stream')
- refresh_time = redirect.get('refresh_time', 0.3)
-
- # finally setup the redirection
- return Redirector(stream, refresh_time, extra_info, buffer, loop=loop)
Modified circus/watcher.py
diff --git a/circus/watcher.py b/circus/watcher.py
index 7f6c7f7..e08e638 100644
--- a/circus/watcher.py
+++ b/circus/watcher.py
@@ -1,4 +1,3 @@
-import copy
import errno
import os
import signal
@@ -13,7 +12,7 @@ from zmq.eventloop import ioloop
from circus.process import Process, DEAD_OR_ZOMBIE, UNEXISTING
from circus import logger
from circus import util
-from circus.stream import get_pipe_redirector, get_stream
+from circus.stream import Redirector, Publisher
from circus.util import parse_env_dict, resolve_name
@@ -151,9 +150,8 @@ class Watcher(object):
def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
working_dir=None, shell=False, uid=None, max_retry=5,
gid=None, send_hup=False, env=None, stopped=True,
- graceful_timeout=30., prereload_fn=None,
- rlimits=None, executable=None, stdout_stream=None,
- stderr_stream=None, priority=0, loop=None,
+ graceful_timeout=30., prereload_fn=None, rlimits=None,
+ executable=None, publish=None, priority=0, loop=None,
singleton=False, use_sockets=False, copy_env=False,
copy_path=False, max_age=0, max_age_variance=30,
hooks=None, respawn=True, autostart=True, **options):
@@ -170,11 +168,8 @@ class Watcher(object):
self.prereload_fn = prereload_fn
self.executable = None
self.priority = priority
- self.stdout_stream_conf = copy.copy(stdout_stream)
- self.stderr_stream_conf = copy.copy(stderr_stream)
- self.stdout_stream = get_stream(self.stdout_stream_conf)
- self.stderr_stream = get_stream(self.stderr_stream_conf)
- self.stdout_redirector = self.stderr_redirector = None
+ self.publish = publish or {}
+ self.stdout = self.stderr = None
self.max_retry = max_retry
self._options = options
self.singleton = singleton
@@ -284,6 +279,19 @@ class Watcher(object):
self.sockets = sockets
self.arbiter = arbiter
+ # default refresh_time
+ refresh_time = float(0.3)
+
+ self.stdout = Redirector(
+ Publisher(self.notify_event, topic='stdout', **self.publish),
+ float(self.publish.get('refresh_time', refresh_time)),
+ loop=self.loop)
+
+ self.stderr = Redirector(
+ Publisher(self.notify_event, topic='stderr', **self.publish),
+ float(self.publish.get('refresh_time', refresh_time)),
+ loop=self.loop)
+
def __len__(self):
return len(self.processes)
@@ -427,16 +435,13 @@ class Watcher(object):
executable=self.executable,
use_fds=self.use_sockets, watcher=self)
- # stream stderr/stdout if configured
- if self.stdout_redirector is not None:
- self.stdout_redirector.add_redirection('stdout',
- process,
- process.stdout)
+ self.stdout.add_redirection('stdout',
+ process,
+ process.stdout)
- if self.stderr_redirector is not None:
- self.stderr_redirector.add_redirection('stderr',
- process,
- process.stderr)
+ self.stderr.add_redirection('stderr',
+ process,
+ process.stderr)
self.processes[process.pid] = process
logger.debug('running %s process [pid %d]', self.name,
@@ -459,11 +464,11 @@ class Watcher(object):
"""Kill process.
"""
# remove redirections
- if self.stdout_redirector is not None:
- self.stdout_redirector.remove_redirection('stdout', process)
+ if self.stdout is not None:
+ self.stdout.remove_redirection('stdout', process)
- if self.stderr_redirector is not None:
- self.stderr_redirector.remove_redirection('stderr', process)
+ if self.stderr is not None:
+ self.stderr.remove_redirection('stderr', process)
logger.debug("%s: kill process %s", self.name, process.pid)
try:
@@ -552,11 +557,11 @@ class Watcher(object):
"""
logger.debug('stopping the %s watcher' % self.name)
# stop redirectors
- if self.stdout_redirector is not None:
- self.stdout_redirector.kill()
+ if self.stdout is not None:
+ self.stdout.kill()
- if self.stderr_redirector is not None:
- self.stderr_redirector.kill()
+ if self.stderr is not None:
+ self.stderr.kill()
limit = time.time() + self.graceful_timeout
@@ -627,7 +632,7 @@ class Watcher(object):
self.stopped = True
return False
- self._create_redirectors()
+ # self._create_redirectors()
self.reap_processes()
self.spawn_processes()
@@ -636,11 +641,11 @@ class Watcher(object):
self.stop()
return False
- if self.stdout_redirector is not None:
- self.stdout_redirector.start()
+ if self.stdout is not None:
+ self.stdout.start()
- if self.stderr_redirector is not None:
- self.stderr_redirector.start()
+ if self.stderr is not None:
+ self.stderr.start()
logger.info('%s started' % self.name)
self.notify_event("start", {"time": time.time()})
[circus]
endpoint = tcp://127.0.0.1:5555
stats_endpoint = tcp://127.0.0.1:5557
[watcher:hello]
cmd = python cp.py 7000
copy_path = True
copy_env = True
# send stdout and stderr to the pub/sub socket
publish.streams = stdout, stderr
publish.refresh_time = 0.3
# also send stdout/stderr to circusd stdout
publish.stdout = True
publish.stderr = True
from __future__ import print_function
import sys
import json
from circus.plugins import CircusPlugin
class OutputStream(CircusPlugin):
    """Plugin that echoes messages for selected topics to ``sys.stdout``.

    The ``topics`` config option selects which topics are echoed.  It may
    be an iterable of names or a comma-separated string such as
    ``"stdout, stderr"``; when it is missing nothing is echoed.
    """

    name = 'stream_observer'

    def __init__(self, *args, **config):
        super(OutputStream, self).__init__(*args, **config)
        self.topics = self._parse_topics(config.get('topics', []))

    @staticmethod
    def _parse_topics(raw):
        """Return *raw* as a list of exact topic names."""
        if not raw:
            return []
        if isinstance(raw, (str, type(u''))):
            # A value read from a config file is one string; split it so
            # membership is an exact-name test rather than a substring test.
            return [part.strip() for part in raw.split(',') if part.strip()]
        return list(raw)

    def handle_recv(self, data):
        """Write the JSON-decoded message to stdout if its topic is selected.

        - **data**: a two-item sequence ``(topic, message)`` where
          *message* is a JSON document
        """
        topic, message = data
        if topic not in self.topics:
            return

        payload = json.loads(message)
        # The decoded payload is not guaranteed to be text (it could be a
        # dict or a number); format it so write() never gets a non-string.
        sys.stdout.write("%s" % (payload,))
        sys.stdout.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment