Last active
September 15, 2015 07:53
-
-
Save justinc1/675da187ab0dc7547102 to your computer and use it in GitHub Desktop.
xen-ringwatch script, fixed int32/uint32
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# justinc, autostart VMs after server reboot
# Only VMs with tag=autostart, or (in) appliance named autostart-vms

LOG=/root/autostart-vm.log

# Wait until xapi answers, then start every VM tagged "autostart" and every
# appliance named "autostart-vms". All output is appended to $LOG by the caller.
function main() {
    local aap_uuids uuid

    echo '************************************************'
    echo "Start autostart $(date)"
    # xe vm-start uuid=c7e3e810-1265-317b-fa1d-36ce01786b53

    # wait on working xapi: retry once per second until `xe vm-list` succeeds
    while true; do
        echo "...loop"
        if xe vm-list is-control-domain=false > /dev/null 2>&1; then
            break
        fi
        sleep 1
    done
    # additional fixed delay after the first successful xapi call
    sleep 10

    echo '----------------'
    echo 'Starting tagged VMs:'
    xe vm-list tags=autostart --multiple
    echo '----------------'
    xe vm-start tags=autostart --multiple

    aap_uuids=$(xe appliance-list name-label=autostart-vms --minimal)
    if [ -z "$aap_uuids" ]; then
        # Without this guard `xe appliance-start uuid=` would be run with an
        # empty uuid whenever no such appliance exists.
        echo "No appliance named autostart-vms found, skipping appliance-start"
    else
        # NOTE(review): --minimal is expected to print several matches as one
        # comma-separated line; start each uuid separately.
        for uuid in ${aap_uuids//,/ }; do
            xe appliance-start uuid="$uuid"
        done
    fi

    echo "Done autostart $(date)"
}

main >> "$LOG" 2>&1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Copyright (C) 2011 Citrix Systems, Inc. | |
# | |
# This library is free software; you can redistribute it and/or modify | |
# it under the terms of version 2.1 of the GNU Lesser General Public | |
# License as published by the Free Software Foundation. | |
# | |
# This library is distributed in the hope that it will be useful, but | |
# WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
# Lesser General Public License for more details. | |
# | |
# You should have received a copy of the GNU Lesser General Public | |
# License along with this library; if not, write to the Free Software | |
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 | |
# USA | |
# | |
"""Overview: | |
- Gather Xen I/O ring states | |
(from %s/*/ring) | |
- Update ring states every -T seconds. | |
- Determine if rings are idle or make progress. | |
- Determine if idle rings dropped notifications (%s). | |
- Instruct stuck backends to reissue notifications. | |
""" | |
import os | |
import glob | |
class Pattern(object):
    """Regular expression that is compiled lazily.

    Only the pattern source is stored at construction time.  The compiled
    object is created by the first call to get() and reused afterwards.
    """

    def __init__(self, regex):
        self.regex = regex
        self.__pattern = None

    def get(self):
        """Return the compiled pattern, compiling it on first use."""
        if self.__pattern:
            return self.__pattern
        import re
        self.__pattern = re.compile(self.regex)
        return self.__pattern

    def search(self, s):
        """Return the result of searching *s* with the compiled pattern."""
        compiled = self.get()
        return compiled.search(s)
class XenBackend(object):
    """A Xen I/O backend.

    Base class of the concrete backend types.  A backend is identified by
    the two integers parsed from its directory name (rd and devid).
    Subclasses supply name(), _name_pattern, _name_glob, XEN_BACKEND_NAME
    and a nested Ring class.
    """

    # Directory globbed by find() for backend entries.
    SYSFS_BASEDIR = "/sys/devices/xen-backend"

    def __init__(self, rd, devid):
        self.rd = int(rd)
        self.devid = int(devid)

    def __repr__(self):
        return "%s(%d, %d)" % (type(self).__name__, self.rd, self.devid)

    def name(self):
        """Return the backend's directory name.  Subclasses must override."""
        raise NotImplementedError

    def path(self):
        """Return the backend's directory path below SYSFS_BASEDIR."""
        return "%s/%s" % (self.SYSFS_BASEDIR, self.name())

    # Object with a search() method; group 1 is rd, group 2 is devid.
    # Set by subclasses.
    _name_pattern = None

    @classmethod
    def from_name(cls, name):
        """Build a backend from a directory name such as those find() sees.

        Raises Exception when *name* does not match cls._name_pattern.
        """
        match = cls._name_pattern.search(name)
        if not match:
            # A classmethod has no `self`; the class name comes from `cls`.
            raise Exception("Malformed %s name: %s" % (cls.__name__, name))

        rd = match.group(1)
        devid = match.group(2)
        return cls(rd, devid)

    # Glob pattern (relative to SYSFS_BASEDIR) for backend directory names.
    # Set by subclasses.
    _name_glob = None

    @classmethod
    def find(cls):
        """Yield one backend instance per directory matching _name_glob."""
        pattern = "%s/%s" % (cls.SYSFS_BASEDIR, cls._name_glob)
        for path in glob.glob(pattern):
            yield cls.from_name(os.path.basename(path))

    def find_rings(self):
        """Yield the rings found below this backend's directory."""
        for ring in self.Ring.find(self):
            yield ring

    class Ring(object):
        """A ring file below a backend directory."""

        def __init__(self, backend, name):
            self.backend = backend
            self.name = name

        # Not referenced elsewhere in this class.
        __size = None

        def key(self):
            """Return "<backend name>/<ring name>", used as watch-list key."""
            return "%s/%s" % (self.backend.name(), self.name)

        def __str__(self):
            return "%s(%s)" % (type(self).__name__, self.key())

        @classmethod
        def from_name(cls, backend, name):
            return cls(backend, name)

        # Glob pattern (relative to the backend path) for ring file names.
        # Set by subclasses.
        _name_glob = None

        @classmethod
        def find(cls, backend):
            """Yield a ring per file matching _name_glob below *backend*."""
            pattern = "%s/%s" % (backend.path(), cls._name_glob)
            for path in glob.glob(pattern):
                yield cls.from_name(backend, os.path.basename(path))

        def path(self):
            return "%s/%s" % (self.backend.path(), self.name)

        def read(self):
            """Parse the ring file into a RingState."""
            return RingState.from_sysfs(self.path())

        def write(self, cmd):
            """Write *cmd*, stripped of trailing whitespace, to the ring file."""
            f = open(self.path(), 'w')
            try:
                f.write(cmd.rstrip())
            finally:
                f.close()

        def kick(self):
            self.write("kick")

        def poll(self):
            self.write("poll")

    # Not referenced elsewhere in this class.
    __ring = None

    # Registry of backend classes, keyed by XEN_BACKEND_NAME.
    TYPES = {}
    XEN_BACKEND_NAME = None

    @classmethod
    def register(cls):
        """Add this class to the XenBackend.TYPES registry."""
        XenBackend.TYPES[cls.XEN_BACKEND_NAME] = cls
class VBD(XenBackend):
    """Xen blkif backends."""

    XEN_BACKEND_NAME = 'vbd'

    # Raw string: "\d" is a regex escape, not a string escape.
    # Group 1 is rd, group 2 is devid.
    _name_pattern = Pattern(r"vbd-(\d+)-(\d+)")
    _name_glob = "vbd-*-*"

    def name(self):
        """Return the backend directory name, "vbd-<rd>-<devid>"."""
        return "vbd-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # A VBD backend is searched for a single ring file of this name.
        _name_glob = "io_ring"


# Make the type selectable through XenBackend.TYPES.
VBD.register()
class VIF(XenBackend):
    """Xen netif backends."""

    XEN_BACKEND_NAME = 'vif'

    # Raw string: "\d" is a regex escape, not a string escape.
    # Group 1 is rd, group 2 is devid.
    _name_pattern = Pattern(r"vif-(\d+)-(\d+)")
    _name_glob = "vif-*-*"

    def name(self):
        """Return the backend directory name, "vif-<rd>-<devid>"."""
        return "vif-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # glob.glob() does not perform shell brace expansion, so
        # "{rx,tx}_ring" could never match; a character class selects
        # both rx_ring and tx_ring.
        _name_glob = "[rt]x_ring"


# Registration is left disabled, as in the original: VIF is not added to
# XenBackend.TYPES.
#VIF.register()
class RingState(object):
    """Overall backend ring state. Comprising req and rsp queue
    indexes, and analysis."""

    def __init__(self, size, req, rsp):
        self.size = int(size)
        self.req = req
        self.rsp = rsp

    _size_pattern = Pattern(r"nr_ents (\d+)")

    @classmethod
    def from_sysfs(cls, path):
        """Read the ring file at *path* and parse it into a RingState.

        The content must split on newlines into exactly four fields: the
        nr_ents line, the req line, the rsp line and a trailing field.

        Raises Exception when the content is malformed.  Errors from
        opening or reading the file propagate unchanged.
        """
        f = open(path, "r")
        try:
            s = f.read()
        finally:
            f.close()

        fields = s.split("\n")
        if len(fields) != 4:
            raise Exception("Malformed %s input: %s (%s)" %
                            (cls.__name__, repr(s),
                             "expected 4 newline-separated fields, got %d" %
                             len(fields)))
        (_nr_ents, _req, _rsp, _) = fields

        match = cls._size_pattern.search(_nr_ents)
        if not match:
            raise Exception("Malformed %s input: %s (%s)" %
                            (cls.__name__, repr(s), "no nr_ents field"))
        nr_ents = int(match.group(1))

        req = cls.Req.from_sysfs(_req, size=nr_ents)
        rsp = cls.Rsp.from_sysfs(_rsp, size=nr_ents)

        return cls(nr_ents, req, rsp)

    class Queue(dict):
        """Common base of the req and rsp index sets."""

        def __init__(self, size):
            self.size = int(size)

        prod = None

        @staticmethod
        def _unsigned(value):
            """Convert *value* to int, adding 2**32 to a negative result.

            NOTE(review): assumes the counters are 32 bits wide and that a
            negative number is the signed rendering of the same counter.
            """
            value = int(value)
            if value < 0:
                value += 4294967296
            return value

        @classmethod
        def from_sysfs(cls, line, **d):
            """Parse one queue line; extra keywords go to the constructor.

            Raises Exception when *line* does not match cls._pattern.
            """
            match = cls._pattern.search(line)
            if not match:
                raise Exception("Malformed %s input: %s" %
                                (cls.__name__, repr(line)))

            # The groups alternate between a field name and its value.
            groups = match.groups()
            d.update(zip(groups[0::2], groups[1::2]))

            return cls(**d)

        def is_consumed(self):
            return self.prod == self._cons()

    class Req(Queue):
        # Every counter may carry a minus sign; __init__ normalizes all
        # three of them.
        _pattern = Pattern(
            r"req (prod) (-?\d+) (cons) (-?\d+) (event) (-?\d+)")

        def __init__(self, prod, cons, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod = self._unsigned(prod)
            self.cons = self._unsigned(cons)
            self.event = self._unsigned(event)

        def __repr__(self):
            return "%s(prod=%d, cons=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.cons, self.event)

        def _cons(self):
            return self.cons

        def __eq__(self, other):
            return \
                self.prod == other.prod and \
                self.cons == other.cons and \
                self.event == other.event

    class Rsp(Queue):
        # Every counter may carry a minus sign; __init__ normalizes all
        # three of them.
        _pattern = Pattern(
            r"rsp (prod) (-?\d+) (pvt) (-?\d+) (event) (-?\d+)")

        def __init__(self, prod, pvt, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod = self._unsigned(prod)
            self.pvt = self._unsigned(pvt)
            self.event = self._unsigned(event)

        def __repr__(self):
            return "%s(prod=%d, pvt=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.pvt, self.event)

        def _cons(self):
            # Wrap like the other counters: with event == 0 a plain
            # subtraction yields -1, which can never equal the non-negative
            # prod.
            return (self.event - 1) % 4294967296

        def __eq__(self, other):
            return \
                self.prod == other.prod and \
                self.pvt == other.pvt and \
                self.event == other.event

    def is_consumed(self):
        """Return whether both the rsp and the req queue are consumed."""
        return \
            self.rsp.is_consumed() and \
            self.req.is_consumed()

    def is_pending(self):
        """Return whether the rsp and req producer indexes differ."""
        return self.rsp.prod != self.req.prod

    def kick(self, ring):
        """Poll *ring* if req is unconsumed, kick it if rsp is unconsumed.

        Returns True when at least one of the two writes was issued.
        """
        action = False
        if not self.req.is_consumed():
            action = True
            ring.poll()
        if not self.rsp.is_consumed():
            action = True
            ring.kick()
        return action

    def __eq__(self, other):
        return \
            self.size == other.size and \
            self.req == other.req and \
            self.rsp == other.rsp

    def __repr__(self):
        return "%s(size=%d, %s, %s)" % \
            (type(self).__name__, self.size, self.req, self.rsp)

    def display(self):
        """Return a one-line summary of the state and its analysis."""
        complete = { True: "complete", False: "pending" }

        io = complete[not self.is_pending()]
        req = complete[self.req.is_consumed()]
        rsp = complete[self.rsp.is_consumed()]

        return "%s: io: %s, req: %s, rsp: %s" % (self, io, req, rsp)
class RingWatch(object):
    """State machine watching I/O individual ring state"""

    # Status values.  _NEW is the status before the first update().
    _NEW = "_NEW"
    BUSY = "BUSY"
    IDLE = "IDLE"
    STCK = "STCK"

    COMMENTS = {
        BUSY: "Message traffic observed (OK)",
        IDLE: "No messages observed (Ring OK, I/O depends)",
        STCK: "No pending req/rsp consumer progress observed (BUG)",
    }

    def __init__(self, ring, state):
        self.ring = ring
        self.state = state
        self.status = RingWatch._NEW

    @classmethod
    def new(cls, ring):
        """Create a watch seeded with the ring's current state."""
        return cls(ring, ring.read())

    def __str__(self):
        label = type(self).__name__
        return "%s(%s)[%s]" % (label, self.ring.key(), self.status)

    def is_stuck(self):
        return self.status == self.STCK

    def is_idle(self):
        return self.status == self.IDLE

    def kick(self):
        """Kick the ring through its last state, but only when stuck.

        Returns the result of the state's kick(), or None when not stuck.
        """
        if not self.is_stuck():
            return None
        return self.state.kick(self.ring)

    def update(self):
        """Re-read the ring and derive the new status.

        A changed state means BUSY.  An unchanged state means IDLE when it
        is consumed and STCK otherwise.
        """
        previous = self.state
        latest = self.ring.read()

        unchanged = (latest == previous)
        if not unchanged:
            self.status = self.BUSY
        elif latest.is_consumed():
            self.status = self.IDLE
        else:
            self.status = self.STCK

        self.state = latest

    def display(self):
        """Return this watch followed by its state's display text."""
        details = self.state.display()
        return "%s: %s" % (self, details)
class WatchList(object):
    """Managed collection of I/O rings under surveillance."""

    def __init__(self, gen):
        # gen: callable returning an iterable of rings; called per update().
        self.gen = gen
        # Watch entries, keyed by ring.key().
        self.list = {}

    def update(self):
        """Rebuild the watch list from the rings currently produced by gen.

        New rings get a fresh entry, known rings are updated, and rings no
        longer produced are dropped.
        """
        # NB. clear the watch list, then rebuild it. new entries get
        # added, existing ones updates, those gone discarded.
        prev = self.list
        self.list = {}

        for ring in self.gen():
            key = ring.key()
            entry = prev.get(key)

            try:
                if not entry:
                    entry = RingWatch.new(ring)
                else:
                    entry.update()
            except IOError:
                # NB. racing unplug, any ring.read() may raise.
                # nothing left to memorize then.
                continue

            self.list[key] = entry

    def __iter__(self):
        # values() exists on both Python 2 and Python 3 dictionaries.
        return iter(self.list.values())

    def pending(self):
        """Yield the idle entries whose state reports pending I/O."""
        for entry in self:
            if entry.is_idle() and entry.state.is_pending():
                yield entry

    def stuck(self):
        """Yield the entries that are stuck."""
        for entry in self:
            if entry.is_stuck():
                yield entry

    def kick(self):
        """Kick every stuck entry, ignoring IOError from each kick."""
        for entry in self.stuck():
            try:
                entry.kick()
            except IOError:
                # NB. racing unplug, any ring.write() may raise.
                pass
if __name__ == '__main__':
    # Command-line entry point: parse the options, then run the command.

    from sys import argv, stdout, stderr, exit, exc_info
    from getopt import gnu_getopt, GetoptError
    from pprint import pprint  # not used below

    DEFAULT_PERIOD = 1 # secs

    verbose = 0
    period = DEFAULT_PERIOD
    # A list, because ring_select() walks it once per WatchList.update().
    backends = list(XenBackend.TYPES.values())
    kick = False
    iowatch = False

    OPTIONS = ((('h', 'help'),
                "Print this help screen."),

               (('v', 'verbose'),
                "Increase output verbosity level (use n-times)."),

               (('I', 'io'),
                "Watch out for stuck I/O (not messaging), too. (%s)" %
                (iowatch)),

               # The long name matches the "type=" entry given to gnu_getopt.
               (('t', 'type'),
                "Comma separated list of backend types to watch. (%s)" %
                ",".join([t.XEN_BACKEND_NAME for t in backends])),

               (('T', 'period'),
                "Watch update period. (%d) [secs]" %
                (period)),

               (('k', 'kick'),
                "Kick broken guests out of cardiac arrest. (%s)" %
                (kick))
               )

    COMMANDS = {"check":
                "Single iteration quick test (takes -T seconds)."}

    def emit(stream, text=""):
        """Write *text* and a newline to *stream*."""
        stream.write("%s\n" % text)

    def usage(stream):
        """Write the usage, command and option summary to *stream*."""
        prog = os.path.basename(argv[0])

        emit(stream)

        emit(stream, "Usage:")
        emit(stream, "\t%s [options] {%s}" % (prog, "|".join(COMMANDS)))

        emit(stream)

        emit(stream, "Commands:")
        for (name, desc) in COMMANDS.items():
            emit(stream, "\t%s: \t%s" % (name, desc))

        emit(stream)

        emit(stream, "Options:")
        for ((short, _long), desc) in OPTIONS:
            emit(stream, "\t-%s, --%s: \t%s" % (short, _long, desc))

        emit(stream)

    def fail(msg = None):
        """Print an optional error and the usage to stderr, then exit(1)."""
        if msg:
            emit(stderr, "Error: %s" % msg)
        usage(stderr)
        exit(1)

    def help():
        """Print usage, overview, backend types and ring states to stdout."""
        usage(stdout)

        emit(stdout, __doc__ % (XenBackend.SYSFS_BASEDIR, RingWatch.STCK))

        emit(stdout, "Backend Types:")
        for k, v in XenBackend.TYPES.items():
            emit(stdout, "\t%s: \t%s (%s)" % (k, v.__doc__, v._name_glob))

        emit(stdout, "Ring States:")
        for k, v in RingWatch.COMMENTS.items():
            emit(stdout, "\t%s: \t%s" % (k, v))

    try:
        opts, args = gnu_getopt(argv[1:],
                                "hIkt:vT:",
                                ["help",
                                 "io",
                                 "kick",
                                 "type=",
                                 "verbose",
                                 "period="])
    except GetoptError:
        # exc_info() rather than an exception binding: the clause then
        # parses under both Python 2 and Python 3.
        fail(str(exc_info()[1]))

    for (o, arg) in opts:
        try:
            if o in ('-h', '--help'):
                help()
                exit(0)

            elif o in ['-v', '--verbose']:
                verbose += 1

            elif o in ['-I', '--io']:
                iowatch = True

            elif o in ('-T', '--period'):
                period = int(arg)
                if period < 0:
                    # time.sleep() rejects negative values.
                    raise ValueError(arg)

            elif o in ('-t', '--type'):
                # Split the argument on commas; the reverse call
                # ",".split(arg) never yields the type names.
                backends = [XenBackend.TYPES[t] for t in arg.split(",")]

            elif o in ('-k', '--kick'):
                kick = True

            else:
                raise Exception("BUG: option %s unhandled." % o)

        except (ValueError, KeyError):
            # KeyError: a backend type name that is not registered.
            fail("%s: invalid argument '%s'." % (o, arg))

    if not args:
        fail("Missing command.")
    cmd = args[0]

    def ring_select():
        """Yield every ring of every backend of the selected types."""
        for _type in backends:
            for backend in _type.find():
                for ring in backend.find_rings():
                    yield ring

    def show(entries):
        """Print the display line of each watch entry."""
        for watch in entries:
            emit(stdout, watch.display())

    def pause():
        """Sleep for one watch period."""
        import time
        time.sleep(period)

    watches = WatchList(ring_select)

    if cmd == "check":

        # init
        watches.update()

        if verbose >= 2:
            show(watches)

        # watch for one round
        pause()
        watches.update()

        # show result
        crit = list(watches.stuck())
        stuck = bool(crit)

        if (iowatch):
            crit.extend(watches.pending())

        if verbose >= 1:
            show(watches)
        elif crit:
            show(crit)

        if stuck and kick:
            # deal with it
            watches.kick()

    else:
        fail("Invalid command.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Copyright (C) 2011 Citrix Systems, Inc. | |
# | |
# This library is free software; you can redistribute it and/or modify | |
# it under the terms of version 2.1 of the GNU Lesser General Public | |
# License as published by the Free Software Foundation. | |
# | |
# This library is distributed in the hope that it will be useful, but | |
# WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
# Lesser General Public License for more details. | |
# | |
# You should have received a copy of the GNU Lesser General Public | |
# License along with this library; if not, write to the Free Software | |
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 | |
# USA | |
# | |
"""Overview: | |
- Gather Xen I/O ring states | |
(from %s/*/ring) | |
- Update ring states every -T seconds. | |
- Determine if rings are idle or make progress. | |
- Determine if idle rings dropped notifications (%s). | |
- Instruct stuck backends to reissue notifications. | |
""" | |
import os | |
import glob | |
class Pattern(object):
    """Regular expression that is compiled lazily.

    Only the pattern source is stored at construction time.  The compiled
    object is created by the first call to get() and reused afterwards.
    """

    def __init__(self, regex):
        self.regex = regex
        self.__pattern = None

    def get(self):
        """Return the compiled pattern, compiling it on first use."""
        if self.__pattern:
            return self.__pattern
        import re
        self.__pattern = re.compile(self.regex)
        return self.__pattern

    def search(self, s):
        """Return the result of searching *s* with the compiled pattern."""
        compiled = self.get()
        return compiled.search(s)
class XenBackend(object):
    """A Xen I/O backend.

    Base class of the concrete backend types.  A backend is identified by
    the two integers parsed from its directory name (rd and devid).
    Subclasses supply name(), _name_pattern, _name_glob, XEN_BACKEND_NAME
    and a nested Ring class.
    """

    # Directories globbed by find() for backend entries.
    SYSFS_BASEDIR = "/sys/bus/xen-backend/devices"
    SHM_BASEDIR = "/dev/shm"

    def __init__(self, rd, devid):
        self.rd = int(rd)
        self.devid = int(devid)

    def __repr__(self):
        return "%s(%d, %d)" % (type(self).__name__, self.rd, self.devid)

    def name(self):
        """Return the backend's directory name.  Subclasses must override."""
        raise NotImplementedError

    def path(self):
        """Return the backend's directory path below SYSFS_BASEDIR."""
        return "%s/%s" % (self.SYSFS_BASEDIR, self.name())

    # Object with a search() method; group 1 is rd, group 2 is devid.
    # Set by subclasses.
    _name_pattern = None

    @classmethod
    def from_name(cls, name):
        """Build a backend from a directory name such as those find() sees.

        Raises Exception when *name* does not match cls._name_pattern.
        """
        match = cls._name_pattern.search(name)
        if not match:
            # A classmethod has no `self`; the class name comes from `cls`.
            raise Exception("Malformed %s name: %s" % (cls.__name__, name))

        rd = match.group(1)
        devid = match.group(2)
        return cls(rd, devid)

    # Glob pattern (relative to the base directories) for backend
    # directory names.  Set by subclasses.
    _name_glob = None

    @classmethod
    def find(cls):
        """Yield one backend per entry matching _name_glob.

        SYSFS_BASEDIR is searched first, then SHM_BASEDIR.
        """
        paths = glob.glob("%s/%s" % (cls.SYSFS_BASEDIR, cls._name_glob)) + \
            glob.glob("%s/%s" % (cls.SHM_BASEDIR, cls._name_glob))
        for path in paths:
            yield cls.from_name(os.path.basename(path))

    def find_rings(self):
        """Yield the rings found below this backend's directory."""
        for ring in self.Ring.find(self):
            yield ring

    class Ring(object):
        """A ring file below a backend directory."""

        def __init__(self, backend, name):
            self.backend = backend
            self.name = name

        # Not referenced elsewhere in this class.
        __size = None

        def key(self):
            """Return "<backend name>/<ring name>", used as watch-list key."""
            return "%s/%s" % (self.backend.name(), self.name)

        def __str__(self):
            return "%s(%s)" % (type(self).__name__, self.key())

        @classmethod
        def from_name(cls, backend, name):
            return cls(backend, name)

        # Glob pattern (relative to the backend path) for ring file names.
        # Set by subclasses.
        _name_glob = None

        @classmethod
        def find(cls, backend):
            """Yield a ring per file matching _name_glob below *backend*."""
            pattern = "%s/%s" % (backend.path(), cls._name_glob)
            for path in glob.glob(pattern):
                yield cls.from_name(backend, os.path.basename(path))

        def path(self):
            return "%s/%s" % (self.backend.path(), self.name)

        def read(self):
            """Parse the ring file into a RingState."""
            return RingState.from_sysfs(self.path())

        def write(self, cmd):
            """Write *cmd*, stripped of trailing whitespace, to the ring file."""
            f = open(self.path(), 'w')
            try:
                f.write(cmd.rstrip())
            finally:
                f.close()

        def kick(self):
            self.write("kick")

        def poll(self):
            self.write("poll")

    # Not referenced elsewhere in this class.
    __ring = None

    # Registry of backend classes, keyed by XEN_BACKEND_NAME.
    TYPES = {}
    XEN_BACKEND_NAME = None

    @classmethod
    def register(cls):
        """Add this class to the XenBackend.TYPES registry."""
        XenBackend.TYPES[cls.XEN_BACKEND_NAME] = cls
class VBD(XenBackend):
    """Xen blkif backends."""

    XEN_BACKEND_NAME = 'vbd'

    # Raw string: "\d" is a regex escape, not a string escape.
    # Group 1 is rd, group 2 is devid.
    _name_pattern = Pattern(r"vbd-(\d+)-(\d+)")
    _name_glob = "vbd-*-*"

    def name(self):
        """Return the backend directory name, "vbd-<rd>-<devid>"."""
        return "vbd-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # A VBD backend is searched for a single ring file of this name.
        _name_glob = "io_ring"


# Make the type selectable through XenBackend.TYPES.
VBD.register()
class VBD3(XenBackend):
    """blktap3 backends."""

    XEN_BACKEND_NAME = 'vbd3'

    # Raw string: "\d" is a regex escape, not a string escape.
    # Group 1 is rd, group 2 is devid.
    _name_pattern = Pattern(r"vbd3-(\d+)-(\d+)")
    _name_glob = "vbd3-*-*"

    def path(self):
        """Return the backend's directory path below SHM_BASEDIR."""
        # Use the same base directory attribute that XenBackend.find()
        # globs, instead of repeating the "/dev/shm" literal.
        return os.path.join(self.SHM_BASEDIR, self.name())

    def name(self):
        """Return the backend directory name, "vbd3-<rd>-<devid>"."""
        return "vbd3-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        _name_glob = "io_ring"

        def write(self, cmd):
            # Writes are ignored for this ring type, so kick() and poll()
            # do nothing.
            pass


# Make the type selectable through XenBackend.TYPES.
VBD3.register()
class VIF(XenBackend):
    """Xen netif backends."""

    XEN_BACKEND_NAME = 'vif'

    # Raw string: "\d" is a regex escape, not a string escape.
    # Group 1 is rd, group 2 is devid.
    _name_pattern = Pattern(r"vif-(\d+)-(\d+)")
    _name_glob = "vif-*-*"

    def name(self):
        """Return the backend directory name, "vif-<rd>-<devid>"."""
        return "vif-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # glob.glob() does not perform shell brace expansion, so
        # "{rx,tx}_ring" could never match; a character class selects
        # both rx_ring and tx_ring.
        _name_glob = "[rt]x_ring"


# Registration is left disabled, as in the original: VIF is not added to
# XenBackend.TYPES.
#VIF.register()
class RingState(object):
    """Overall backend ring state. Comprising req and rsp queue
    indexes, and analysis."""

    def __init__(self, size, req, rsp):
        self.size = int(size)
        self.req = req
        self.rsp = rsp

    _size_pattern = Pattern(r"nr_ents (\d+)")

    @classmethod
    def from_sysfs(cls, path):
        """Read the ring file at *path* and parse it into a RingState.

        The content must split on newlines into exactly four fields: the
        nr_ents line, the req line, the rsp line and a trailing field.

        Raises Exception when the content is malformed.  Errors from
        opening or reading the file propagate unchanged.
        """
        f = open(path, "r")
        try:
            s = f.read()
        finally:
            f.close()

        fields = s.split("\n")
        if len(fields) != 4:
            raise Exception("Malformed %s input: %s (%s)" %
                            (cls.__name__, repr(s),
                             "expected 4 newline-separated fields, got %d" %
                             len(fields)))
        (_nr_ents, _req, _rsp, _) = fields

        match = cls._size_pattern.search(_nr_ents)
        if not match:
            raise Exception("Malformed %s input: %s (%s)" %
                            (cls.__name__, repr(s), "no nr_ents field"))
        nr_ents = int(match.group(1))

        req = cls.Req.from_sysfs(_req, size=nr_ents)
        rsp = cls.Rsp.from_sysfs(_rsp, size=nr_ents)

        return cls(nr_ents, req, rsp)

    class Queue(dict):
        """Common base of the req and rsp index sets."""

        def __init__(self, size):
            self.size = int(size)

        prod = None

        @staticmethod
        def _unsigned(value):
            """Convert *value* to int, adding 2**32 to a negative result.

            NOTE(review): assumes the counters are 32 bits wide and that a
            negative number is the signed rendering of the same counter.
            """
            value = int(value)
            if value < 0:
                value += 4294967296
            return value

        @classmethod
        def from_sysfs(cls, line, **d):
            """Parse one queue line; extra keywords go to the constructor.

            Raises Exception when *line* does not match cls._pattern.
            """
            match = cls._pattern.search(line)
            if not match:
                raise Exception("Malformed %s input: %s" %
                                (cls.__name__, repr(line)))

            # The groups alternate between a field name and its value.
            groups = match.groups()
            d.update(zip(groups[0::2], groups[1::2]))

            return cls(**d)

        def is_consumed(self):
            return self.prod == self._cons()

    class Req(Queue):
        # Every counter may carry a minus sign; __init__ normalizes all
        # three of them.
        _pattern = Pattern(
            r"req (prod) (-?\d+) (cons) (-?\d+) (event) (-?\d+)")

        def __init__(self, prod, cons, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod = self._unsigned(prod)
            self.cons = self._unsigned(cons)
            self.event = self._unsigned(event)

        def __repr__(self):
            return "%s(prod=%d, cons=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.cons, self.event)

        def _cons(self):
            return self.cons

        def __eq__(self, other):
            return \
                self.prod == other.prod and \
                self.cons == other.cons and \
                self.event == other.event

    class Rsp(Queue):
        # Every counter may carry a minus sign; __init__ normalizes all
        # three of them.
        _pattern = Pattern(
            r"rsp (prod) (-?\d+) (pvt) (-?\d+) (event) (-?\d+)")

        def __init__(self, prod, pvt, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod = self._unsigned(prod)
            self.pvt = self._unsigned(pvt)
            self.event = self._unsigned(event)

        def __repr__(self):
            return "%s(prod=%d, pvt=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.pvt, self.event)

        def _cons(self):
            # Wrap like the other counters: with event == 0 a plain
            # subtraction yields -1, which can never equal the non-negative
            # prod.
            return (self.event - 1) % 4294967296

        def __eq__(self, other):
            return \
                self.prod == other.prod and \
                self.pvt == other.pvt and \
                self.event == other.event

    def is_consumed(self):
        """Return whether both the rsp and the req queue are consumed."""
        return \
            self.rsp.is_consumed() and \
            self.req.is_consumed()

    def is_pending(self):
        """Return whether the rsp and req producer indexes differ."""
        return self.rsp.prod != self.req.prod

    def kick(self, ring):
        """Poll *ring* if req is unconsumed, kick it if rsp is unconsumed.

        Returns True when at least one of the two writes was issued.
        """
        action = False
        if not self.req.is_consumed():
            action = True
            ring.poll()
        if not self.rsp.is_consumed():
            action = True
            ring.kick()
        return action

    def __eq__(self, other):
        return \
            self.size == other.size and \
            self.req == other.req and \
            self.rsp == other.rsp

    def __repr__(self):
        return "%s(size=%d, %s, %s)" % \
            (type(self).__name__, self.size, self.req, self.rsp)

    def display(self):
        """Return a one-line summary of the state and its analysis."""
        complete = { True: "complete", False: "pending" }

        io = complete[not self.is_pending()]
        req = complete[self.req.is_consumed()]
        rsp = complete[self.rsp.is_consumed()]

        return "%s: io: %s, req: %s, rsp: %s" % (self, io, req, rsp)
class RingWatch(object):
    """State machine watching I/O individual ring state"""

    # Status values.  _NEW is the status before the first update().
    _NEW = "_NEW"
    BUSY = "BUSY"
    IDLE = "IDLE"
    STCK = "STCK"

    COMMENTS = {
        BUSY: "Message traffic observed (OK)",
        IDLE: "No messages observed (Ring OK, I/O depends)",
        STCK: "No pending req/rsp consumer progress observed (BUG)",
    }

    def __init__(self, ring, state):
        self.ring = ring
        self.state = state
        self.status = RingWatch._NEW

    @classmethod
    def new(cls, ring):
        """Create a watch seeded with the ring's current state."""
        return cls(ring, ring.read())

    def __str__(self):
        label = type(self).__name__
        return "%s(%s)[%s]" % (label, self.ring.key(), self.status)

    def is_stuck(self):
        return self.status == self.STCK

    def is_idle(self):
        return self.status == self.IDLE

    def kick(self):
        """Kick the ring through its last state, but only when stuck.

        Returns the result of the state's kick(), or None when not stuck.
        """
        if not self.is_stuck():
            return None
        return self.state.kick(self.ring)

    def update(self):
        """Re-read the ring and derive the new status.

        A changed state means BUSY.  An unchanged state means IDLE when it
        is consumed and STCK otherwise.
        """
        previous = self.state
        latest = self.ring.read()

        unchanged = (latest == previous)
        if not unchanged:
            self.status = self.BUSY
        elif latest.is_consumed():
            self.status = self.IDLE
        else:
            self.status = self.STCK

        self.state = latest

    def display(self):
        """Return this watch followed by its state's display text."""
        details = self.state.display()
        return "%s: %s" % (self, details)
class WatchList(object):
    """Managed collection of I/O rings under surveillance."""

    def __init__(self, gen):
        # gen: callable returning an iterable of rings; called per update().
        self.gen = gen
        # Watch entries, keyed by ring.key().
        self.list = {}

    def update(self):
        """Rebuild the watch list from the rings currently produced by gen.

        New rings get a fresh entry, known rings are updated, and rings no
        longer produced are dropped.
        """
        # NB. clear the watch list, then rebuild it. new entries get
        # added, existing ones updates, those gone discarded.
        prev = self.list
        self.list = {}

        for ring in self.gen():
            key = ring.key()
            entry = prev.get(key)

            try:
                if not entry:
                    entry = RingWatch.new(ring)
                else:
                    entry.update()
            except IOError:
                # NB. racing unplug, any ring.read() may raise.
                # nothing left to memorize then.
                continue

            self.list[key] = entry

    def __iter__(self):
        # values() exists on both Python 2 and Python 3 dictionaries.
        return iter(self.list.values())

    def pending(self):
        """Yield the idle entries whose state reports pending I/O."""
        for entry in self:
            if entry.is_idle() and entry.state.is_pending():
                yield entry

    def stuck(self):
        """Yield the entries that are stuck."""
        for entry in self:
            if entry.is_stuck():
                yield entry

    def kick(self):
        """Kick every stuck entry, ignoring IOError from each kick."""
        for entry in self.stuck():
            try:
                entry.kick()
            except IOError:
                # NB. racing unplug, any ring.write() may raise.
                pass
if __name__ == '__main__':
    # Command-line entry point: parse the options, then run the command.

    from sys import argv, stdout, stderr, exit, exc_info
    from getopt import gnu_getopt, GetoptError
    from pprint import pprint  # not used below

    DEFAULT_PERIOD = 1 # secs

    verbose = 0
    period = DEFAULT_PERIOD
    # A list, because ring_select() walks it once per WatchList.update().
    backends = list(XenBackend.TYPES.values())
    kick = False
    iowatch = False

    OPTIONS = ((('h', 'help'),
                "Print this help screen."),

               (('v', 'verbose'),
                "Increase output verbosity level (use n-times)."),

               (('I', 'io'),
                "Watch out for stuck I/O (not messaging), too. (%s)" %
                (iowatch)),

               # The long name matches the "type=" entry given to gnu_getopt.
               (('t', 'type'),
                "Comma separated list of backend types to watch. (%s)" %
                ",".join([t.XEN_BACKEND_NAME for t in backends])),

               (('T', 'period'),
                "Watch update period. (%d) [secs]" %
                (period)),

               (('k', 'kick'),
                "Kick broken guests out of cardiac arrest. (%s)" %
                (kick))
               )

    COMMANDS = {"check":
                "Single iteration quick test (takes -T seconds)."}

    def emit(stream, text=""):
        """Write *text* and a newline to *stream*."""
        stream.write("%s\n" % text)

    def usage(stream):
        """Write the usage, command and option summary to *stream*."""
        prog = os.path.basename(argv[0])

        emit(stream)

        emit(stream, "Usage:")
        emit(stream, "\t%s [options] {%s}" % (prog, "|".join(COMMANDS)))

        emit(stream)

        emit(stream, "Commands:")
        for (name, desc) in COMMANDS.items():
            emit(stream, "\t%s: \t%s" % (name, desc))

        emit(stream)

        emit(stream, "Options:")
        for ((short, _long), desc) in OPTIONS:
            emit(stream, "\t-%s, --%s: \t%s" % (short, _long, desc))

        emit(stream)

    def fail(msg = None):
        """Print an optional error and the usage to stderr, then exit(1)."""
        if msg:
            emit(stderr, "Error: %s" % msg)
        usage(stderr)
        exit(1)

    def help():
        """Print usage, overview, backend types and ring states to stdout."""
        usage(stdout)

        emit(stdout, __doc__ % (XenBackend.SYSFS_BASEDIR, RingWatch.STCK))

        emit(stdout, "Backend Types:")
        for k, v in XenBackend.TYPES.items():
            emit(stdout, "\t%s: \t%s (%s)" % (k, v.__doc__, v._name_glob))

        emit(stdout, "Ring States:")
        for k, v in RingWatch.COMMENTS.items():
            emit(stdout, "\t%s: \t%s" % (k, v))

    try:
        opts, args = gnu_getopt(argv[1:],
                                "hIkt:vT:",
                                ["help",
                                 "io",
                                 "kick",
                                 "type=",
                                 "verbose",
                                 "period="])
    except GetoptError:
        # exc_info() rather than an exception binding: the clause then
        # parses under both Python 2 and Python 3.
        fail(str(exc_info()[1]))

    for (o, arg) in opts:
        try:
            if o in ('-h', '--help'):
                help()
                exit(0)

            elif o in ['-v', '--verbose']:
                verbose += 1

            elif o in ['-I', '--io']:
                iowatch = True

            elif o in ('-T', '--period'):
                period = int(arg)
                if period < 0:
                    # time.sleep() rejects negative values.
                    raise ValueError(arg)

            elif o in ('-t', '--type'):
                # Split the argument on commas; the reverse call
                # ",".split(arg) never yields the type names.
                backends = [XenBackend.TYPES[t] for t in arg.split(",")]

            elif o in ('-k', '--kick'):
                kick = True

            else:
                raise Exception("BUG: option %s unhandled." % o)

        except (ValueError, KeyError):
            # KeyError: a backend type name that is not registered.
            fail("%s: invalid argument '%s'." % (o, arg))

    if not args:
        fail("Missing command.")
    cmd = args[0]

    def ring_select():
        """Yield every ring of every backend of the selected types."""
        for _type in backends:
            for backend in _type.find():
                for ring in backend.find_rings():
                    yield ring

    def show(entries):
        """Print the display line of each watch entry."""
        for watch in entries:
            emit(stdout, watch.display())

    def pause():
        """Sleep for one watch period."""
        import time
        time.sleep(period)

    watches = WatchList(ring_select)

    if cmd == "check":

        # init
        watches.update()

        if verbose >= 2:
            show(watches)

        # watch for one round
        pause()
        watches.update()

        # show result
        crit = list(watches.stuck())
        stuck = bool(crit)

        if (iowatch):
            crit.extend(watches.pending())

        if verbose >= 1:
            show(watches)
        elif crit:
            show(crit)

        if stuck and kick:
            # deal with it
            watches.kick()

    else:
        fail("Invalid command.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment