Skip to content

Instantly share code, notes, and snippets.

@poi519
Last active February 2, 2017 14:08
Show Gist options
  • Save poi519/1d17abc01fe646ce2c7115ebed585b5a to your computer and use it in GitHub Desktop.
Save poi519/1d17abc01fe646ce2c7115ebed585b5a to your computer and use it in GitHub Desktop.
class Conf:
pass
xmpp = Conf()
tele = Conf()
xmpp.jid = "[email protected]"
xmpp.password = "xxxx"
xmpp.room = "[email protected]"
xmpp.listener_nick = "anonka"
tele.chat_id = -111111111
tele.listener_token = "token"
tele.anonymous_token = "token"
tele.named_bots = {"token" : ["poi", "iop*"],
"token" : ["nick"],
"token" : ["nick"],
"token" : ["nick"],
"token" : ["nick"]}
import telebot
import xmppbot
import config
import logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG)
def main():
tele_pool = telebot.TelePool(config.tele.listener_token,
config.tele.anonymous_token,
config.tele.named_bots,
config.tele.chat_id)
xmpp_pool = xmppbot.XmppPool(config.xmpp.jid,
config.xmpp.password,
config.xmpp.room,
config.xmpp.listener_nick)
tele_pool.xmpp = xmpp_pool
xmpp_pool.tele = tele_pool
if __name__ == "__main__":
main()
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters
from telegram import MessageEntity
import logging
logger = logging.getLogger(__name__)
class TeleBot:
def error(self, bot, update, error):
logger.warn('Update "%s" caused error "%s"' % (update, error))
def __init__(self, token):
self.token = token
self.updater = Updater(token)
self.dp = self.updater.dispatcher
self.dp.add_error_handler(self.error)
self.bot = self.dp.bot
self.updater.start_polling()
class TelePool:
def __init__(self, listener_token, anon_token, named_bots_config, chat_id):
self.chat_id = chat_id
self.listener = TeleBot(listener_token)
self.listener.dp.add_handler(MessageHandler(Filters.text, self.listen))
self.anonymous = TeleBot(anon_token)
self.named_bots = {}
for token, nicks in named_bots_config.items():
bot = TeleBot(token)
for nick in nicks:
self.named_bots[nick] = bot
def listen(self, bot, update):
logger.info(update.message.text)
logger.info(update.message.chat_id)
if hasattr(self, 'xmpp'):
m_from = update.message.from_user.username
self.xmpp.route_message(m_from, update.message.text)
def route_message(self, m_from, text):
if m_from in self.named_bots:
self.named_bots[m_from].bot.sendMessage(chat_id=self.chat_id, text=text)
else:
self.anonymous.bot.sendMessage(chat_id=self.chat_id, text=text)
import sys
import logging
import getpass
import sleekxmpp
raw_input = input
logger = logging.getLogger(__name__)
class MUCBot(sleekxmpp.ClientXMPP):
def __init__(self, jid_without_resource, password, room, nick):
jid = jid_without_resource + "/" + nick
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.nick = nick
self.add_event_handler("session_start", self.start)
def start(self, event):
self.plugin['xep_0045'].joinMUC(self.room,
self.nick,
wait=True)
def send_muc_message(self, text):
self.send_message(mto=self.room, mbody=text, mtype='groupchat')
class XmppPool:
def __init__(self, jid, password, muc, listener_nick):
self.jid = jid
self.password = password
self.muc = muc
self.listener_nick = listener_nick
self.listener = self.create_bot(listener_nick)
self.listener.add_event_handler("groupchat_message", self.listen)
self.bots = {}
def create_bot(self, nick):
xmpp = MUCBot(self.jid, self.password, self.muc, nick)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0045') # Multi-User Chat
xmpp.register_plugin('xep_0199') # XMPP Ping
if xmpp.connect(("95.108.194.209", 5222)):
xmpp.process(block=False)
logger.info("Done")
return xmpp
else:
logger.error("Unable to connect.")
def listen(self, msg):
logger.info(msg['mucnick'])
logger.info(msg['body'])
if hasattr(self, 'tele') and msg['mucnick'][:-1] not in self.bots:
self.tele.route_message(msg['mucnick'], msg['body'])
def route_message(self, m_from, text):
if m_from in self.bots:
self.bots[m_from].send_muc_message(text)
else:
bot = self.create_bot(m_from + '+')
self.bots[m_from] = bot
bot.send_muc_message(text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment