Skip to content

Instantly share code, notes, and snippets.

@ramayac
Created June 13, 2011 17:24
Show Gist options
  • Save ramayac/1023241 to your computer and use it in GitHub Desktop.
Save ramayac/1023241 to your computer and use it in GitHub Desktop.
App Engine app to let two anonymous users talk to each other via XMPP -- http://chatwithstrangers.appspot.com
#!/usr/bin/env python
import logging
from google.appengine.api import memcache
from google.appengine.api import xmpp
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import xmpp_handlers
class User(db.Model):
    """Datastore record for one chat participant.

    Entities are created with the sender's XMPP address as their key name.
    """

    # The User this one is currently paired with; unset while unpaired.
    partner = db.SelfReferenceProperty()

    # Keys of former partners this user walked away from; they are skipped
    # when a new partner is searched for on this user's behalf.
    blocked = db.ListProperty(db.Key)
class HtmlHandler(webapp.RequestHandler):
    """Serve the static landing page that explains how to use the chat bot."""

    # NOTE(review): the address in the <h1> had been replaced by the literal
    # placeholder "[email protected]" (an e-mail obfuscation artifact), which
    # left visitors with nothing to send an IM to. It is restored here as
    # <app-id>@appspot.com, assuming the app id is "chatwithstrangers" --
    # confirm against the deployed app.yaml.
    _PAGE = (
        "<html><head><title>Chat with Strangers</title></head><body>\n"
        "<h1>Send an IM to chatwithstrangers@appspot.com</h1>\n"
        "<ul><li>Send '/join' to connect with an anonymous user</li>\n"
        "<li>Send '/next' to find another user</li>\n"
        "<li>Send '/leave' to quit</li></ul>\n"
        "<p>That's it!</p>\n"
        '<small><a href="http://gist.github.com/344873">src</a></small>'
        "</body></html>"
    )

    def get(self):
        """Write the landing page HTML to the response body."""
        self.response.out.write(self._PAGE)
class XmppHandler(xmpp_handlers.CommandHandler):
    """Pair anonymous XMPP users and relay chat messages between them.

    Every sender is stored as a ``User`` entity keyed by the sender address.
    The ``*_command`` methods handle the matching slash commands and
    ``text_message`` handles everything else (dispatch is performed by the
    ``CommandHandler`` base class). Status lines sent by the bot itself are
    prefixed with ``***`` so they can be told apart from relayed text.

    NOTE: blocking is one-directional. Only the list of the user who walked
    away is consulted, and only when that user searches for a partner.
    """

    _HELP_TEXT = (
        "Welcome to Chat With Strangers!\n"
        "* Send '/join' to connect with an anonymous user\n"
        "* Send '/next' to find another user\n"
        "* Send '/leave' to quit"
    )

    def connect_user(self, user):
        """Pair ``user`` with a waiting user, or tell them to wait.

        A candidate is any ``User`` with no partner that is neither ``user``
        itself nor listed in ``user.blocked``.

        Args:
            user: the unpaired ``User`` entity looking for a partner.
        """
        excluded = set(user.blocked)
        excluded.add(user.key())
        # At most len(excluded) query results can be ineligible, so fetching
        # one more than that always surfaces an eligible candidate when one
        # exists. A fixed fetch(2) could be filled entirely by the caller
        # and/or blocked users, leaving the caller waiting needlessly.
        waiting = db.Query(User).filter('partner =', None).fetch(
            len(excluded) + 1)
        candidates = [u for u in waiting if u.key() not in excluded]

        user_jid = user.key().name()
        if not candidates:
            # no one to connect to; tell the user to wait for a partner
            logging.info("%s has no one to connect to", user_jid)
            xmpp.send_message(
                user_jid, "***Please wait while we find you a partner...")
            return

        other = candidates[0]
        other_jid = other.key().name()

        # tell both users they are being connected
        logging.info("Connecting %s to %s", user_jid, other_jid)
        xmpp.send_message([user_jid, other_jid], "***You are now connected.")

        # link the two entities to each other
        other.partner = user
        other.put()
        user.partner = other
        user.put()

    def disconnect_user(self, user):
        """Break the pairing between ``user`` and their current partner.

        The former partner is added to ``user.blocked`` so the two are not
        re-paired the next time ``user`` searches. Callers must make sure
        ``user.partner`` is set before calling.

        Args:
            user: the ``User`` entity that initiated the disconnect.
        """
        partner = user.partner
        user_jid = user.key().name()
        partner_jid = partner.key().name()

        # tell both users they are being disconnected
        logging.info("Disconnecting %s from %s", user_jid, partner_jid)
        xmpp.send_message(partner_jid, "***Your partner left.")
        xmpp.send_message(user_jid, "***You left your partner.")

        # unlink the two entities
        partner.partner = None
        partner.put()
        partner_key = partner.key()
        # Avoid storing the same key twice if this pair has met before.
        if partner_key not in user.blocked:
            user.blocked.append(partner_key)
        user.partner = None
        user.put()

    def help_command(self, msg):
        """Reply to the sender with the list of supported commands."""
        xmpp.send_message(msg.sender, self._HELP_TEXT)

    def join_command(self, msg):
        """Register the sender if needed and look for a partner."""
        user = User.get_or_insert(msg.sender)
        if user.partner:
            xmpp.send_message(msg.sender, "***You are already connected")
        else:
            self.connect_user(user)

    def next_command(self, msg):
        """Leave the current partner, if any, and look for another one."""
        user = User.get_or_insert(msg.sender)
        if user.partner:
            self.disconnect_user(user)
            self.connect_user(user)
        else:
            # User isn't connected yet, so this is just a /join. The call
            # used to pass ``self`` explicitly as well, which raised a
            # TypeError for every unpaired user who sent /next.
            self.join_command(msg)

    def leave_command(self, msg):
        """Disconnect the sender from any partner and delete their record."""
        user = User.get_by_key_name(msg.sender)
        if user:
            # leave your current partner, then leave the app altogether
            if user.partner:
                self.disconnect_user(user)
            user.delete()
        # Always acknowledge, even when the sender had no record, so /leave
        # never goes unanswered.
        xmpp.send_message(msg.sender, "***You are disconnected.")

    def text_message(self, msg):
        """Relay a plain message to the sender's partner, if they have one."""
        user = User.get_or_insert(msg.sender)
        if user.partner:
            # user has a partner, forward the message body unchanged
            partner_jid = user.partner.key().name()
            logging.info("Sending '%s' from %s to %s",
                         msg.body, user.key().name(), partner_jid)
            xmpp.send_message(partner_jid, msg.body)
        else:
            xmpp.send_message(
                msg.sender,
                "***You are not connected yet. "
                "Type '/join' or wait for someone to find you")
def main():
    """Build the WSGI application and hand it to the App Engine runner."""
    routes = [
        # Incoming XMPP chat messages are delivered to this path; it is
        # listed ahead of the catch-all pattern below.
        ('/_ah/xmpp/message/chat/', XmppHandler),
        # Every other path serves the landing page.
        ('/.*', HtmlHandler),
    ]
    app = webapp.WSGIApplication(routes, debug=True)
    util.run_wsgi_app(app)
# Start the application only when this module is run as the entry script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment