Created
July 27, 2011 18:27
-
-
Save legastero/1110035 to your computer and use it in GitHub Desktop.
XMPPFlask MUC Bot using SleekXMPP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
xmpp_wsgi_runner for xmppflask (or just XMPPWSGI) | |
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
This application is intended to be something like a web server, but for
XMPPWSGI: it serves XMPPWSGI applications. It is kept simple, but can
potentially be rewritten to handle more load. Future plans also include
support for gevent-like coroutines, but we will need to find a way to
import gevent's monkey.patch_sockets() to PyPy somehow.
:copyright: (c) 2011 Kostyantyn Rybnikov <[email protected]> | |
:license: BSD. | |
""" | |
import sleekxmpp | |
import logging | |
# Module-level logger used by the bot to report its own status.
log = logging.getLogger(__name__)
# Configure the root logger at DEBUG level as soon as the module is loaded.
logging.basicConfig(level=logging.DEBUG)
class XMPPWSGIConnectionMUC(sleekxmpp.ClientXMPP):
    """XMPP client that forwards incoming messages to an XMPPWSGI app.

    Both direct messages and multi-user chat (MUC) messages addressed to
    the bot are handed to ``app``. The app is called as
    ``app(environ, notification_queue)`` where ``environ`` is a dict with
    the keys ``'MESSAGE'`` and ``'XMPP_JID'``; the pieces it returns are
    joined into a single reply. Every ``(jid, text)`` pair the app appends
    to ``notification_queue`` is sent out as an additional message.

    :param app: the XMPPWSGI application callable.
    :param jid: JID the bot logs in with.
    :param password: password for ``jid``.
    :param room: chat room joined once the session has started.
    :param nick: nickname used in the room; room messages must start with
                 ``"<nick>: "`` to be answered.
    """

    def __init__(self, app, jid, password, room, nick):
        sleekxmpp.ClientXMPP.__init__(self, jid, password)

        self.app = app
        self.room = room
        self.nick = nick

        # The xep_0045 plugin provides joinMUC(), used in start().
        self.register_plugin('xep_0045')

        self.add_event_handler('session_start', self.start)
        self.add_event_handler('message', self.message)
        self.add_event_handler('groupchat_message', self.muc_message)

    def start(self, session):
        """Announce presence, fetch the roster and join the chat room.

        Registered as the handler of the ``session_start`` event.
        """
        self.send_presence()
        self.get_roster()
        self['xep_0045'].joinMUC(self.room, self.nick, wait=True)
        log.info('> bot started!')

    def _run_app_and_reply(self, environ, **reply_kwargs):
        """Call the app with ``environ`` and deliver everything it produced.

        ``reply_kwargs`` are passed to ``send_message`` for the direct reply
        (recipient and, for rooms, the message type). The reply is only
        sent when the app returned non-empty text; queued notifications
        are sent regardless.
        """
        notification_queue = []
        resp = u"".join(self.app(environ, notification_queue))
        if resp:
            self.send_message(mbody=resp, **reply_kwargs)
        # A distinct loop variable name keeps the incoming stanza from
        # being shadowed by notification texts.
        for jid, text in notification_queue:
            self.send_message(mto=jid, mbody=text)

    def muc_message(self, msg):
        """Answer a chat room message that is addressed to the bot.

        Messages sent under the bot's own nickname and messages without a
        body are ignored.
        """
        body = msg['body']
        if msg['mucnick'] == self.nick or not body:
            return

        # As an example, only reply if the message starts with
        # the bot's nickname and colon, as so:
        # nickname: weather in Kiev
        prefix = '%s: ' % self.nick
        if not body.lower().startswith(prefix.lower()):
            return

        environ = {
            # Cut off only the leading "nick: " so that a request which
            # itself contains ": " reaches the app in full.
            'MESSAGE': body[len(prefix):],
            'XMPP_JID': msg['from'].full,
        }
        self._run_app_and_reply(
            environ, mto=msg['from'].bare, mtype='groupchat')

    def message(self, msg):
        """Answer a message that is not a chat room message.

        Groupchat stanzas are skipped here; ``muc_message`` is registered
        for those.
        """
        if not msg['body'] or msg['type'] == 'groupchat':
            return

        environ = {
            'MESSAGE': msg['body'],
            'XMPP_JID': msg['from'].bare,
        }
        self._run_app_and_reply(environ, mto=msg['from'])
def main():
    """Command-line entry point: load an XMPPWSGI app and run the MUC bot.

    Reads ``--jid``, ``--password``, ``--room``, ``--nick`` and the
    positional ``app`` argument (``path/to/file.py:variable``) from the
    command line, imports the application, connects and then processes
    XMPP events in the calling thread.

    Exits with status 1 and a message on stderr when the application
    cannot be loaded or the connection cannot be established.
    """
    import os
    import sys
    import argparse

    # ``unicode`` on Python 2 and ``str`` on Python 3, so that the argument
    # parser setup below works on both.
    text_type = type(u'')

    def load_app_from_configstr(app_str):
        """Import and return the object named by ``path/to/file.py:variable``.

        The directory containing the file is put at the front of
        ``sys.path`` so the file can be imported as a top-level module.
        """
        if u':' not in app_str:
            sys.exit(u'Application %s must be given as '
                     u'path/to/file.py:variable' % app_str)
        path, app_varname = app_str.rsplit(u':', 1)
        path = os.path.abspath(path)
        if not os.path.exists(path):
            sys.exit(u'Path %s does not exist' % path)
        if os.path.isdir(path):
            sys.exit(u'Path %s is a directory' % path)

        dir_, filename = os.path.split(path)
        sys.path.insert(0, dir_)

        # splitext() removes only the extension. str.rstrip('.py') would
        # strip every trailing '.', 'p' and 'y' character and mangle
        # names such as "happy.py".
        module_name = os.path.splitext(filename)[0]
        module = __import__(module_name)
        try:
            return getattr(module, app_varname)
        except AttributeError:
            sys.exit(u'Module %s has no variable %s'
                     % (module_name, app_varname))

    parser = argparse.ArgumentParser(
        description=u'XMPPWSGI server to run XmppFlask apps.')
    parser.add_argument(
        '--jid',
        type=text_type,
        help=u'jid of a bot. You might want to register one for your bot.')
    parser.add_argument(
        '--password',
        required=False,
        type=text_type,
        help=u'password to that jid.')
    parser.add_argument(
        '--room',
        required=False,
        type=text_type,
        help=u'Chat room to join')
    parser.add_argument(
        '--nick',
        required=False,
        type=text_type,
        help=u'Nickname to use in the chat room')
    parser.add_argument(
        'app',
        type=text_type,
        help=(u'a path to application python file and variable in it separated '
              u'by ":". For example: ./cool_weather/weather.py:app means that '
              u'application is in file ./cool_weather/weather.py and stored in '
              u'variable called app'))
    args = parser.parse_args()

    app = load_app_from_configstr(args.app)
    xmpp = XMPPWSGIConnectionMUC(
        app, args.jid, args.password, args.room, args.nick)

    # Do not start processing events when no connection was established.
    if not xmpp.connect():
        sys.exit(u'Unable to connect to the XMPP server')
    xmpp.process(threaded=False)
# Run the bot only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment