Created
September 20, 2015 02:15
-
-
Save marmarek/ea1431b52b741d2e009a to your computer and use it in GitHub Desktop.
Window icon updater
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# | |
# The Qubes OS Project, http://www.qubes-os.org | |
# | |
# Copyright (C) 2013-2015 Marek Marczykowski-Górecki | |
# <[email protected]> | |
# | |
# This program is free software; you can redistribute it and/or | |
# modify it under the terms of the GNU General Public License | |
# as published by the Free Software Foundation; either version 2 | |
# of the License, or (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program; if not, write to the Free Software | |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |
# | |
# | |
import os | |
import struct | |
import sys | |
import fcntl | |
from qubes.imgconverter import ICON_MAXSIZE, Image | |
import xcb | |
import xcb.xcb | |
import xcb.xproto | |
from qubes.qubes import QubesVmCollection, QubesException | |
class IconReceiver(object):
    """Read window icons from stdin and set them on local X11 windows.

    The input stream (see handle_input / ignore_icon) is a sequence of
    records: one line with the remote window ID, followed by an image
    (a "width height" header line and width*height*4 bytes of RGBA data).
    Each icon is tinted with the calling domain's label colour and stored
    as _NET_WM_ICON on the local window that carries a matching
    _QUBES_VMNAME / _QUBES_VMWINDOWID pair.
    """

    def __init__(self):
        """Connect to the X server, intern atoms and load the VM colour.

        Raises:
            Exception: QREXEC_REMOTE_DOMAIN is not set in the environment.
            QubesException: the calling domain is not present in qubes.xml.
        """
        self.conn = xcb.xcb.connect()
        self.setup = self.conn.get_setup()
        self.root = self.setup.roots[0].root

        try:
            self.domain = os.environ["QREXEC_REMOTE_DOMAIN"]
        except KeyError:
            raise Exception("This service needs to be called from qrexec ("
                            "QREXEC_REMOTE_DOMAIN missing)")

        self.atom_vmname = self._intern_atom("_QUBES_VMNAME")
        self.atom_remote_id = self._intern_atom("_QUBES_VMWINDOWID")
        self.atom_net_wm_icon = self._intern_atom("_NET_WM_ICON")
        self.atom_net_client_list = self._intern_atom("_NET_CLIENT_LIST")

        # remote window ID -> local window ID (windows of self.domain only)
        self.remote2local_window_map = {}
        # local window ID -> remote window ID, or None for windows known to
        # belong to another domain (negative cache)
        self.local2remote_window_map = {}

        qc = QubesVmCollection()
        qc.lock_db_for_reading()
        qc.load()
        qc.unlock_db()
        vm = qc.get_vm_by_name(self.domain)
        if vm is None:
            raise QubesException("VM '{}' doesn't exist in qubes.xml".format(
                self.domain))
        self.color = vm.label.color
        # only the colour is needed; drop the collection right away
        del vm
        del qc

    def _intern_atom(self, name):
        """Return the X atom for *name* (only_if_exists=False)."""
        return self.conn.core.InternAtom(False, len(name), name).reply().atom

    def _unpack_int32_array(self, reply):
        """Decode a format-32 GetProperty reply into a list of integers.

        Always returns a list (possibly empty), so results of several
        replies can be concatenated safely.

        Raises:
            TypeError: the reply holds data in a format other than 32.
        """
        if reply.value_len == 0:
            return []
        if reply.format != 32:
            raise TypeError("Expected format 32")
        return list(
            struct.unpack("%dI" % reply.value_len, reply.value.buf()))

    @staticmethod
    def _convert_rgba_to_argb(rgba_data):
        """Repack big-endian RGBA pixel bytes as native-endian ARGB words.

        Trailing bytes that do not form a whole 4-byte pixel are ignored.
        """
        pixel_count = len(rgba_data) // 4
        rgba_pixels = struct.unpack(
            ">%dI" % pixel_count, rgba_data[:pixel_count * 4])
        return struct.pack(
            "%dI" % pixel_count,
            *[(p >> 8) | ((p & 0xff) << 24) for p in rgba_pixels])

    def watch_window(self, w):
        """Subscribe to StructureNotify events of window *w*.

        handle_events() uses the resulting DestroyNotify events to expire
        cached window mappings.
        """
        self.conn.core.ChangeWindowAttributesChecked(
            w,
            xcb.xproto.CW.EventMask,
            [xcb.xproto.EventMask.StructureNotify]
        )

    def _list_client_windows(self):
        """Return the window IDs to inspect for Qubes properties.

        If an embedding window manager is running, client windows are not
        direct children of the root window, so _NET_CLIENT_LIST is used;
        when that list is empty the root window children are returned.
        """
        reply = self.conn.core.GetProperty(
            False,
            self.root,
            self.atom_net_client_list,
            xcb.xproto.Atom.WINDOW,
            0,
            512).reply()
        client_list = self._unpack_int32_array(reply)
        if reply.bytes_after:
            # the list did not fit in the first request - fetch the rest
            reply = self.conn.core.GetProperty(
                False,
                self.root,
                self.atom_net_client_list,
                xcb.xproto.Atom.WINDOW,
                reply.value_len,
                reply.bytes_after).reply()
            client_list += self._unpack_int32_array(reply)
        if client_list:
            return client_list
        return self.conn.core.QueryTree(self.root).reply().children

    def refresh_windows_mapping(self):
        """Scan not-yet-cached windows and update both window maps."""
        name_queries = {}
        remote_id_queries = {}

        # send all the requests first, collect the replies afterwards
        for w in self._list_client_windows():
            if w in self.local2remote_window_map:
                # already cached
                continue
            name_queries[w] = self.conn.core.GetProperty(
                False,  # delete
                w,  # window
                self.atom_vmname,
                xcb.xproto.Atom.STRING,
                0,  # long_offset
                64  # long_length
            )
            remote_id_queries[w] = self.conn.core.GetProperty(
                False,  # delete
                w,  # window
                self.atom_remote_id,
                xcb.xproto.Atom.WINDOW,
                0,  # long_offset
                1  # long_length
            )

        for w, name_cookie in name_queries.items():
            try:
                vmname = name_cookie.reply()
                remote_id_reply = remote_id_queries[w].reply()
            except xcb.xproto.BadWindow:
                # window disappeared in the meantime
                continue
            if vmname.format != 8:
                continue
            if str(vmname.value.buf()) != self.domain:
                # if window is known to be of other domain - cache that
                # knowledge to not check that every time
                self.local2remote_window_map[w] = None
                self.watch_window(w)
                continue
            # if _QUBES_VMWINDOWID is set, store it in the map, otherwise
            # simply ignore the window - most likely it was just created
            # and doesn't have that property yet
            if remote_id_reply.format == 32 and remote_id_reply.value_len:
                win_remote_id = self._unpack_int32_array(remote_id_reply)[0]
                self.remote2local_window_map[win_remote_id] = w
                self.local2remote_window_map[w] = win_remote_id
                self.watch_window(w)

    def search_for_window(self, remote_id):
        """Return the local window ID mapped to *remote_id*.

        Raises:
            KeyError: no local window with that remote ID is known, even
                after refreshing the mapping.
        """
        # first handle events - remove outdated IDs
        self.handle_events()
        if remote_id not in self.remote2local_window_map:
            self.refresh_windows_mapping()
        # may raise KeyError
        return self.remote2local_window_map[remote_id]

    def handle_events(self):
        """Drain pending X events, dropping mappings of destroyed windows."""
        for ev in iter(self.conn.poll_for_event, None):
            if not isinstance(ev, xcb.xproto.DestroyNotifyEvent):
                continue
            remote_id = self.local2remote_window_map.pop(ev.window, None)
            if remote_id is None:
                continue
            # remove the reverse entry only if it still points at the
            # destroyed window; it may be missing or already reassigned
            if self.remote2local_window_map.get(remote_id) == ev.window:
                del self.remote2local_window_map[remote_id]

    def handle_icon_for_window(self, w):
        """Read one icon from stdin, tint it and set it on window *w*.

        Exceptions from image parsing are intentionally not caught here.
        The icon is not written to disk: it is derived from untrusted
        input and a fixed /tmp path would be predictable.
        """
        icon = Image.get_from_stream(sys.stdin, ICON_MAXSIZE, ICON_MAXSIZE)
        icon_tinted = icon.tint(self.color)
        # _NET_WM_ICON layout: width, height, then ARGB pixels
        icon_property_data = struct.pack(
            "II", icon_tinted.width, icon_tinted.height
        ) + self._convert_rgba_to_argb(icon_tinted.data)
        self.conn.core.ChangeProperty(xcb.xproto.PropMode.Replace,
                                      w,
                                      self.atom_net_wm_icon,
                                      xcb.xproto.Atom.CARDINAL,
                                      32,
                                      len(icon_property_data) // 4,
                                      icon_property_data)
        self.conn.flush()

    def ignore_icon(self):
        """Consume one icon from stdin without using it.

        Keeps the input stream in sync when the icon is for an unknown
        window.

        Raises:
            ValueError: the header is malformed or a dimension is outside
                the range 0..ICON_MAXSIZE.
        """
        untrusted_header = sys.stdin.readline(64)
        (untrusted_width, untrusted_height) = [
            int(i) for i in untrusted_header.split(' ')]
        if untrusted_width < 0 or untrusted_width > ICON_MAXSIZE:
            raise ValueError("Invalid image width")
        if untrusted_height < 0 or untrusted_height > ICON_MAXSIZE:
            raise ValueError("Invalid image height")
        width, height = untrusted_width, untrusted_height
        sys.stdin.read(width * height * 4)  # RGBA

    def handle_input(self):
        """Process (window ID, icon) records from stdin until EOF."""
        for untrusted_w in iter(lambda: sys.stdin.readline(32), ""):
            remote_winid = int(untrusted_w)
            try:
                local_winid = self.search_for_window(remote_winid)
            except KeyError:
                # unknown window - skip its icon to stay in sync
                self.ignore_icon()
                continue
            # kept outside the try block: a KeyError raised while the icon
            # is being read must not trigger ignore_icon() on a stream
            # that was already partially consumed
            self.handle_icon_for_window(local_winid)
if __name__ == '__main__':
    # Script entry point: create the receiver and process stdin until EOF.
    IconReceiver().handle_input()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import xcb | |
import xcb.xproto | |
import sys | |
import struct | |
# Icons whose width or height reaches this limit are not sent.
ICON_MAX_SIZE = 128


class NoIconError(KeyError):
    """Raised when a window has no icon that can be sent."""
    pass


class IconSender(object):
    """Watch X11 windows and write their _NET_WM_ICON icons to stdout.

    Each icon is written as three parts: a line with the window ID, a line
    with "width height", and width*height pixels as big-endian RGBA words.
    """

    def __init__(self):
        """Connect to the X server and intern the _NET_WM_ICON atom."""
        self.conn = xcb.connect()
        self.setup = self.conn.get_setup()
        self.root = self.setup.roots[0].root

        # just created windows for which icon wasn't sent yet - should
        # be sent on MapNotifyEvent
        self.window_queue = set()

        self.atom_net_wm_icon = self.conn.core.InternAtom(
            False, len("_NET_WM_ICON"), "_NET_WM_ICON").reply().atom

    def watch_window(self, w):
        """Subscribe to PropertyChange events of window *w*."""
        self.conn.core.ChangeWindowAttributesChecked(
            w,
            xcb.xproto.CW.EventMask,
            [xcb.xproto.EventMask.PropertyChange])

    def _fetch_icon_property(self, w, long_offset, long_length):
        """Request a slice of _NET_WM_ICON of window *w*; return the reply.

        Raises:
            NoIconError: the window disappeared before the reply arrived.
        """
        cookie = self.conn.core.GetProperty(
            False,  # delete
            w,  # window
            self.atom_net_wm_icon,
            xcb.xproto.Atom.CARDINAL,
            long_offset,
            long_length
        )
        try:
            return cookie.reply()
        except xcb.xproto.BadWindow:
            # Window disappeared in the meantime
            raise NoIconError()

    @staticmethod
    def _parse_icons(words):
        """Split _NET_WM_ICON words into a {(width, height): pixels} dict.

        *words* is a sequence of 32-bit values: width, height and then
        width*height ARGB pixels, repeated for every icon. Parsing stops at
        the first entry that is truncated, so every returned icon has
        exactly width*height pixels. Entries with no pixels are skipped.
        """
        icons = {}
        total = len(words)
        index = 0
        while index + 2 <= total:
            width, height = words[index], words[index + 1]
            start = index + 2
            end = start + width * height
            if end > total:
                # declared size exceeds the available data
                break
            if end > start:
                icons[(width, height)] = words[start:end]
            index = end
        return icons

    @staticmethod
    def _choose_size(icons):
        """Return the largest (width, height) key below ICON_MAX_SIZE.

        Sizes are compared as tuples, i.e. by width first, then height.
        NOTE(review): the comparison is strict, so an icon of exactly
        ICON_MAX_SIZE is rejected - confirm this is intended.

        Raises:
            NoIconError: no icon is small enough.
        """
        candidates = [size for size in icons
                      if size[0] < ICON_MAX_SIZE and size[1] < ICON_MAX_SIZE]
        if not candidates:
            raise NoIconError()
        return max(candidates)

    @staticmethod
    def _argb_to_rgba_bytes(pixels):
        """Pack ARGB pixel words as big-endian RGBA bytes."""
        return struct.pack(
            ">%dI" % len(pixels),
            *[((p << 8) & 0xffffff00) | (p >> 24) for p in pixels])

    def get_icons(self, w):
        """Return all icons of window *w* as {(width, height): pixels}.

        Raises:
            NoIconError: the window is gone or has no 32-bit _NET_WM_ICON.
        """
        icon = self._fetch_icon_property(w, 0, 512 * 1024)
        if icon.format != 32:
            # format 0 means the property is not set; any other format
            # cannot be decoded as an array of 32-bit values
            raise NoIconError()
        # convert it later to a proper int array
        icon_data = str(icon.value.buf())
        if icon.bytes_after:
            icon_cont = self._fetch_icon_property(
                w, icon.value_len, icon.bytes_after)
            icon_data += str(icon_cont.value.buf())
        # join each 4 bytes into a single int, ignoring a partial tail
        word_count = len(icon_data) // 4
        words = struct.unpack("%dI" % word_count, icon_data[:word_count * 4])
        return self._parse_icons(words)

    def send_icon(self, w):
        """Write the best icon of window *w* to stdout, if it has one."""
        try:
            icons = self.get_icons(w)
            chosen_size = self._choose_size(icons)
        except NoIconError:
            return
        sys.stdout.write("{}\n".format(w))
        sys.stdout.write("{} {}\n".format(chosen_size[0], chosen_size[1]))
        sys.stdout.write(self._argb_to_rgba_bytes(icons[chosen_size]))
        sys.stdout.flush()

    def initial_sync(self):
        """Watch and send icons of all current children of the root window."""
        root_tree = self.conn.core.QueryTree(self.root).reply()
        for w in root_tree.children:
            self.watch_window(w)
            self.send_icon(w)

    def watch_and_send_icons(self):
        """Send icons of existing windows, then follow X events forever."""
        self.conn.core.ChangeWindowAttributesChecked(
            self.root,
            xcb.xproto.CW.EventMask,
            [xcb.xproto.EventMask.SubstructureNotify])
        self.conn.flush()

        self.initial_sync()

        for ev in iter(self.conn.wait_for_event, None):
            if isinstance(ev, xcb.xproto.CreateNotifyEvent):
                self.window_queue.add(ev.window)
                self.watch_window(ev.window)
            elif isinstance(ev, xcb.xproto.MapNotifyEvent):
                if ev.window in self.window_queue:
                    self.send_icon(ev.window)
                    self.window_queue.remove(ev.window)
            elif isinstance(ev, xcb.xproto.DestroyNotifyEvent):
                # window destroyed before being mapped - forget it, so the
                # queue does not grow without bound
                self.window_queue.discard(ev.window)
            elif isinstance(ev, xcb.xproto.PropertyNotifyEvent):
                if ev.atom == self.atom_net_wm_icon:
                    self.send_icon(ev.window)
if __name__ == '__main__':
    # Script entry point: create the sender and run its event loop.
    IconSender().watch_and_send_icons()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment