Created
January 16, 2013 07:41
-
-
Save ionrock/4545327 to your computer and use it in GitHub Desktop.
Some code to turn on and off logging to the pub/sub socket in circus, as well as being able to log to the stdout of the circusd process. I've included an example config with details on how to turn on/off sending stdout/err to the pub/sub socket, as well as a plugin to read a watcher's stdout/err from the pub/sub socket. The config format is just a f…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Changes in HEAD | |
Modified circus/config.py | |
diff --git a/circus/config.py b/circus/config.py | |
index abce19a..d789545 100644 | |
--- a/circus/config.py | |
+++ b/circus/config.py | |
@@ -28,8 +28,7 @@ def watcher_defaults(): | |
'max_retry': 5, | |
'graceful_timeout': 30, | |
'rlimits': dict(), | |
- 'stderr_stream': dict(), | |
- 'stdout_stream': dict(), | |
+ 'publish': dict(), | |
'priority': 0, | |
'use_sockets': False, | |
'singleton': False, | |
@@ -181,10 +180,9 @@ def get_config(config_file): | |
elif opt == 'graceful_timout': | |
watcher['graceful_timeout'] = dget( | |
section, "graceful_timeout", 30, int) | |
- elif opt.startswith('stderr_stream') or \ | |
- opt.startswith('stdout_stream'): | |
+ elif opt.startswith('publish'): | |
stream_name, stream_opt = opt.split(".", 1) | |
- watcher[stream_name][stream_opt] = val | |
+ watcher['publish'][stream_opt] = val | |
elif opt.startswith('rlimit_'): | |
limit = opt[7:] | |
watcher['rlimits'][limit] = int(val) | |
@@ -221,7 +219,8 @@ def get_config(config_file): | |
watcher['env'] = parse_env_str(val) | |
elif opt == 'autostart': | |
- watcher['autostart'] = dget(section, "autostart", True, bool) | |
+ watcher['autostart'] = dget(section, "autostart", | |
+ True, bool) | |
else: | |
# freeform | |
Modified circus/stream/__init__.py | |
diff --git a/circus/stream/__init__.py b/circus/stream/__init__.py | |
index 188172b..9cf5d8e 100644 | |
--- a/circus/stream/__init__.py | |
+++ b/circus/stream/__init__.py | |
@@ -33,6 +33,28 @@ class StdoutStream(object): | |
pass | |
+class Publisher(object): | |
+ | |
+ def __init__(self, notify=None, topic=None, **conf): | |
+ self.topic = topic or 'output' | |
+ self.notify = notify | |
+ self.conf = conf | |
+ | |
+ # See if we output to circusd's stdout | |
+ self.to_stdout = topic in conf and conf[topic] | |
+ | |
+ # See if we publish the stream to the socket | |
+ self.to_socket = conf.get('streams', []) and topic in conf['streams'] | |
+ | |
+ def __call__(self, data): | |
+ if self.to_stdout: | |
+ sys.stdout.write(data['data']) | |
+ sys.stdout.flush() | |
+ | |
+ if self.to_socket: | |
+ self.notify(self.topic, data['data']) | |
+ | |
+ | |
class FancyStdoutStream(StdoutStream): | |
""" | |
Write output from watchers using different colors along with a | |
@@ -108,53 +130,3 @@ class FancyStdoutStream(StdoutStream): | |
# stop coloring | |
self.out.write('\033[0m\n') | |
self.out.flush() | |
- | |
- | |
-def get_stream(conf): | |
- if not conf: | |
- return conf | |
- | |
- # we can have 'stream' or 'class' or 'filename' | |
- if 'filename' in conf: | |
- inst = FileStream(**conf) | |
- elif 'stream' in conf: | |
- inst = conf['stream'] | |
- elif 'class' in conf: | |
- class_name = conf.pop('class') | |
- if not "." in class_name: | |
- class_name = "circus.stream.%s" % class_name | |
- inst = resolve_name(class_name)(**conf) | |
- else: | |
- raise ValueError("stream configuration invalid") | |
- | |
- # default refresh_time | |
- refresh_time = float(conf.get('refresh_time', 0.3)) | |
- | |
- return {'stream': inst, 'refresh_time': refresh_time} | |
- | |
- | |
-def get_pipe_redirector(redirect, extra_info=None, buffer=1024, loop=None): | |
- """Redirects data received in pipes to the redirect callable. | |
- | |
- The data is a mapping with a **data** key containing the data | |
- received from the pipe, extended with all values passed in | |
- **extra_info** | |
- | |
- Options: | |
- - **redirect**: the callable to send data to | |
- - **extra_info**: a mapping of values to add to each call | |
- - **buffer**: the size of the buffer when reading data | |
- - **loop**: the ioloop to use. If not provided will use the | |
- global IOLoop | |
- """ | |
- # XXX backend is deprecated | |
- | |
- # get stream infos | |
- if 'stream' not in redirect: | |
- return | |
- | |
- stream = redirect.get('stream') | |
- refresh_time = redirect.get('refresh_time', 0.3) | |
- | |
- # finally setup the redirection | |
- return Redirector(stream, refresh_time, extra_info, buffer, loop=loop) | |
Modified circus/watcher.py | |
diff --git a/circus/watcher.py b/circus/watcher.py | |
index 7f6c7f7..e08e638 100644 | |
--- a/circus/watcher.py | |
+++ b/circus/watcher.py | |
@@ -1,4 +1,3 @@ | |
-import copy | |
import errno | |
import os | |
import signal | |
@@ -13,7 +12,7 @@ from zmq.eventloop import ioloop | |
from circus.process import Process, DEAD_OR_ZOMBIE, UNEXISTING | |
from circus import logger | |
from circus import util | |
-from circus.stream import get_pipe_redirector, get_stream | |
+from circus.stream import Redirector, Publisher | |
from circus.util import parse_env_dict, resolve_name | |
@@ -151,9 +150,8 @@ class Watcher(object): | |
def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., | |
working_dir=None, shell=False, uid=None, max_retry=5, | |
gid=None, send_hup=False, env=None, stopped=True, | |
- graceful_timeout=30., prereload_fn=None, | |
- rlimits=None, executable=None, stdout_stream=None, | |
- stderr_stream=None, priority=0, loop=None, | |
+ graceful_timeout=30., prereload_fn=None, rlimits=None, | |
+ executable=None, publish=None, priority=0, loop=None, | |
singleton=False, use_sockets=False, copy_env=False, | |
copy_path=False, max_age=0, max_age_variance=30, | |
hooks=None, respawn=True, autostart=True, **options): | |
@@ -170,11 +168,8 @@ class Watcher(object): | |
self.prereload_fn = prereload_fn | |
self.executable = None | |
self.priority = priority | |
- self.stdout_stream_conf = copy.copy(stdout_stream) | |
- self.stderr_stream_conf = copy.copy(stderr_stream) | |
- self.stdout_stream = get_stream(self.stdout_stream_conf) | |
- self.stderr_stream = get_stream(self.stderr_stream_conf) | |
- self.stdout_redirector = self.stderr_redirector = None | |
+ self.publish = publish or {} | |
+ self.stdout = self.stderr = None | |
self.max_retry = max_retry | |
self._options = options | |
self.singleton = singleton | |
@@ -284,6 +279,19 @@ class Watcher(object): | |
self.sockets = sockets | |
self.arbiter = arbiter | |
+ # default refresh_time | |
+ refresh_time = float(0.3) | |
+ | |
+ self.stdout = Redirector( | |
+ Publisher(self.notify_event, topic='stdout', **self.publish), | |
+ float(self.publish.get('refresh_time', refresh_time)), | |
+ loop=self.loop) | |
+ | |
+ self.stderr = Redirector( | |
+ Publisher(self.notify_event, topic='stderr', **self.publish), | |
+ float(self.publish.get('refresh_time', refresh_time)), | |
+ loop=self.loop) | |
+ | |
def __len__(self): | |
return len(self.processes) | |
@@ -427,16 +435,13 @@ class Watcher(object): | |
executable=self.executable, | |
use_fds=self.use_sockets, watcher=self) | |
- # stream stderr/stdout if configured | |
- if self.stdout_redirector is not None: | |
- self.stdout_redirector.add_redirection('stdout', | |
- process, | |
- process.stdout) | |
+ self.stdout.add_redirection('stdout', | |
+ process, | |
+ process.stdout) | |
- if self.stderr_redirector is not None: | |
- self.stderr_redirector.add_redirection('stderr', | |
- process, | |
- process.stderr) | |
+ self.stderr.add_redirection('stderr', | |
+ process, | |
+ process.stderr) | |
self.processes[process.pid] = process | |
logger.debug('running %s process [pid %d]', self.name, | |
@@ -459,11 +464,11 @@ class Watcher(object): | |
"""Kill process. | |
""" | |
# remove redirections | |
- if self.stdout_redirector is not None: | |
- self.stdout_redirector.remove_redirection('stdout', process) | |
+ if self.stdout is not None: | |
+ self.stdout.remove_redirection('stdout', process) | |
- if self.stderr_redirector is not None: | |
- self.stderr_redirector.remove_redirection('stderr', process) | |
+ if self.stderr is not None: | |
+ self.stderr.remove_redirection('stderr', process) | |
logger.debug("%s: kill process %s", self.name, process.pid) | |
try: | |
@@ -552,11 +557,11 @@ class Watcher(object): | |
""" | |
logger.debug('stopping the %s watcher' % self.name) | |
# stop redirectors | |
- if self.stdout_redirector is not None: | |
- self.stdout_redirector.kill() | |
+ if self.stdout is not None: | |
+ self.stdout.kill() | |
- if self.stderr_redirector is not None: | |
- self.stderr_redirector.kill() | |
+ if self.stderr is not None: | |
+ self.stderr.kill() | |
limit = time.time() + self.graceful_timeout | |
@@ -627,7 +632,7 @@ class Watcher(object): | |
self.stopped = True | |
return False | |
- self._create_redirectors() | |
+ # self._create_redirectors() | |
self.reap_processes() | |
self.spawn_processes() | |
@@ -636,11 +641,11 @@ class Watcher(object): | |
self.stop() | |
return False | |
- if self.stdout_redirector is not None: | |
- self.stdout_redirector.start() | |
+ if self.stdout is not None: | |
+ self.stdout.start() | |
- if self.stderr_redirector is not None: | |
- self.stderr_redirector.start() | |
+ if self.stderr is not None: | |
+ self.stderr.start() | |
logger.info('%s started' % self.name) | |
self.notify_event("start", {"time": time.time()}) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[circus] | |
endpoint = tcp://127.0.0.1:5555 | |
stats_endpoint = tcp://127.0.0.1:5557 | |
[watcher:hello] | |
cmd = python cp.py 7000 | |
copy_path = True | |
copy_env = True | |
# send stdout and stderr to the pub/sub socket | |
publish.streams = stdout, stderr | |
publish.refresh_time = 0.3 | |
# also send stdout/stderr to circusd stdout | |
publish.stdout = True | |
publish.stderr = True | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import sys | |
import json | |
from circus.plugins import CircusPlugin | |
class OutputStream(CircusPlugin):
    """Circus plugin that echoes selected pub/sub messages to stdout.

    Only messages whose topic appears in the plugin's ``topics`` option
    are printed; everything else is ignored.
    """

    name = 'stream_observer'

    def __init__(self, *args, **config):
        """Initialise the base plugin and remember the topics to echo.

        ``config`` is forwarded unchanged to ``CircusPlugin``; its optional
        ``topics`` entry selects which topics are printed (default: none).
        """
        super(OutputStream, self).__init__(*args, **config)
        self.topics = config.get('topics', [])

    def handle_recv(self, data):
        """Write the JSON-decoded payload of a watched topic to stdout.

        ``data`` is unpacked as a ``(topic, message)`` pair.
        """
        topic, message = data

        # Skip anything this plugin was not asked to observe.
        if topic not in self.topics:
            return

        # NOTE(review): assumes the decoded payload is a string, since it
        # is passed straight to sys.stdout.write -- confirm with publisher.
        payload = json.loads(message)
        sys.stdout.write(payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment