Skip to content

Instantly share code, notes, and snippets.

@marmarek
Created September 20, 2015 02:15
Show Gist options
  • Save marmarek/ea1431b52b741d2e009a to your computer and use it in GitHub Desktop.
Save marmarek/ea1431b52b741d2e009a to your computer and use it in GitHub Desktop.
Window icon updater
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2013-2015 Marek Marczykowski-Górecki
# <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
import os
import struct
import sys
import fcntl
from qubes.imgconverter import ICON_MAXSIZE, Image
import xcb
import xcb.xcb
import xcb.xproto
from qubes.qubes import QubesVmCollection, QubesException
class IconReceiver(object):
def __init__(self):
self.conn = xcb.xcb.connect()
self.setup = self.conn.get_setup()
self.root = self.setup.roots[0].root
try:
self.domain = os.environ["QREXEC_REMOTE_DOMAIN"]
except KeyError:
raise Exception("This service needs to be called from qrexec ("
"QREXEC_REMOTE_DOMAIN missing)")
self.atom_vmname = self.conn.core.InternAtom(False,
len("_QUBES_VMNAME"), "_QUBES_VMNAME").reply().atom
self.atom_remote_id = self.conn.core.InternAtom(False,
len("_QUBES_VMWINDOWID"), "_QUBES_VMWINDOWID").reply().atom
self.atom_net_wm_icon = self.conn.core.InternAtom(False,
len("_NET_WM_ICON"), "_NET_WM_ICON").reply().atom
self.atom_net_client_list = self.conn.core.InternAtom(False,
len("_NET_CLIENT_LIST"), "_NET_CLIENT_LIST").reply().atom
self.remote2local_window_map = {}
self.local2remote_window_map = {}
qc = QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
vm = qc.get_vm_by_name(self.domain)
if vm is None:
raise QubesException("VM '{}' doesn't exist in qubes.xml".format(
self.domain))
self.color = vm.label.color
del vm
del qc
def _unpack_int32_array(self, reply):
if reply.value_len == 0:
return []
if reply.format == 32:
return struct.unpack("I" * reply.value_len, reply.value.buf())
else:
raise TypeError("Expected format 32")
def watch_window(self, w):
self.conn.core.ChangeWindowAttributesChecked(
w,
xcb.xproto.CW.EventMask,
[xcb.xproto.EventMask.StructureNotify]
)
def refresh_windows_mapping(self):
name_queries = {}
remote_id_queries = {}
# if embedding window manager is running, client windows are not
# direct children of root window, so traverse such clients list ...
cookie = self.conn.core.GetProperty(
False,
self.root,
self.atom_net_client_list,
xcb.xproto.Atom.WINDOW,
0,
512)
client_list_reply = cookie.reply()
client_list = self._unpack_int32_array(client_list_reply)
if client_list_reply.bytes_after:
cookie = self.conn.core.GetProperty(
False,
self.root,
self.atom_net_client_list,
xcb.xproto.Atom.WINDOW,
client_list_reply.value_len,
client_list_reply.bytes_after)
client_list_reply = cookie.reply()
client_list += self._unpack_int32_array(client_list_reply)
if not client_list:
# ... otherwise just look at root window children
cookie = self.conn.core.QueryTree(self.root)
root_tree = cookie.reply()
client_list = root_tree.children
for w in client_list:
if w in self.local2remote_window_map.keys():
# already cached
continue
name_queries[w] = self.conn.core.GetProperty(
False, # delete
w, # window
self.atom_vmname,
xcb.xproto.Atom.STRING,
0, # long_offset
64 # long_length
)
remote_id_queries[w] = self.conn.core.GetProperty(
False, # delete
w, # window
self.atom_remote_id,
xcb.xproto.Atom.WINDOW,
0, # long_offset
1 # long_length
)
for w in name_queries.keys():
vmname = name_queries[w].reply()
remote_id_reply = remote_id_queries[w].reply()
if vmname.format == 8:
if str(vmname.value.buf()) == self.domain:
# if _QUBES_VMREMOTEID is set, store it in the map,
# otherwise simply ignore the window - most likely it was
# just created and don't have that property yet
if remote_id_reply.format == 32 and remote_id_reply.value_len:
win_remote_id = self._unpack_int32_array(
remote_id_reply)[0]
self.remote2local_window_map[win_remote_id] = w
self.local2remote_window_map[w] = win_remote_id
self.watch_window(w)
else:
# if window is known to be of other domain - cache that
# knowledge to not check that every time
self.local2remote_window_map[w] = None
self.watch_window(w)
def search_for_window(self, remote_id):
# first handle events - remove outdated IDs
self.handle_events()
if remote_id not in self.remote2local_window_map:
self.refresh_windows_mapping()
# may raise KeyError
return self.remote2local_window_map[remote_id]
def handle_events(self):
for ev in iter(self.conn.poll_for_event, None):
if isinstance(ev, xcb.xproto.DestroyNotifyEvent):
remote_id = self.local2remote_window_map.pop(ev.window, None)
if remote_id is not None:
self.remote2local_window_map.pop(remote_id)
def handle_icon_for_window(self, w):
# intentionally don't catch exceptions here
icon = Image.get_from_stream(sys.stdin, ICON_MAXSIZE, ICON_MAXSIZE)
icon_tinted = icon.tint(self.color)
icon_tinted.save("/tmp/icon.png")
# conver RGBA (Image.data) -> ARGB (X11)
pixel_count = len(icon_tinted.data)/4
icon_tinted_data = struct.pack(
"%dI" % pixel_count,
*[(p>>8)|((p&0xff)<<24) for p in
struct.unpack(">%dI" % pixel_count, icon_tinted.data)])
icon_property_data = struct.pack("II", icon_tinted.width,
icon_tinted.height) + \
icon_tinted_data
self.conn.core.ChangeProperty(xcb.xproto.PropMode.Replace,
w,
self.atom_net_wm_icon,
xcb.xproto.Atom.CARDINAL,
32,
len(icon_property_data)/4,
icon_property_data)
self.conn.flush()
def ignore_icon(self):
untrusted_header = sys.stdin.readline(64)
(untrusted_width, untrusted_height) = [int(i) for i in
untrusted_header.split(' ')]
if untrusted_width < 0 or untrusted_width > ICON_MAXSIZE:
raise ValueError("Invalid image width")
if untrusted_height < 0 or untrusted_height > ICON_MAXSIZE:
raise ValueError("Invalid image width")
width, height = untrusted_width, untrusted_height
sys.stdin.read(width * height * 4) # RGBA
def handle_input(self):
for untrusted_w in iter(lambda: sys.stdin.readline(32), ""):
remote_winid = int(untrusted_w)
try:
local_winid = self.search_for_window(remote_winid)
self.handle_icon_for_window(local_winid)
except KeyError:
self.ignore_icon()
if __name__ == '__main__':
rcvd = IconReceiver()
rcvd.handle_input()
#!/usr/bin/python
import xcb
import xcb.xproto
import sys
import struct
ICON_MAX_SIZE = 128
class NoIconError(KeyError):
pass
class IconSender(object):
def __init__(self):
self.conn = xcb.connect()
self.setup = self.conn.get_setup()
self.root = self.setup.roots[0].root
# just created windows for which icon wasn't sent yet - should
# be send on MapNotifyEvent
self.window_queue = set()
self.atom_net_wm_icon = self.conn.core.InternAtom(False,
len("_NET_WM_ICON"), "_NET_WM_ICON").reply().atom
def watch_window(self, w):
self.conn.core.ChangeWindowAttributesChecked(w,
xcb.xproto.CW.EventMask,
[xcb.xproto.EventMask.PropertyChange])
def get_icons(self, w):
# check for initial icon now:
prop_cookie = self.conn.core.GetProperty(
False, # delete
w, # window
self.atom_net_wm_icon,
xcb.xproto.Atom.CARDINAL,
0, # long_offset
512*1024 # long_length
)
try:
icon = prop_cookie.reply()
except xcb.xproto.BadWindow:
# Window disappeared in the meantime
raise NoIconError()
if icon.format == 0:
raise NoIconError()
# convert it later to a proper int array
icon_data = str(icon.value.buf())
if icon.bytes_after:
prop_cookie = self.conn.core.GetProperty(
False, # delete
w, # window
self.atom_net_wm_icon,
xcb.xproto.Atom.CARDINAL,
icon.value_len, # long_offset
icon.bytes_after # long_length
)
icon_cont = prop_cookie.reply()
icon_data += str(icon_cont.value.buf())
# join each 4 bytes into a single int
icon_data = struct.unpack("%dI" % (len(icon_data)/4), icon_data)
icons = {}
index = 0
while index < len(icon_data):
size = (icon_data[index], icon_data[index+1])
icons[size] = icon_data[index+2:index+2+(size[0]*size[1])]
index += 2+(size[0]*size[1])
return icons
def send_icon(self, w):
try:
icons = self.get_icons(w)
chosen_size = sorted([k for k in icons.keys() if k[0] < ICON_MAX_SIZE and k[1] < ICON_MAX_SIZE])[-1]
sys.stdout.write("{}\n".format(w))
sys.stdout.write("{} {}\n".format(chosen_size[0], chosen_size[1]))
sys.stdout.write(''.join([struct.pack('>I', ((b<<8)&0xffffff00)|(b>>24)) for b in icons[chosen_size]]))
sys.stdout.flush()
except NoIconError:
pass
def initial_sync(self):
cookie = self.conn.core.QueryTree(self.root)
root_tree = cookie.reply()
for w in root_tree.children:
self.watch_window(w)
self.send_icon(w)
def watch_and_send_icons(self):
self.conn.core.ChangeWindowAttributesChecked(self.root,
xcb.xproto.CW.EventMask,
[xcb.xproto.EventMask.SubstructureNotify])
self.conn.flush()
self.initial_sync()
for ev in iter(self.conn.wait_for_event, None):
if isinstance(ev, xcb.xproto.CreateNotifyEvent):
self.window_queue.add(ev.window)
self.watch_window(ev.window)
elif isinstance(ev, xcb.xproto.MapNotifyEvent):
if ev.window in self.window_queue:
self.send_icon(ev.window)
self.window_queue.remove(ev.window)
elif isinstance(ev, xcb.xproto.PropertyNotifyEvent):
if ev.atom == self.atom_net_wm_icon:
self.send_icon(ev.window)
if __name__ == '__main__':
sender = IconSender()
sender.watch_and_send_icons()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment