Last active
August 3, 2018 16:56
-
-
Save mutaku/6225171 to your computer and use it in GitHub Desktop.
monitor directory for file changes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import os | |
import sys | |
import time | |
import Queue | |
import threading | |
from tkwindow import Report | |
from watchdog import WatchDog | |
class WD(threading.Thread):
    '''Run WatchDog instances as threads.

    Each worker pulls directory paths from the module-level ``pathPool``
    queue, builds a WatchDog for every path and runs its monitor loop in a
    daemon child process.

    threads -- shared list; every started child process is appended to it
    q       -- queue handed to each WatchDog for its log messages
    upload  -- passed through to WatchDog (upload new/modified images)
    timing  -- passed through to WatchDog as the polling delay
    '''

    def __init__(self, threads, q, upload=False, timing=2):
        threading.Thread.__init__(self)
        self.upload = upload
        self.threads = threads
        self.q = q
        self.timing = timing

    def run(self):
        '''Start one watcher process per queued path, then finish.

        The worker returns once no path has arrived for one second.
        '''
        while True:
            try:
                path = pathPool.get(True, 1)
            except Queue.Empty:
                # The pool is drained.  Leaving the loop (instead of
                # swallowing the error and retrying forever) avoids both the
                # traceback and a worker thread that never lets the
                # interpreter exit.
                break
            if not path:
                continue
            win = WatchDog(
                path,
                self.timing,
                self.q,
                self.upload)
            t = multiprocessing.Process(
                target=logGen,
                args=(win,))
            # Daemon processes are killed when the parent process exits.
            t.daemon = True
            t.start()
            self.threads.append(t)
def logGen(win):
    '''Process entry point: run the given watcher's monitor loop.

    win -- object exposing a ``monitor()`` method; WD.run passes a
           WatchDog instance.
    '''
    start_monitoring = win.monitor
    start_monitoring()
if __name__ == "__main__":
    # Script entry point: ask which directories to watch, start one WD
    # worker per directory, show the Tk report window, and shut the watcher
    # processes down when the window's main loop ends.
    multiprocessing.freeze_support()
    cwd = os.getcwd()
    watched = raw_input(
        "Directories to monitor (comma sep): [{}] ".format(cwd))
    # Blank entries (empty answer, "a,,b", trailing comma) are dropped; with
    # nothing left we fall back to the current directory.
    dirs = [name.strip() for name in watched.split(",") if name.strip()]
    if not dirs:
        dirs = [cwd]
    upload = raw_input("Upload new images to imgur (y/n): [n] ")
    upload = upload in ("y", "Y")
    q = multiprocessing.Queue()
    q.cancel_join_thread()
    gui = Report(q)
    threads = list()
    # Must stay a module-level name: WD.run reads ``pathPool`` as a global.
    pathPool = Queue.Queue(0)
    for _ in dirs:
        worker = WD(threads, q, upload)
        # Daemon threads cannot keep the interpreter alive at exit.
        worker.daemon = True
        worker.start()
    for path in dirs:
        pathPool.put(path)
    try:
        gui.root.mainloop()
    except (KeyboardInterrupt, SystemExit):
        gui.root.destroy()
    finally:
        # The watcher processes loop forever, so a bare join() would hang
        # here; stop them first.  Iterate over a copy because the worker
        # threads append to ``threads``.
        for proc in list(threads):
            if proc.is_alive():
                proc.terminate()
            proc.join()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from Tkinter import * | |
from Queue import Empty, Full | |
class Report(object):
    '''Tk window that lists messages received on a queue.

    The window holds a single Listbox with vertical and horizontal
    scrollbars.  The queue passed to the constructor is polled with
    ``root.after`` and every message found is appended to the list.
    '''

    # Delay between two polls of the message queue, in milliseconds.
    POLL_MS = 100
    # Upper bound on messages shown per poll so one tick cannot stall the
    # Tk event loop while producers keep the queue full.
    MAX_BATCH = 100

    def __init__(self, q):
        '''Build the widgets and schedule the first poll of queue ``q``.'''
        self.root = Tk()
        self.root.title("WatchDog")
        self.root.config(bg="grey")
        # Explicit parent instead of relying on Tkinter's default root.
        self.report = LabelFrame(self.root)
        self.report.config(
            bg="lightgrey",
            padx=2,
            pady=2,
            relief=GROOVE,
            borderwidth=1)
        self.report.grid()
        self.scrollyMSG = Scrollbar(
            self.report,
            orient=VERTICAL)
        self.scrollxMSG = Scrollbar(
            self.report,
            orient=HORIZONTAL)
        self.msg = Listbox(
            self.report,
            selectmode=EXTENDED,
            fg="black",
            activestyle="none",
            selectbackground="grey",
            width=120,
            height=20,
            xscrollcommand=self.scrollxMSG.set,
            yscrollcommand=self.scrollyMSG.set)
        self.scrollyMSG.config(
            command=self.msg.yview,
            highlightbackground="white")
        self.scrollyMSG.grid(
            row=1,
            column=1,
            rowspan=4,
            sticky=N+S)
        self.scrollxMSG.config(
            command=self.msg.xview,
            highlightbackground="white")
        self.scrollxMSG.grid(
            column=0,
            sticky=E+W)
        self.msg.grid(
            row=1,
            column=0,
            rowspan=4,
            sticky=E+W)
        self.root.after(self.POLL_MS,
                        self.CheckQueuePoll,
                        q)

    def CheckQueuePoll(self, c_queue):
        '''Show the messages waiting on ``c_queue`` and re-arm the poll.

        Up to MAX_BATCH messages are taken per call.  Taking a single
        message per tick would cap the display at one message every
        POLL_MS milliseconds and let a backlog build up during bursts.
        '''
        try:
            for _ in range(self.MAX_BATCH):
                try:
                    data = c_queue.get(False)  # non-blocking
                except Empty:
                    break
                self.updateMSG(data)
        finally:
            # Always reschedule so polling survives an error in updateMSG.
            self.root.after(self.POLL_MS,
                            self.CheckQueuePoll,
                            c_queue)

    def updateMSG(self, data):
        '''Append ``data`` to the list and scroll so it is visible.'''
        self.msg.insert(END, data)
        self.msg.see(END)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Test loop monitoring program | |
import time | |
import os | |
import webbrowser | |
import sys | |
import imghdr | |
try:
    # PyImgur is optional; it is looked up in a sibling "PyImgur" directory.
    sys.path.append("../PyImgur/")
    from pyimgur import UploadImage
except ImportError:
    # Only a missing/incompatible PyImgur is tolerated (a bare except would
    # also hide KeyboardInterrupt and real bugs).  Binding the name to None
    # keeps the ``globals().get("UploadImage")`` check in monitor() falsy.
    UploadImage = None
class WatchDog(object):
    '''
    I'm watching your directory.

    Polls one directory, compares the stat data of its entries between
    polls and reports added, modified and removed names as timestamped
    strings on a queue.

    watched -- path of the directory to poll
    delay   -- seconds to sleep between two polls
    q       -- object with a ``put`` method that receives the log strings
    upload  -- when true, new/modified images are uploaded to imgur
               (only if ``UploadImage`` could be imported)
    '''

    def __init__(self, watched, delay, q, upload=False):
        self.watched = watched
        self.delay = delay
        self.q = q
        self.upload = upload
        # Results of the latest compare(); empty until it has run once.
        self.added = []
        self.modified = []
        self.removed = []
        self.establish()

    @staticmethod
    def _signature(stat_result):
        '''Return the stat fields that decide whether an entry changed.

        st_atime is left out on purpose: reading a file can update it, so
        comparing whole stat results flags files that were merely opened.
        st_ctime is left out because it moves whenever the timestamps
        themselves are rewritten.
        '''
        return (
            stat_result.st_mode,
            stat_result.st_ino,
            stat_result.st_dev,
            stat_result.st_nlink,
            stat_result.st_uid,
            stat_result.st_gid,
            stat_result.st_size,
            stat_result.st_mtime)

    def log(self, data):
        '''
        Output some log data with timestamp
        '''
        self.q.put("[{}] {} {}".format(
            time.strftime("%X"),
            self.watched,
            data))

    def build(self):
        '''
        Build a dictionary of files and stats.

        Maps every entry name in the watched directory to its os.stat
        result.  Failures are logged; an unreadable directory yields an
        empty dictionary instead of killing the watcher.
        '''
        _build_dict = dict()
        try:
            _names = os.listdir(self.watched)
        except OSError as error:
            self.log("[E] {}".format(error))
            return _build_dict
        for file_item in _names:
            try:
                _build_dict[file_item] = os.stat(
                    os.path.join(self.watched, file_item))
            except Exception as error:
                # e.g. the entry vanished between listdir() and stat()
                self.log("[E] {}".format(error))
        return _build_dict

    def establish(self):
        '''
        Establish base identities for initial file discovery.
        '''
        self.log("[!] Building initial data")
        self.stats = self.build()
        self.log("[!] Monitoring {}".format(self.watched))
        if self.upload:
            self.log("[!] Uploading new/modified images to imgur.com")

    def compare(self):
        '''
        Do some basic file stat comparing.

        Rebuilds the stat dictionary and stores the sorted names of new,
        changed and vanished entries in self.added, self.modified and
        self.removed.  The fresh snapshot becomes the new baseline.
        '''
        _current = self.build()
        _old_keys = set(self.stats)
        _new_keys = set(_current)
        self.added = sorted(_new_keys - _old_keys)
        self.modified = sorted(
            y for y in _old_keys & _new_keys
            if self._signature(self.stats[y]) !=
            self._signature(_current[y]))
        self.removed = sorted(_old_keys - _new_keys)
        self.stats = _current

    def checkimage(self):
        '''
        Return the added/modified names that imghdr recognises as images.
        '''
        _images = list()
        for image in self.added + self.modified:
            try:
                if imghdr.what(os.path.join(self.watched, image)):
                    _images.append(image)
            except Exception as error:
                self.log("[E] {}".format(error))
        return _images

    def imgur(self):
        '''
        Example uploading to imgur.

        Uploads every new/modified image, opens the resulting URL in the
        web browser and logs either the URL or the reported error.
        '''
        for image in self.checkimage():
            result = UploadImage(os.path.join(self.watched, image))
            if not result.error:
                url = result.imageURL['url']
                webbrowser.open(url)
                self.log("[!] Uploaded {} to {}".format(image, url))
            else:
                self.log("[E] {}".format(result.error))

    def monitor(self):
        '''
        Basic check->pause->check loop monitoring for file changes.

        Runs until interrupted.  The interrupt is caught around the whole
        loop, not only around the sleep, so it is also handled when it
        arrives in the middle of a scan.
        '''
        try:
            while True:
                self.compare()
                _results = [
                    ["[-] Removed", " ".join(self.removed)],
                    ["[*] Modified", " ".join(self.modified)],
                    ["[+] Added", " ".join(self.added)]]
                for result in _results:
                    if result[1]:
                        self.log(" : ".join(result))
                # UploadImage is only usable when PyImgur was importable.
                if self.upload and globals().get("UploadImage"):
                    self.imgur()
                time.sleep(self.delay)
        except (KeyboardInterrupt, SystemExit):
            self.log("[!] Exiting")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Issues:
Ctrl-C does not shut the program down cleanly; exiting requires killing the master tkwatchdog.py process.