Created
November 27, 2013 13:27
-
-
Save roman-yepishev/7675658 to your computer and use it in GitHub Desktop.
Telepathy/desktopcouch logger POC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Telepathy CouchDB Logger""" | |
from dbus.mainloop.glib import DBusGMainLoop | |
import gobject | |
import dbus | |
#for observer thing | |
import telepathy | |
import telepathy.server | |
# for datetime format | |
from datetime import datetime | |
from telepathy.interfaces import CLIENT, \ | |
CLIENT_OBSERVER, \ | |
CHANNEL_INTERFACE, \ | |
CHANNEL_INTERFACE_MESSAGES, \ | |
CONNECTION, \ | |
CONNECTION_INTERFACE_CONTACTS, \ | |
CONNECTION_INTERFACE_ALIASING | |
from telepathy.server.channel import CHANNEL_TYPE_TEXT | |
from desktopcouch.records.server import CouchDatabase | |
from desktopcouch.records.record import Record | |
from telepathy.server import DBusProperties | |
from telepathy._generated.Client_Observer import ClientObserver | |
# Record type URI attached to every record this logger writes.
RECORD_TYPE = 'http://www.rtg.in.ua/empathy-im-couchdb'

# Fully qualified names of the two channel properties used below: the
# channel type (observer filter) and the remote party's handle.
CHANNEL_INTERFACE_CHANNEL_TYPE = '.'.join((CHANNEL_INTERFACE, 'ChannelType'))
CHANNEL_INTERFACE_TARGET_HANDLE = '.'.join((CHANNEL_INTERFACE, 'TargetHandle'))
class InstantMessage:
    """Plain value holder describing a single instant message.

    Every field starts out as None and is filled in by the caller.
    """

    def __init__(self):
        # The two parties of the conversation.
        self.sender = self.recepient = None
        # Timestamp of the message.
        self.time = None
        # Message text and the protocol it travelled over.
        self.message = self.protocol = None
class InstantMessageStorage:
    """Message store backed by the desktopcouch "im" database."""

    def __init__(self):
        # Open the "im" database, passing create=True.
        self.couchdb = CouchDatabase("im", create=True)

    def put(self, message):
        """Save *message* as a record and return the id of that record.

        message.time is converted with datetime.fromtimestamp() and
        stored as an ISO 8601 string; the other fields are copied as-is.
        """
        iso_time = datetime.fromtimestamp(message.time).isoformat()
        fields = {
            "from": message.sender,
            "to": message.recepient,
            "time": iso_time,
            "message": message.message,
            "protocol": message.protocol,
        }
        return self.couchdb.put_record(Record(fields, record_type=RECORD_TYPE))
# Trace line printed while the module is being loaded, right before the
# CouchdbLogger class is defined.
print("CouchdbLogger()")
class CouchdbLogger(ClientObserver, DBusProperties):
    """Telepathy observer that logs text-channel messages.

    The object announces itself as a Client.Observer for text channels,
    remembers the local and remote contact of every channel handed to
    ObserveChannels, and passes each sent or received message to an
    InstantMessageStorage.  The implementation is not tied to CouchDB,
    so the name is probably a bad one.
    """

    def __init__(self, *args):
        """Export the observer and subscribe to message signals.

        *args is forwarded unchanged to ClientObserver.__init__.
        """
        print("Calling parent ClientObserver methods")
        ClientObserver.__init__(self, *args)

        # Channel object path -> dict with the keys 'protocol',
        # 'self-handle' and 'target-handle' (see register_channel).
        self.channel_cache = {}

        # FIXME: is it ok to get session bus again?
        # some parent class should define that as well
        self.bus = dbus.SessionBus()

        self._interfaces = set()
        self._interfaces.add(CLIENT)
        self._interfaces.add(CLIENT_OBSERVER)

        telepathy.server.DBusProperties.__init__(self)

        self._implement_property_get(CLIENT, {
            'Interfaces': lambda: [CLIENT_OBSERVER],
        })
        # Only text channels are requested.
        self._implement_property_get(CLIENT_OBSERVER, {
            'ObserverChannelFilter': lambda: dbus.Array([
                dbus.Dictionary({
                    CHANNEL_INTERFACE_CHANNEL_TYPE: CHANNEL_TYPE_TEXT
                }, signature='sv')
            ], signature='a{sv}')
        })

        self.storage = InstantMessageStorage()

        # Subscribe to both message signals; the emitting channel's
        # object path is delivered in the 'path' keyword argument.
        self.bus.add_signal_receiver(
            handler_function=self.message_sent_handler,
            dbus_interface=CHANNEL_INTERFACE_MESSAGES,
            signal_name='MessageSent',
            path_keyword='path'
        )
        self.bus.add_signal_receiver(
            handler_function=self.message_received_handler,
            dbus_interface=CHANNEL_INTERFACE_MESSAGES,
            signal_name='MessageReceived',
            path_keyword='path'
        )

    def message_sent_handler(self, content=None, flags=0,
                             message_token='', path=None):
        """Called for every MessageSent signal received.

        content is the list of message parts (header first); flags and
        message_token are accepted but not used.
        """
        if not content:
            # Without a header part there is nothing that can be logged.
            print("MessageSent: ignoring signal without message parts")
            return

        if path not in self.channel_cache:
            # TODO: request channel details since we
            # may join the conversation later than it
            # was started
            print("MessageSent: path {0} is not in channel cache".format(path))
            return

        self._log_message(content, path, outgoing=True)

    def message_received_handler(self, message=None, path=None):
        """Called for every MessageReceived signal received.

        Also called directly by register_channel for messages that were
        already pending when the channel was handed to the observer.
        """
        if not message:
            # Without a header part there is nothing that can be logged.
            print("MessageReceived: ignoring signal without message parts")
            return

        if message[0].get("delivery-error") == 1:
            print("Delivery errors are not logged")  # FIXME: or should they?
            return

        if path not in self.channel_cache:
            print('MessageReceived: path {0} is '
                  'not in channel_cache'.format(path))
            return

        self._log_message(message, path, outgoing=False)

    @staticmethod
    def _plain_text(parts):
        """Return the concatenated content of all text/plain parts."""
        return ''.join(part["content"] for part in parts
                       if part.get("content-type") == "text/plain")

    def _log_message(self, message, path, outgoing):
        """Store one message of the cached channel *path*.

        message  -- non-empty list of message parts, header first
        path     -- channel object path; must be in self.channel_cache
        outgoing -- True if the local party is the sender
        """
        import time

        header, parts = message[0], message[1:]
        channel_info = self.channel_cache[path]

        if outgoing:
            sender_info = channel_info['self-handle']
            recepient_info = channel_info['target-handle']
            time_key = 'message-sent'
        else:
            sender_info = channel_info['target-handle']
            recepient_info = channel_info['self-handle']
            time_key = 'message-received'

        sender_id = sender_info[CONNECTION + '/contact-id']
        recepient_id = recepient_info[CONNECTION + '/contact-id']

        # Fall back to "now" when the header carries no timestamp, so
        # the message is still logged instead of raising KeyError.
        timestamp = header.get(time_key)
        if timestamp is None:
            timestamp = time.time()

        im = InstantMessage()
        im.sender = sender_id
        im.recepient = recepient_id
        im.time = timestamp
        im.protocol = channel_info['protocol']
        im.message = self._plain_text(parts)

        record_id = self.storage.put(im)
        print("{0} -> {1}\n\tRecord Id: {2}".format(sender_id,
                                                    recepient_id,
                                                    record_id))

    def ObserveChannels(self, account, connection, channels,
                        dispatch_operation, requests_satisfied,
                        observer_info):
        """DBus method called by Telepathy.

        Registers every (object path, properties) pair in *channels*;
        the remaining arguments other than account and connection are
        not used.
        """
        print("Incoming channels on {0}".format(connection))
        for object_path, props in channels:
            self.register_channel(account, connection, object_path, props)

    def register_channel(self, account, connection, channel, props):
        """Registers the channel in internal cache, handles pending messages.

        account    -- account object path (not used here)
        connection -- connection object path
        channel    -- channel object path, used as the cache key
        props      -- channel properties; TargetHandle is read from it
        """
        print("Registering channel: {0}".format(channel))

        # Build the entry locally and publish it only once it is
        # complete, so a failing D-Bus call cannot leave a half-filled
        # entry behind for the message handlers to trip over.
        channel_info = {}

        # Clients MAY parse the object path to determine
        # the connection manager name and the protocol,
        # but MUST NOT attempt to parse the account part.
        # Connection managers MAY use any unique string for this part.
        # /org/freedesktop/Telepathy/Connection/gabble/jabber/<account>
        #   Connection Manager -----------------^      ^--- Protocol
        # Split on '/': element 5 is the manager, element 6 the protocol.
        channel_info['protocol'] = connection.split('/')[6]

        connection_bus_name = connection[1:].replace("/", ".")
        print("Connection Bus Name: {0}".format(connection_bus_name))

        # We need to find out who is who in this channel:
        # the local party handle is SelfHandle,
        # the remote party handle arrived in the TargetHandle property.
        account_proxy = self.bus.get_object(connection_bus_name, connection)
        self_handle = account_proxy.Get(CONNECTION, 'SelfHandle')
        target_handle = props[CHANNEL_INTERFACE_TARGET_HANDLE]

        # FIXME: we might not need to call GetContactAttributes
        # immediately since some of the relevant bits are already
        # available in props
        connection_iface = dbus.Interface(
            account_proxy,
            dbus_interface=CONNECTION_INTERFACE_CONTACTS)

        handle_info = connection_iface.GetContactAttributes(
            # Handles
            [self_handle, target_handle],
            # Interfaces
            [CONNECTION_INTERFACE_ALIASING],
            # Hold: if true, all handles in the result have been held
            # on behalf of the calling process, as if by a call to
            # Connection.HoldHandles.
            True)

        # not intercepting KeyError for now
        # do we really need to carry the int handle around?
        channel_info['self-handle'] = handle_info[self_handle]
        channel_info['target-handle'] = handle_info[target_handle]

        # The entry is complete; it has to be in the cache before the
        # pending messages are replayed through the receive handler.
        self.channel_cache[channel] = channel_info

        channel_proxy = self.bus.get_object(connection_bus_name, channel)
        pending_messages = channel_proxy.Get(CHANNEL_INTERFACE_MESSAGES,
                                             'PendingMessages')

        for message in pending_messages:
            print("Processing pending message {0}".format(message))
            self.message_received_handler(message, path=channel)
def publish():
    """Registers the bus name and creates the logger.

    Returns False so that a GLib timeout source invoking this function
    does not schedule it again.
    """
    # dbus.service is a submodule of dbus; import it explicitly instead
    # of relying on some other package having loaded it already.
    import dbus.service

    print("Connecting to session bus...")
    bus = dbus.SessionBus()

    # The object path mirrors the bus name: dots become slashes.
    bus_name = '.'.join([CLIENT, 'CouchdbLogger'])
    object_path = '/' + bus_name.replace('.', '/')

    print("Registering bus name {0}".format(bus_name))
    owned_name = dbus.service.BusName(bus_name, bus=bus)

    print("Starting CouchdbLogger()")
    CouchdbLogger(owned_name, object_path)
    return False
def main():
    """Start the logger: set up D-Bus, schedule publish(), run the loop."""
    print("Initializing Telepathy CouchDB logger...")

    # Make the GLib main loop the default one for D-Bus connections.
    DBusGMainLoop(set_as_default=True)

    # publish() is scheduled with a zero timeout rather than being
    # called directly here.
    gobject.timeout_add(0, publish)

    gobject.MainLoop().run()
# Start the logger only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment