Created
July 24, 2012 09:26
-
-
Save averrin/3169066 to your computer and use it in GitHub Desktop.
My minimal implementation of Jarvis. IN PROGRESS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| python ./watcher.py ./relax.py:main |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| from watcher import mate | |
| from time import sleep | |
| import test | |
| def main(): | |
| status = "init" | |
| mate.debug(status) | |
| # mate.watcher.close() | |
| watcher = mate.addWatcher("N", 0) | |
| watcher.setColor("lightblue") | |
| watcher2 = mate.addWatcher("N^3", 0) | |
| watcher2.setColor("lightgreen") | |
| watcher.setValue(1000) | |
| # n = 1/0 | |
| # while True: | |
| # n+=1 | |
| # watcher.setValue(n) | |
| # watcher2.setValue(n**3) | |
| # if n**3 > 1000: | |
| # watcher2.setColor("#ff5555") | |
| # sleep(1) | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| @TODO: objectBrowser | |
| """ | |
| __author__ = "Alexey 'Averrin' Nabrodov" | |
| import time | |
| from watchdog.observers import Observer | |
| from watchdog.events import FileSystemEventHandler | |
| from PyQt4.QtGui import * | |
| from PyQt4.QtCore import * | |
| import sys | |
| import os | |
| import imp | |
| from datetime import datetime | |
| import inspect | |
| from modulefinder import ModuleFinder | |
| class ImportThread(QThread): | |
| def __init__(self, path, func, parent=None): | |
| QThread.__init__(self) | |
| self.path = path | |
| self.func = func | |
| self.parent = parent | |
| def run(self): | |
| watch_file = imp.load_source('watch_file', self.path) | |
| watch_func = getattr(watch_file, self.func) | |
| api = getattr(watch_file, "mate") | |
| self.api = api | |
| api.watcher = self.parent | |
| self.connect(api, SIGNAL("append"), self.parent.echo_append) | |
| self.connect(api, SIGNAL("echo"), self.parent.echo_setHtml) | |
| try: | |
| watch_func() | |
| except SystemExit: | |
| pass | |
| api.debug("Watched function exited.") | |
| except Exception, e: | |
| curframe = inspect.currentframe() | |
| api.error(e, frame=curframe) | |
| # finally: | |
| # del curframe | |
| class WorkThread(QThread): | |
| def __init__(self): | |
| QThread.__init__(self) | |
| def run(self): | |
| event_handler = self.handler() | |
| event_handler.thread = self | |
| self.observers = [] | |
| for d in DIRS: | |
| observer = Observer() | |
| observer.schedule(event_handler, path=d, recursive=False) | |
| observer.start() | |
| self.observers.append(observer) | |
| self.stop = False | |
| while not self.stop: | |
| time.sleep(1) | |
| observer.join() | |
| class handler(FileSystemEventHandler): | |
| def on_any_event(self, event): | |
| self.thread.emit(SIGNAL("event"), event) | |
| class Watcher(QMainWindow): | |
| def __init__(self): | |
| QMainWindow.__init__(self) | |
| self.echo = QTextBrowser() | |
| self.log = QListWidget() | |
| self.vars = QTableWidget(1, 2) | |
| self.vars.setItem(0, 0, QTableWidgetItem(FILENAME)) | |
| self.vars.setItem(0, 1, QTableWidgetItem(FUNC)) | |
| # self.objects = QTreeWidget() | |
| panel = QWidget() | |
| panel.setLayout(QVBoxLayout()) | |
| self.setCentralWidget(panel) | |
| hl = QHBoxLayout() | |
| hl.addWidget(self.echo) | |
| hl.addWidget(self.vars) | |
| # hl.addWidget(self.objects) | |
| w = QWidget() | |
| w.setLayout(hl) | |
| panel.layout().addWidget(w) | |
| panel.layout().addWidget(self.log) | |
| self.resize(800, 600) | |
| self.echo.append("<b>Watched files:</b>") | |
| for p in PATHES: | |
| self.echo.append(p) | |
| self.echo.append("<b>Watched dirs:</b>") | |
| for p in DIRS[1:]: | |
| self.echo.append(p) | |
| self.watchers = {} | |
| self.event_fired() | |
| def event_fired(self, event=None): | |
| if event is not None: | |
| d = os.path.split(event.src_path)[0] | |
| else: | |
| d = '' | |
| if event is None or\ | |
| (event.src_path in PATHES and\ | |
| event.event_type == "modified") or\ | |
| (d in DIRS and d != WATCH_DIR and\ | |
| event.event_type == "modified"): | |
| self.info("Watch file updated.") | |
| if hasattr(self, "it"): | |
| self.disconnect(self.it.api, SIGNAL("append"), self.echo_append) | |
| self.disconnect(self.it.api, SIGNAL("echo"), self.echo_setHtml) | |
| self.it.exit() | |
| self.it.terminate() | |
| self.vars.clear() | |
| self.watchers = {} | |
| for r in xrange(self.vars.rowCount() - 1): | |
| self.vars.removeRow(1) | |
| self.vars.setItem(0, 0, QTableWidgetItem(FILENAME)) | |
| self.vars.setItem(0, 1, QTableWidgetItem(FUNC)) | |
| self.it = ImportThread(PATH, FUNC, self) | |
| self.it.start() | |
| def makeMessage(self, msg, color='', icon='', bold=True, fgcolor='', timestamp=False): | |
| """"" | |
| Return listitem with nize attrs | |
| """"" | |
| msg = str(msg) | |
| if timestamp: | |
| timestamp = datetime.now().strftime('%H:%M:%S') | |
| item = QListWidgetItem('[%s] %s' % (timestamp, msg)) | |
| else: | |
| item = QListWidgetItem(msg) | |
| if color: | |
| item.setBackground(QColor(color)) | |
| font = QFont('Sans') | |
| font.setBold(bold) | |
| font.setPointSize(8) | |
| item.setFont(font) | |
| item.setTextColor(QColor(fgcolor)) | |
| # if icon: | |
| # item.setIcon(QIcon(self.api.icons[icon])) | |
| return item | |
| def info(self, msg): | |
| """"" | |
| Add info message to list | |
| """"" | |
| self.log.addItem(self.makeMessage(msg, "skyblue", 'ok', timestamp=True)) | |
| def error(self, msg, frame=None, obj=''): | |
| """"" | |
| Add error message to list | |
| """"" | |
| if not isinstance(msg, Exception): | |
| exc = Exception(msg) | |
| else: | |
| exc = msg | |
| if frame is not None: | |
| f = inspect.trace()[1][0] | |
| calframe = inspect.getouterframes(f) | |
| vmsg = '%s\nMethod: %s; locals: %s\nin line %s: %s' % (exc, calframe[0][3], calframe[0][0].f_locals, calframe[0][2], calframe[0][4][0].rstrip()) | |
| else: | |
| vmsg = msg | |
| if not obj: | |
| item = self.makeMessage(vmsg, "red", 'error', timestamp=True) | |
| else: | |
| item = self.makeMessage('%s::%s' % (obj, vmsg), "red", 'error', timestamp=True) | |
| # item.e = exc | |
| self.log.addItem(item) | |
| def debug(self, msg): | |
| """"" | |
| Add debug message to list | |
| """"" | |
| self.log.addItem(self.makeMessage(msg, "lightyellow", 'warning', timestamp=True)) | |
| def echo_setHtml(self, msg): | |
| self.echo.setHtml(msg) | |
| def echo_append(self, msg): | |
| self.echo.append(msg) | |
| def add_watcher(self, label, value): | |
| l = QTableWidgetItem(label) | |
| v = QTableWidgetItem(str(value)) | |
| count = self.vars.rowCount() | |
| self.vars.insertRow(count) | |
| if self.vars.columnCount() < 2: | |
| self.vars.insertColumn(1) | |
| self.vars.setItem(count, 0, l) | |
| self.vars.setItem(count, 1, v) | |
| self.watchers[label] = count | |
| def set_watcher_value(self, label, value, color): | |
| row = self.watchers[label] | |
| v = QTableWidgetItem(str(value)) | |
| self.vars.setItem(row, 1, v) | |
| self.set_watcher_color(label, color) | |
| def set_watcher_color(self, label, color): | |
| row = self.watchers[label] | |
| l = self.vars.item(row, 0) | |
| v = self.vars.item(row, 1) | |
| l.setBackgroundColor(QColor(color)) | |
| v.setBackgroundColor(QColor(color)) | |
| def main(): | |
| qtapp = QApplication(sys.argv) | |
| window = Watcher() | |
| watchdog = WorkThread() | |
| watchdog.connect(watchdog, SIGNAL("event"), window.event_fired) | |
| watchdog.start() | |
| window.show() | |
| qtapp.exec_() | |
| watchdog.stop = True | |
| class Commands(QObject): | |
| def __init__(self, watcher=None): | |
| QObject.__init__(self) | |
| self.watchers = [] | |
| def debug(self, msg): | |
| try: | |
| self.watcher.debug(msg) | |
| except AttributeError: | |
| print "DEBUG:\t %s" % msg | |
| def info(self, msg): | |
| try: | |
| self.watcher.info(msg) | |
| except AttributeError: | |
| print "INFO:\t %s" % msg | |
| def error(self, msg, frame=None): | |
| try: | |
| self.watcher.error(msg, frame) | |
| except AttributeError: | |
| print "ERROR:\t %s" % msg | |
| def echo(self, msg, append=False): | |
| try: | |
| if append: | |
| self.emit(SIGNAL("append"), msg) | |
| else: | |
| self.emit(SIGNAL("echo"), msg) | |
| except AttributeError: | |
| print "ERROR:\t %s" % msg | |
| def addWatcher(self, label, value): | |
| w = self.VarWatcher(label, value) | |
| try: | |
| self.connect(w, SIGNAL("add_watcher"), self.watcher.add_watcher) | |
| self.connect(w, SIGNAL("set_watcher_value"), self.watcher.set_watcher_value) | |
| self.connect(w, SIGNAL("set_watcher_color"), self.watcher.set_watcher_color) | |
| w.afterInit() | |
| except AttributeError: | |
| print "ERROR:\t %s" % "Cant add watcher. Plz run watcher.py" | |
| self.watchers.append(w) | |
| return w | |
| class VarWatcher(QObject): | |
| def __init__(self, label, value): | |
| QObject.__init__(self) | |
| self.label = label | |
| self.value = value | |
| self.color = "white" | |
| def afterInit(self): | |
| self.emit(SIGNAL("add_watcher"), self.label, self.value) | |
| def setValue(self, value): | |
| self.value = value | |
| self.emit(SIGNAL("set_watcher_value"), self.label, value, self.color) | |
| def setColor(self, color): | |
| self.color = color | |
| self.emit(SIGNAL("set_watcher_color"), self.label, color) | |
| mate = Commands() | |
| if __name__ == "__main__": | |
| if len(sys.argv) > 1: | |
| arg = sys.argv[1] | |
| PATH, FUNC = arg.split(":") | |
| WATCH_DIR, FILENAME = os.path.split(PATH) | |
| PATH = os.path.abspath(PATH) | |
| finder = ModuleFinder() | |
| finder.run_script(PATH) | |
| PATHES = [PATH] | |
| modules = finder.modules.values() | |
| modules.extend(sys.modules.values()) | |
| for module in modules: | |
| if module is not None and hasattr(module, "__file__"): | |
| p = module.__file__ | |
| if p is not None and not p.endswith(".so") and not p.endswith(".pyc"): | |
| PATHES.append(p) | |
| DIRS = [WATCH_DIR] | |
| for path in sys.argv[2:]: | |
| if os.path.isfile(path): | |
| PATHES.append(os.path.abspath(path)) | |
| elif os.path.isdir(path): | |
| DIRS.append(os.path.abspath(path)) | |
| main() | |
| else: | |
| print "Plz, specify file and function" | |
| exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment