Skip to content

Instantly share code, notes, and snippets.

@ganadist
Last active December 12, 2015 06:28
Show Gist options
  • Save ganadist/4729093 to your computer and use it in GitHub Desktop.
Save ganadist/4729093 to your computer and use it in GitHub Desktop.
simple push message script via facebook messenger
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: ts=4 st=4 sts=4 expandtab syntax=python
import sys, os, time
if not 'DISPLAY' in os.environ:
os.environ['DISPLAY'] = ':2'
import gi
from gi.repository import GLib, Gio, Soup
gi.require_version('Gtk', '2.0')
gi.require_version('Gdk', '2.0')
gi.require_version('WebKit', '1.0')
from gi.repository import Gtk, Gdk, WebKit
import urllib
BASE_PATH = os.path.join(os.getenv('HOME'), ".push")
DATABASE_PATH = os.path.join(BASE_PATH, "database")
COOKIE_FILE = os.path.join(BASE_PATH, "cookie.txt")
USER_ID = 'facebook_id'
URL = 'http://www.facebook.com/messages/%s'%USER_ID
GIO_URL = 'https://developers.google.com/events/io/'
class Timer:
def __init__(self, duration, cb, *args):
self.tag = GLib.timeout_add(duration, cb, *args)
def __del__(self):
GLib.source_remove(self.tag)
class GioMonitor(object):
def __init__(self, cb):
object.__init__(self)
self.cb = cb
self.data = ''
self.schedule_fetch(1)
def schedule_fetch(self, sec):
self.tag = Timer(sec * 1000, self.fetch)
def fetch(self):
data = ''
try:
data = urllib.urlopen(GIO_URL).read()
except:
pass
if data == '':
# fetch failed. retry after 5 sec
self.schedule_fetch(5)
return
if self.data != data:
if self.data != '':
# data is changed! run callback
self.cb()
else:
print 'not changed'
self.data = data
self.schedule_fetch(30)
class Window(Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
vbox = Gtk.VBox(False)
bbox = Gtk.HButtonBox()
test_button = Gtk.Button("Test")
test_button.connect("clicked", self.click_test_button)
bbox.add(test_button)
scroll = Gtk.ScrolledWindow()
view = WebKit.WebView()
scroll.add(view)
vbox.add(scroll)
vbox.add(bbox)
self.add(vbox)
self.webview = view
self.setup_webview(view)
self.connect("destroy", Gtk.main_quit)
view.connect("notify::load-status", self.handle_load_status)
view.connect("script-alert", self.handle_alert)
view.grab_focus()
self.set_default_size(800, 600)
self.load_uri(URL)
def load_uri(self, uri):
self.webview.load_uri(uri)
def handle_alert(self, *args):
return True
def handle_load_status(self, webview, psepc):
status = webview.get_load_status()
def click_test_button(self, button, *args):
self.webview.grab_focus()
self.send_message('open %s'%GIO_URL)
def send_message(self, message, repeat = 1):
def gen(repeat):
script = """document.querySelector("textarea[name='message_body']").value = "%s"
document.querySelector("input[class='_1qp5']").click()"""%message
for i in range(repeat):
self.webview.execute_script(script)
yield True
g = gen(repeat)
self.message_tag = Timer(5 * 1000, g.next)
def setup_webview(self, webview):
try:
os.makedirs(BASE_PATH)
except:
pass
session = WebKit.get_default_session()
jar = Soup.CookieJarText.new(COOKIE_FILE, False)
jar.set_accept_policy(Soup.CookieJarAcceptPolicy.ALWAYS)
session.add_feature(jar)
WebKit.set_web_database_directory_path(DATABASE_PATH)
settings = webview.get_settings()
settings.props.enable_page_cache = True
settings.props.enable_dns_prefetching = True
webview.set_settings(settings)
if __name__ == '__main__':
w = Window()
w.show_all()
def cb():
w.send_message('open %s'%GIO_URL, 20)
g = GioMonitor(cb)
exchook = sys.excepthook
def new_hook(type, value, traceback):
if isinstance(value, KeyboardInterrupt):
Gtk.main_quit()
return
return exchook(type, value, traceback)
sys.excepthook = exchook
GLib.timeout_add(500, lambda : True)
Gtk.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment