Skip to content

Instantly share code, notes, and snippets.

@ste-fan
Last active November 30, 2017 03:23
Show Gist options
  • Save ste-fan/0fe2d14ced250c2ef962 to your computer and use it in GitHub Desktop.
Save ste-fan/0fe2d14ced250c2ef962 to your computer and use it in GitHub Desktop.
Manage app-dependent suspend / hibernate and shutdown actions (example: Pidgin disconnecting IM accounts); easily adaptable
#!/usr/bin/env python
## /usr/local/share/functions/inhibit-shutdown-sleep-wrapper.py
##
## inhibit shutdown and sleep as long as program is running (using delay mode)
## so that linked app can do necessary stuff before shutdown and sleep
##
## make symbolic link to this file named as the program executable
## that shall be run between DBus calls and put it in /usr/local/...
## directory that is in the PATH
import os, sys, subprocess, time, atexit, signal
import dbus, gobject, multiprocessing
from glib import MainLoop as MainLoopType
from dbus.mainloop import glib
## Initialise thread support in gobject and in the dbus glib main-loop
## bindings (`glib` here is dbus.mainloop.glib, see the import above), then
## install the glib main loop as the default main loop for D-Bus connections.
gobject.threads_init()
glib.threads_init()
glib.DBusGMainLoop(set_as_default=True)
## name this script was invoked as (last '/'-separated component of argv[0]);
## per the header comment this is the symlink name, i.e. the program to wrap
XTABLE = sys.argv[0].split('/')[-1]
## command-line arguments, handed through to the wrapped program
ARGS = sys.argv[1:]
def run( xtable=XTABLE, args=ARGS ):
    """Start the wrapped executable with all 'local' directories hidden from PATH.

    Every PATH entry whose text contains 'local' is removed while the child
    is being spawned, presumably so that the lookup of `xtable` does not
    resolve to this wrapper's own symlink (which the header says lives in a
    /usr/local/... directory).  The child is started while the reduced PATH
    is in effect; this process's PATH is restored afterwards.

    xtable -- name of the executable to start
    args   -- sequence of command-line arguments for it

    Returns the subprocess.Popen object of the started child.
    Raises KeyError if PATH is not set and OSError if the executable
    cannot be started.
    """
    oldPath = os.environ['PATH']
    newPath = os.pathsep.join(
        [ p for p in oldPath.split(os.pathsep) if 'local' not in p ] )
    os.environ['PATH'] = newPath
    try:
        pipe = subprocess.Popen( [xtable] + list(args) )
    finally:
        ## always restore PATH, also when Popen raises (e.g. executable
        ## not found), so a failed start does not leave PATH truncated
        os.environ['PATH'] = oldPath
    return pipe
def run_wrapper( xtable=XTABLE, args=ARGS ):
    """Start the wrapped program and arrange for it to be stopped at exit.

    The child is started through run().  An atexit hook is registered that,
    if the child is still running at interpreter exit, forwards the caught
    signal number (module global SIGNUM) to it, or terminates it when no
    signal was recorded.

    When the wrapped program is 'pidgin', Pidgin's D-Bus interface is awaited
    for up to 10 seconds and the idle-away and current saved statuses are
    activated one after the other (so that the XMPP priority is set correctly
    by the plugin).

    Returns the subprocess.Popen object of the child.
    """
    pipe = run( xtable=xtable, args=args )

    ## stop the child at exit if it is still running
    def close_pipe(pipe):
        global SIGNUM
        if pipe.poll() is not None:
            return  ## child already finished
        if SIGNUM:
            pipe.send_signal(SIGNUM)
        else:
            pipe.terminate()

    atexit.register( close_pipe, pipe )

    if xtable != 'pidgin':
        return pipe

    purple = pidgin_connect_DBus(timeout=10)
    if purple is not None:
        ## toggle idle and current status
        current = purple.PurpleSavedstatusGetCurrent()
        idle = purple.PurpleSavedstatusGetIdleaway()
        purple.PurpleSavedstatusActivate(idle)
        purple.PurpleSavedstatusActivate(current)
    return pipe
def pidgin_connect_DBus(timeout=None):
    """Connect to Pidgin's D-Bus object on the session bus, retrying until it exists.

    timeout -- seconds to keep retrying; a falsy value (None, 0) retries
               without limit

    Returns a dbus.Interface for 'im.pidgin.purple.PurpleInterface', or None
    if the object could not be obtained before the timeout elapsed.
    """
    if timeout:
        start = time.time()
    bus = dbus.SessionBus()
    proxy = None
    ## poll every 0.1 s until Pidgin's service can be reached
    while proxy is None:
        try:
            proxy = bus.get_object( 'im.pidgin.purple.PurpleService',
                                    '/im/pidgin/purple/PurpleObject' )
        except dbus.DBusException:
            if timeout and ( time.time() - start ) > timeout:
                return None
            time.sleep(0.1)
    return dbus.Interface( proxy, 'im.pidgin.purple.PurpleInterface' )
def freedesktop_connect_DBus():
    """Return the 'org.freedesktop.login1.Manager' interface from the system bus."""
    login1_object = dbus.SystemBus().get_object( 'org.freedesktop.login1',
                                                 '/org/freedesktop/login1' )
    return dbus.Interface( login1_object, 'org.freedesktop.login1.Manager')
def gnome_connect_DBus():
    """Return the 'org.gnome.SessionManager' interface from the session bus."""
    manager_object = dbus.SessionBus().get_object( 'org.gnome.SessionManager',
                                                   '/org/gnome/SessionManager' )
    return dbus.Interface( manager_object, 'org.gnome.SessionManager' )
def close_subproc(p):
    """Terminate the process object `p` if it is still alive and wait for it.

    p -- object offering is_alive(), terminate() and join(); in this file a
         multiprocessing.Process

    Does nothing when the process is no longer alive.
    """
    if not p.is_alive():
        return
    p.terminate()
    p.join()
def pidgin_wait_for(purple, signal, lst=None, cnt=1, timeout=None):
    """Block until a D-Bus signal has been received the requested number of times.

    A helper process connects to `signal` on `purple`, runs a gobject main
    loop and forwards the arguments of every emission through a queue; this
    function consumes the queue.

    purple  -- interface object offering connect_to_signal()
    signal  -- name of the signal to wait for
    lst     -- optional list of expected values; each received value is
               removed from it (the caller's list is modified in place) and
               the wait ends once it is empty
    cnt     -- used when `lst` is empty/None: number of emissions to wait for
    timeout -- overall limit in seconds; a falsy value waits without limit

    Returns None, both on success and when the timeout expires.  The helper
    process is left running and is shut down by an atexit hook.
    """
    ## the exception raised by Queue.get() on timeout lives in the
    ## Queue (Python 2) / queue (Python 3) module, which the file does not
    ## import -- fetch it locally
    try:
        from Queue import Empty as QueueEmpty
    except ImportError:
        from queue import Empty as QueueEmpty

    if timeout:
        start = time.time()

    def signal_worker(q, purple, signal_name):
        ## forward the payload of each emission: the single argument as-is,
        ## several arguments as a tuple, and 1 for an argument-less signal
        def putter(*args):
            if args:
                if len(args) == 1:
                    q.put(args[0])
                else:
                    q.put(args)
            else:
                q.put(1)
        purple.connect_to_signal(signal_name, putter)
        loop = gobject.MainLoop()
        loop.run()

    q = multiprocessing.Queue()
    p = multiprocessing.Process( target=signal_worker,
                                 args=(q, purple, signal) )
    p.start()
    atexit.register( close_subproc, p )

    while True:
        if timeout:
            ## remaining time budget; never hand a negative value to get()
            to = max( 0.0, timeout - (time.time() - start) )
        else:
            to = None
        try:
            a = q.get(timeout=to)
        except QueueEmpty:
            break  ## timed out
        if isinstance(a, MainLoopType):
            ## a main loop object received over the queue: quit it at exit
            atexit.register( a.quit )
            continue
        if lst:
            try:
                lst.remove(a)
            except ValueError:
                pass  ## not one of the expected values
            if len(lst) == 0:
                break
        elif cnt:
            cnt -= 1
            if cnt == 0:
                break
def wait_for_ping(ip='109.193.193.59', timeout=None): ## google.com
    """Block until `ip` answers a single ping or the timeout has elapsed.

    ip      -- address handed to `ping -c1` (the default is labelled
               google.com by the original author; not verified here)
    timeout -- seconds after which to give up; a falsy value (None, 0)
               retries without limit

    Returns None in either case.  Ping output is discarded.
    """
    if timeout:
        start = time.time()
    ## the context manager closes the devnull handle on every way out
    with open(os.devnull, 'w') as fnull:
        while True:
            if timeout and ( time.time() - start ) > timeout:
                break
            rc = subprocess.call(['ping', '-c1', ip], stdout=fnull, stderr=fnull)
            if rc == 0:
                break
            time.sleep(0.25)
## DBus signal handler function (on_signal)
def pidgin_on_PrepareForSleepShutdown(boolean, member=None):
    """Handle the PrepareForSleep / PrepareForShutdown signals for Pidgin.

    boolean -- True when the system is about to sleep or shut down, False
               otherwise (e.g. on resume)
    member  -- name of the signal that triggered the call

    Before sleep/shutdown: unless Pidgin is idle-away, the current saved
    status is remembered in the global CURRENT (and, if it is of type
    'available', the idle-away status is activated first); then a new
    'offline' status is activated and the inhibit lock is released through
    rm_lock().

    Otherwise: for 'PrepareForSleep' the remembered status (or a new
    'available' one) is re-activated once the network answers a ping; in
    every case the inhibit lock is taken again through take_lock().

    take_lock / rm_lock are expected in this function's globals; in this file
    they are put there by listen_to_DBus_signals().
    """
    global CURRENT

    ## ---- prepare for sleep (=suspend or hibernate) or shutdown ----
    if boolean:
        purple = pidgin_connect_DBus(timeout=0.55)
        if purple is not None:
            if not purple.PurpleSavedstatusIsIdleaway():
                CURRENT = purple.PurpleSavedstatusGetCurrent()
                current_type = purple.PurpleSavedstatusGetType(CURRENT)
                available_type = purple.PurplePrimitiveGetTypeFromId('available')
                if current_type == available_type:
                    purple.PurpleSavedstatusActivate(
                        purple.PurpleSavedstatusGetIdleaway() )
            offline_type = purple.PurplePrimitiveGetTypeFromId('offline')
            purple.PurpleSavedstatusActivate(
                purple.PurpleSavedstatusNew('', offline_type) )
        rm_lock()
        return

    ## ---- resume from sleep ----
    if member == 'PrepareForSleep':
        purple = pidgin_connect_DBus(timeout=10)
        if purple is not None:
            wait_for_ping(timeout=10)
            if CURRENT:
                purple.PurpleSavedstatusActivate(CURRENT)
                CURRENT = None
            else:
                available_type = purple.PurplePrimitiveGetTypeFromId('available')
                purple.PurpleSavedstatusActivate(
                    purple.PurpleSavedstatusNew('', available_type) )
    take_lock()
def listen_to_DBus_signals(prefs_dict):
    """Start one listener process per supported entry of `prefs_dict`.

    prefs_dict -- maps a service key to a dict with the entries
                  'signals'   : list of signal names to subscribe to
                  'on_signal' : handler called for each of these signals
                                (it receives the signal name as keyword
                                argument `member`)
                  'take_lock' : callable that acquires the inhibit lock
                  'rm_lock'   : callable that releases it
                  Only the key 'login1' is supported; other keys are skipped.

    Each listener process takes the lock, makes take_lock / rm_lock available
    as globals of the handler, connects the handler to the signals and runs a
    gobject main loop.  The process is terminated by an atexit hook of the
    calling process.  Returns None.
    """
    def signal_worker(iface, signals, on_signal, take_lock, rm_lock):
        take_lock()
        ## NOTE(review): confirm that atexit hooks actually run in a
        ## multiprocessing child that gets terminated by its parent
        atexit.register( rm_lock )
        ## inject the lock functions into the handler's module globals;
        ## __globals__ is the same dict as the Python-2-only func_globals
        ## and is also available on Python 3
        on_signal.__globals__.update({ 'take_lock': take_lock,
                                       'rm_lock': rm_lock })
        ## `signal_name` rather than `signal`, so the imported signal
        ## module is not shadowed
        for signal_name in signals:
            iface.connect_to_signal( signal_name, on_signal,
                                     member_keyword='member' )
        loop = gobject.MainLoop()
        loop.run()

    for key in prefs_dict:
        if key != 'login1':
            continue
        iface = freedesktop_connect_DBus()
        signals = prefs_dict[key]['signals']
        on_signal = prefs_dict[key]['on_signal']
        take_lock = prefs_dict[key]['take_lock']
        rm_lock = prefs_dict[key]['rm_lock']
        p = multiprocessing.Process(
            target=signal_worker,
            args=( iface, signals, on_signal, take_lock, rm_lock ),
            name='DBus-signal_worker_%s' % key
        )
        p.start()
        atexit.register( close_subproc, p )
def freedesktop_Inhibit_ShutdownSleep( who, why, what='shutdown:sleep',
                                       mode='delay' ):
    """Call Inhibit() on the login1 manager and return what it returns.

    who, why -- passed to Inhibit() as its second and third argument
    what     -- passed as first argument (default 'shutdown:sleep')
    mode     -- passed as fourth argument (default 'delay')

    The return value is treated as a file-descriptor object by the callers
    in this file (they close it through its take() method).
    """
    return freedesktop_connect_DBus().Inhibit( what, who, why, mode )
## handle some signals to trigger normal exit (and atexit then triggers
## its own stuff)
def sigHandler(signum, stack=None):
    """Turn SIGHUP / SIGINT / SIGQUIT / SIGTERM into a regular interpreter exit.

    signum -- number of the received signal; must be a key of the module
              global convSignum2Sig (KeyError otherwise)
    stack  -- current stack frame as passed by the signal module (unused)

    For the four signals above, the signal number is stored in the global
    SIGNUM, CAUGHT_SIGQUIT is set for SIGQUIT, and SystemExit is raised with
    the exit code from convSig2exitCode (1 if the signal has no entry).
    Any other signal is ignored.
    """
    global CAUGHT_SIGQUIT, SIGNUM
    sig = convSignum2Sig[signum]
    if sig not in ('SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'):
        return  ## nothing to do for other signals
    exitCode = convSig2exitCode.get(sig, 1)
    if sig == 'SIGQUIT':
        CAUGHT_SIGQUIT = True
    SIGNUM = signum
    sys.exit(exitCode)
if __name__ == '__main__':
    ## ==================== SIGNAL HANDLING ====================
    ## signal name (e.g. 'SIGTERM') -> exit code to leave with (e.g. 143)
    convSig2exitCode = {
        'SIGINT': 1,
        'SIGQUIT': 131,
        'SIGTERM': 143,
        'SIGHUP': 129,
    }
    ## signal number (e.g. 15) -> signal name (e.g. 'SIGTERM'); filled while
    ## sigHandler is installed for each of the handled signals
    convSignum2Sig = {}
    for sigName in ('SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGHUP'):
        try:
            signum = getattr(signal, sigName)
            signal.signal(signum, sigHandler)
        except RuntimeError:
            ## do not consider signals which cannot be handled
            continue
        convSignum2Sig[signum] = sigName

    ## set by sigHandler when SIGQUIT was caught (original note: so that
    ## temp. files such as a PID file will not be removed)
    CAUGHT_SIGQUIT = False
    ## number of the caught signal, forwarded to the child at exit
    SIGNUM = None

    ## ========================= MAIN =========================
    ## file descriptor object holding the inhibit lock
    FD = None

    ## text announced as the reason ("why") of the inhibit lock
    argsStr = ' '.join(ARGS)
    if XTABLE == 'pidgin':
        CURRENT = None  ## saved status to restore after resume
        on_PrepareForSleepShutdown = pidgin_on_PrepareForSleepShutdown
        msg = 'Disconnecting IM accounts before suspend/shutdown...'
        if argsStr:
            msg = msg + ' (%s)' % argsStr
        argsStr = msg
    if not argsStr:
        argsStr = 'n/a'

    def login1_take_lock(who=XTABLE, why=argsStr):
        """Take the login1 inhibit lock and keep its descriptor in FD."""
        global FD
        FD = freedesktop_Inhibit_ShutdownSleep( who, why )

    def login1_rm_lock():
        """Release the inhibit lock by closing its file descriptor."""
        global FD
        if not FD:
            return
        os.close( FD.take() )
        FD = None

    ## a handler is only defined for programs this wrapper knows about
    if 'on_PrepareForSleepShutdown' in locals():
        ## start the listener subprocess (the lock is taken in there)
        listen_to_DBus_signals({
            'login1': {
                'signals': ['PrepareForSleep', 'PrepareForShutdown'],
                'on_signal': on_PrepareForSleepShutdown,
                'take_lock': login1_take_lock,
                'rm_lock': login1_rm_lock,
            },
        })

    ## run the wrapped program and exit with its return code
    pipe = run_wrapper()
    sys.exit( pipe.wait() )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment