Skip to content

Instantly share code, notes, and snippets.

@averrin
Created July 24, 2012 09:26
Show Gist options
  • Select an option

  • Save averrin/3169066 to your computer and use it in GitHub Desktop.

Select an option

Save averrin/3169066 to your computer and use it in GitHub Desktop.
My minimal implementation of Jarvis. IN PROGRESS.
python ./watcher.py ./relax.py:main
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from watcher import mate
from time import sleep
import test
def main():
    """Demo entry point: log a status line and register two watched values.

    Everything is routed through the shared ``mate`` object imported from
    ``watcher``.
    """
    status = "init"
    mate.debug(status)
    # mate.watcher.close()

    # One watcher per displayed quantity, each with its own highlight color.
    linear = mate.addWatcher("N", 0)
    linear.setColor("lightblue")
    cubic = mate.addWatcher("N^3", 0)
    cubic.setColor("lightgreen")
    linear.setValue(1000)

    # Disabled demo loop, kept for reference (work in progress):
    # n = 1/0
    # while True:
    #     n+=1
    #     watcher.setValue(n)
    #     watcher2.setValue(n**3)
    #     if n**3 > 1000:
    #         watcher2.setColor("#ff5555")
    #     sleep(1)
# Run the demo directly when this file is executed as a script.
if __name__ == "__main__":
    main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@TODO: objectBrowser
"""
__author__ = "Alexey 'Averrin' Nabrodov"
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from PyQt4.QtGui import *
from PyQt4.QtCore import *
import sys
import os
import imp
from datetime import datetime
import inspect
from modulefinder import ModuleFinder
class ImportThread(QThread):
    """Thread that loads the watched file and runs the watched function.

    :param path: path of the Python source file to load.
    :param func: name of the function inside that file to call.
    :param parent: window object that receives echo/log output; it is
        expected to provide ``echo_append``, ``echo_setHtml`` and ``error``.
    """

    def __init__(self, path, func, parent=None):
        QThread.__init__(self)
        self.path = path
        self.func = func
        self.parent = parent

    def run(self):
        """Load the file, wire its ``mate`` object to the parent, run ``func``.

        ``self.api`` is only set once loading succeeded.
        """
        # Loading can fail for ordinary reasons while a file is being edited
        # (syntax error, renamed function, missing ``mate``). Report that to
        # the parent instead of letting the thread die without a message.
        try:
            watch_file = imp.load_source('watch_file', self.path)
            watch_func = getattr(watch_file, self.func)
            api = getattr(watch_file, "mate")
        except Exception as e:
            if self.parent is None:
                raise
            self.parent.error(e)
            return

        self.api = api
        api.watcher = self.parent
        self.connect(api, SIGNAL("append"), self.parent.echo_append)
        self.connect(api, SIGNAL("echo"), self.parent.echo_setHtml)
        try:
            watch_func()
        except SystemExit:
            # The watched function asked to exit; treat it as a normal end.
            api.debug("Watched function exited.")
        except Exception as e:
            # The frame is passed so the receiver adds traceback details.
            curframe = inspect.currentframe()
            api.error(e, frame=curframe)
class WorkThread(QThread):
    """Thread that watches every directory in ``DIRS`` for file system events.

    Each event is re-emitted as the old-style Qt signal ``"event"`` with the
    watchdog event object as its argument. Set ``stop`` to True to end the
    thread.
    """

    def __init__(self):
        QThread.__init__(self)
        # Created here so the flag exists even if it is set before run()
        # gets going.
        self.stop = False
        self.observers = []

    def run(self):
        """Start one observer per directory and poll ``stop`` once a second."""
        event_handler = self.handler()
        event_handler.thread = self
        self.observers = []
        for d in DIRS:
            observer = Observer()
            observer.schedule(event_handler, path=d, recursive=False)
            observer.start()
            self.observers.append(observer)
        while not self.stop:
            time.sleep(1)
        # Stop every observer before joining: joining a running observer
        # would block forever, and all of them must be shut down, not only
        # the last one created.
        for observer in self.observers:
            observer.stop()
        for observer in self.observers:
            observer.join()

    class handler(FileSystemEventHandler):
        """Forwards every watchdog event to the owning thread's signal."""

        def on_any_event(self, event):
            # ``thread`` is assigned by WorkThread.run() after construction.
            self.thread.emit(SIGNAL("event"), event)
class Watcher(QMainWindow):
    """Main window: an echo pane, a table of watched values and a log list.

    The window also owns the current ``ImportThread`` (``self.it``) and
    restarts it whenever a relevant file system event arrives.
    """

    def __init__(self):
        QMainWindow.__init__(self)
        self.echo = QTextBrowser()
        self.log = QListWidget()
        # Row 0 of the table always shows the watched file and function.
        self.vars = QTableWidget(1, 2)
        self.vars.setItem(0, 0, QTableWidgetItem(FILENAME))
        self.vars.setItem(0, 1, QTableWidgetItem(FUNC))
        # self.objects = QTreeWidget()

        panel = QWidget()
        panel.setLayout(QVBoxLayout())
        self.setCentralWidget(panel)

        hl = QHBoxLayout()
        hl.addWidget(self.echo)
        hl.addWidget(self.vars)
        # hl.addWidget(self.objects)
        w = QWidget()
        w.setLayout(hl)
        panel.layout().addWidget(w)
        panel.layout().addWidget(self.log)
        self.resize(800, 600)

        self.echo.append("<b>Watched files:</b>")
        for p in PATHES:
            self.echo.append(p)
        self.echo.append("<b>Watched dirs:</b>")
        # DIRS[0] is skipped here; only the remaining entries are listed.
        for p in DIRS[1:]:
            self.echo.append(p)

        # Maps a watcher label to its row in ``self.vars``.
        self.watchers = {}
        self.event_fired()

    def event_fired(self, event=None):
        """Restart the watched function if ``event`` is relevant.

        A reload happens when ``event`` is None, or when a "modified" event
        refers to a path in ``PATHES`` or to a file whose directory is in
        ``DIRS`` and differs from ``WATCH_DIR``.
        """
        if event is None:
            reload_needed = True
        else:
            d = os.path.split(event.src_path)[0]
            reload_needed = event.event_type == "modified" and (
                event.src_path in PATHES
                or (d in DIRS and d != WATCH_DIR)
            )
        if not reload_needed:
            return

        self.info("Watch file updated.")
        if hasattr(self, "it"):
            # The previous thread only has ``api`` if loading succeeded;
            # without this guard a failed load would raise AttributeError
            # here and no later change could ever trigger a reload.
            api = getattr(self.it, "api", None)
            if api is not None:
                self.disconnect(api, SIGNAL("append"), self.echo_append)
                self.disconnect(api, SIGNAL("echo"), self.echo_setHtml)
            self.it.exit()
            self.it.terminate()
            self.vars.clear()
            self.watchers = {}
            # Drop every row except row 0, then restore its contents.
            while self.vars.rowCount() > 1:
                self.vars.removeRow(1)
            self.vars.setItem(0, 0, QTableWidgetItem(FILENAME))
            self.vars.setItem(0, 1, QTableWidgetItem(FUNC))
        self.it = ImportThread(PATH, FUNC, self)
        self.it.start()

    def makeMessage(self, msg, color='', icon='', bold=True, fgcolor='', timestamp=False):
        """Return a list item for ``msg`` with the given attributes.

        :param msg: message; converted with ``str``.
        :param color: background color name, applied only when non-empty.
        :param icon: currently unused (icon support is commented out).
        :param bold: whether the 8pt 'Sans' font is bold.
        :param fgcolor: text color name passed to ``QColor``.
        :param timestamp: when true, prefix the text with ``[HH:MM:SS]``.
        """
        msg = str(msg)
        if timestamp:
            stamp = datetime.now().strftime('%H:%M:%S')
            item = QListWidgetItem('[%s] %s' % (stamp, msg))
        else:
            item = QListWidgetItem(msg)
        if color:
            item.setBackground(QColor(color))
        font = QFont('Sans')
        font.setBold(bold)
        font.setPointSize(8)
        item.setFont(font)
        item.setTextColor(QColor(fgcolor))
        # if icon:
        #     item.setIcon(QIcon(self.api.icons[icon]))
        return item

    def info(self, msg):
        """Add an info message to the log list."""
        self.log.addItem(self.makeMessage(msg, "skyblue", 'ok', timestamp=True))

    def error(self, msg, frame=None, obj=''):
        """Add an error message to the log list.

        :param msg: message text or exception instance.
        :param frame: only tested against None; when given, details are
            taken from ``inspect.trace()``, so the call must happen while an
            exception is being handled.
        :param obj: optional prefix, rendered as ``obj::message``.
        """
        exc = msg if isinstance(msg, Exception) else Exception(msg)
        vmsg = msg
        if frame is not None:
            records = inspect.trace()
            if len(records) > 1:
                # Record 1 is the first frame below the one that caught the
                # exception.
                calframe = inspect.getouterframes(records[1][0])
                # The source context is None when the source is unavailable.
                context = calframe[0][4]
                source_line = context[0].rstrip() if context else ''
                vmsg = '%s\nMethod: %s; locals: %s\nin line %s: %s' % (exc, calframe[0][3], calframe[0][0].f_locals, calframe[0][2], source_line)
            else:
                # The traceback is too short to describe a called frame;
                # show the bare exception rather than raising IndexError.
                vmsg = exc
        if not obj:
            item = self.makeMessage(vmsg, "red", 'error', timestamp=True)
        else:
            item = self.makeMessage('%s::%s' % (obj, vmsg), "red", 'error', timestamp=True)
        # item.e = exc
        self.log.addItem(item)

    def debug(self, msg):
        """Add a debug message to the log list."""
        self.log.addItem(self.makeMessage(msg, "lightyellow", 'warning', timestamp=True))

    def echo_setHtml(self, msg):
        """Replace the echo pane contents with ``msg``."""
        self.echo.setHtml(msg)

    def echo_append(self, msg):
        """Append ``msg`` to the echo pane."""
        self.echo.append(msg)

    def add_watcher(self, label, value):
        """Append a table row showing ``label`` and ``str(value)``."""
        label_item = QTableWidgetItem(label)
        value_item = QTableWidgetItem(str(value))
        count = self.vars.rowCount()
        self.vars.insertRow(count)
        if self.vars.columnCount() < 2:
            self.vars.insertColumn(1)
        self.vars.setItem(count, 0, label_item)
        self.vars.setItem(count, 1, value_item)
        self.watchers[label] = count

    def set_watcher_value(self, label, value, color):
        """Show a new value for ``label`` and re-apply its row color."""
        row = self.watchers[label]
        value_item = QTableWidgetItem(str(value))
        self.vars.setItem(row, 1, value_item)
        # The value cell was replaced, so the color has to be set again.
        self.set_watcher_color(label, color)

    def set_watcher_color(self, label, color):
        """Set the background color of both cells in ``label``'s row."""
        row = self.watchers[label]
        label_item = self.vars.item(row, 0)
        value_item = self.vars.item(row, 1)
        label_item.setBackgroundColor(QColor(color))
        value_item.setBackgroundColor(QColor(color))
def main():
    """Create the Qt application, show the window and run the event loop.

    The file system thread's ``"event"`` signal is connected to the window's
    ``event_fired`` slot; its ``stop`` flag is raised once the loop returns.
    """
    app = QApplication(sys.argv)
    main_window = Watcher()
    fs_thread = WorkThread()
    fs_thread.connect(fs_thread, SIGNAL("event"), main_window.event_fired)
    fs_thread.start()
    main_window.show()
    app.exec_()
    # The event loop has finished: ask the polling thread to end.
    fs_thread.stop = True
class Commands(QObject):
    """API object (``mate``) used by watched scripts to talk to the window.

    While no ``watcher`` attribute is set, the logging helpers fall back to
    printing on stdout.

    :param watcher: optional window object to attach immediately. It was
        previously accepted but discarded; it is now stored when given.
    """

    def __init__(self, watcher=None):
        QObject.__init__(self)
        self.watchers = []
        # Set only when supplied, so the attribute stays absent otherwise,
        # exactly as before.
        if watcher is not None:
            self.watcher = watcher

    def debug(self, msg):
        """Log a debug message to the window, or print it without one."""
        try:
            self.watcher.debug(msg)
        except AttributeError:
            print("DEBUG:\t %s" % msg)

    def info(self, msg):
        """Log an info message to the window, or print it without one."""
        try:
            self.watcher.info(msg)
        except AttributeError:
            print("INFO:\t %s" % msg)

    def error(self, msg, frame=None):
        """Log an error to the window, or print it without one."""
        try:
            self.watcher.error(msg, frame)
        except AttributeError:
            print("ERROR:\t %s" % msg)

    def echo(self, msg, append=False):
        """Emit ``msg`` on the "append" signal if ``append``, else on "echo"."""
        try:
            if append:
                self.emit(SIGNAL("append"), msg)
            else:
                self.emit(SIGNAL("echo"), msg)
        except AttributeError:
            print("ERROR:\t %s" % msg)

    def addWatcher(self, label, value):
        """Create a ``VarWatcher``, connect it to the window and return it.

        Without an attached window the watcher is still created, stored and
        returned; only an error line is printed.
        """
        w = self.VarWatcher(label, value)
        try:
            self.connect(w, SIGNAL("add_watcher"), self.watcher.add_watcher)
            self.connect(w, SIGNAL("set_watcher_value"), self.watcher.set_watcher_value)
            self.connect(w, SIGNAL("set_watcher_color"), self.watcher.set_watcher_color)
            # Emitted after connecting so the window receives the new row.
            w.afterInit()
        except AttributeError:
            print("ERROR:\t %s" % "Cant add watcher. Plz run watcher.py")
        self.watchers.append(w)
        return w

    class VarWatcher(QObject):
        """A labelled value whose changes are announced through signals."""

        def __init__(self, label, value):
            QObject.__init__(self)
            self.label = label
            self.value = value
            self.color = "white"

        def afterInit(self):
            """Announce this watcher with its initial value."""
            self.emit(SIGNAL("add_watcher"), self.label, self.value)

        def setValue(self, value):
            """Store ``value`` and emit it along with the current color."""
            self.value = value
            self.emit(SIGNAL("set_watcher_value"), self.label, value, self.color)

        def setColor(self, color):
            """Store ``color`` and emit it."""
            self.color = color
            self.emit(SIGNAL("set_watcher_color"), self.label, color)
# Shared API object; watched scripts import it as ``from watcher import mate``.
mate = Commands()

# Usage: python watcher.py <file.py>:<function> [extra files or dirs ...]
if __name__ == "__main__":
    if len(sys.argv) > 1 and ":" in sys.argv[1]:
        arg = sys.argv[1]
        # Split on the last colon only, so a path that itself contains ':'
        # (e.g. a drive letter) does not break the unpacking.
        PATH, FUNC = arg.rsplit(":", 1)
        # Make the path absolute BEFORE splitting it: PATHES holds absolute
        # paths, and a bare file name would otherwise give an empty
        # WATCH_DIR.
        PATH = os.path.abspath(PATH)
        WATCH_DIR, FILENAME = os.path.split(PATH)

        # Watch the script itself plus every module file that can be found.
        finder = ModuleFinder()
        finder.run_script(PATH)
        PATHES = [PATH]
        modules = list(finder.modules.values())
        modules.extend(list(sys.modules.values()))
        for module in modules:
            if module is not None and hasattr(module, "__file__"):
                p = module.__file__
                if p is not None and not p.endswith((".so", ".pyc")):
                    PATHES.append(p)

        # Remaining arguments add extra files or directories to watch.
        DIRS = [WATCH_DIR]
        for path in sys.argv[2:]:
            if os.path.isfile(path):
                PATHES.append(os.path.abspath(path))
            elif os.path.isdir(path):
                DIRS.append(os.path.abspath(path))
        main()
    else:
        print("Plz, specify file and function")
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment