Skip to content

Instantly share code, notes, and snippets.

@e000
Created February 13, 2011 18:32
Show Gist options
  • Select an option

  • Save e000/824929 to your computer and use it in GitHub Desktop.

Select an option

Save e000/824929 to your computer and use it in GitHub Desktop.
"""
Omegle twisted client
"""
# Connection states stored in OmegleBot.status.
DISCONNECTED = 0  # no session; the initial state and the state after disconnect()
CONNECTING = 1  # connect() has started but no session id has been stored yet
WAITING = 2  # session started, or a 'waiting' event was received
CONNECTED = 3  # a 'connected' event was received
from random import choice
from twisted.internet.defer import DeferredLock, inlineCallbacks, CancelledError, returnValue
from Core.ZalgoUtil.CancellableClient import getPage
from Core.ZalgoUtil.ModUtil import json_decode, utf8
from urllib import urlencode
from twisted.internet import reactor
import re
_userAgents = [
'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.10) Gecko/20100915 Ubuntu/10.04 (lucid) Firefox/3.6.10',
'Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.45 Safari/534.16'
]
def getRandomUserAgent():
"Gives us a random user-agent to spoof with!"
return choice(_userAgents)
class AlreadyRunningError(Exception):
    """Raised by OmegleBot.connect() when the bot is not in the DISCONNECTED state."""
class NotConnectedError(Exception):
    """Raised by say(), typing() and stoppedTyping() when the bot is not CONNECTED."""
class CaptchaNotRequired(Exception):
    """Raised by OmegleBot.solveCaptcha() when no captcha challenge is pending."""
class OmegleBot(object):
    """Twisted client driving a single Omegle chat session.

    The bot moves through the states DISCONNECTED -> CONNECTING -> WAITING
    -> CONNECTED.  Server events are fetched by ``doEvents`` and dispatched
    to the ``EVENT_<name>`` methods, which in turn invoke the user supplied
    callbacks.

    Every callback is optional and is invoked as ``callback(bot, params)``.
    ``params`` is the list of event arguments for ``messageCallback``,
    ``recaptchaRequiredCallback`` and ``recaptchaFailedCallback`` and
    ``None`` for all other callbacks.
    """

    # Same values as the module-level state constants.
    DISCONNECTED = 0
    CONNECTING = 1
    WAITING = 2
    CONNECTED = 3

    # Captures the src attribute of the first frame/iframe tag of the home
    # page; connect() uses the captured value as the chat server base URL.
    _serverRegex = re.compile(r'\<i?frame src="(.*?)"\>')

    def __init__(self, typingCallback=None, stoppedTypingCallback=None,
                 disconnectCallback=None, messageCallback=None,
                 recaptchaFailedCallback=None, recaptchaRequiredCallback=None,
                 connectCallback=None, waitingCallback=None):
        """Store the optional event callbacks and reset the session state."""
        self.typingCallback = typingCallback
        self.stoppedTypingCallback = stoppedTypingCallback
        self.disconnectCallback = disconnectCallback
        self.messageCallback = messageCallback
        self.recaptchaFailedCallback = recaptchaFailedCallback
        self.recaptchaRequiredCallback = recaptchaRequiredCallback
        self.connectCallback = connectCallback
        self.waitingCallback = waitingCallback

        self.status = DISCONNECTED
        self.server = None
        self.id = None
        # Serializes the commands issued through _doLockedCommand().
        self.lock = DeferredLock()
        # Deferreds of in-flight requests, cancelled on disconnect().
        self.activeRequests = set()
        self.challenge = None
        # Set here so that getPage() is usable before connect(); connect()
        # picks a fresh user agent for every session.
        self.userAgent = getRandomUserAgent()

    def disconnect(self):
        """Tear down the current session and fire the disconnect callback.

        Does nothing when the bot is already DISCONNECTED.  When a session
        exists (WAITING or CONNECTED) the server is notified on a
        best-effort basis.
        """
        if self.status == DISCONNECTED:
            return

        if self.status in (WAITING, CONNECTED):
            # Not tracked in activeRequests so it is not cancelled below;
            # failures of this request are deliberately ignored.
            self.getPage(
                'disconnect', addToActive=False, data={'id': self.id}
            ).addErrback(lambda r: None)

        self.status = DISCONNECTED
        self.id = None
        self.challenge = None
        self.server = None
        self._cancelAllRequests()
        self.onDisconnect()

    def _cancelAllRequests(self):
        """Drop queued lock waiters and cancel every tracked request."""
        # Emptied in place so the DeferredLock keeps using the same list.
        self.lock.waiting[:] = []
        # Iterate over a copy: cancelling makes removeFromActive() (see
        # getPage) mutate the set.
        for d in list(self.activeRequests):
            d.cancel()
        self.activeRequests.clear()

    def getPage(self, url, addToActive=True, data=None, *args, **kwargs):
        """Fetch ``url`` with the bot's user agent and return the Deferred.

        :param url: absolute URL, or a path that is appended to
            ``self.server`` when a server is known.
        :param addToActive: when true the request is tracked in
            ``self.activeRequests`` so that disconnect() can cancel it.
        :param data: optional dict; when given it is url-encoded and sent
            as the body of a POST request.
        :param args: passed through to the underlying ``getPage``.
        :param kwargs: passed through to the underlying ``getPage``.
        """
        def removeFromActive(r):
            self.activeRequests.discard(d)
            return r

        # Absolute URLs (http or https) are used as given.
        if not url.startswith(('http://', 'https://')) and self.server:
            url = self.server + url

        if data is not None:
            data = urlencode(data)
            kwargs.update({
                'method': 'POST',
                'postdata': data,
                'headers': {
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Content-Length': '%i' % len(data)
                }
            })

        # Inside the method body the bare name resolves to the module-level
        # getPage helper, not to this method.
        d = getPage(url, agent=self.userAgent, *args, **kwargs)
        if addToActive:
            self.activeRequests.add(d)
            d.addBoth(removeFromActive)
        return d

    def say(self, message):
        """Send ``message`` to the stranger.

        :returns: Deferred firing ``True`` when the server response equals
            ``'win'`` and ``False`` otherwise.
        :raises NotConnectedError: when the bot is not CONNECTED.
        """
        if self.status != CONNECTED:
            raise NotConnectedError()
        return self._doLockedCommand(
            'send', data={'id': self.id, 'msg': message}
        ).addCallback(lambda r: r == 'win')

    def typing(self):
        """Issue the 'typing' command for this session.

        :returns: Deferred of the request.
        :raises NotConnectedError: when the bot is not CONNECTED.
        """
        if self.status != CONNECTED:
            raise NotConnectedError()
        return self._doLockedCommand('typing', data={'id': self.id})

    def stoppedTyping(self):
        """Issue the 'stoppedtyping' command for this session.

        :returns: Deferred of the request.
        :raises NotConnectedError: when the bot is not CONNECTED.
        """
        if self.status != CONNECTED:
            raise NotConnectedError()
        return self._doLockedCommand('stoppedtyping', data={'id': self.id})

    def solveCaptcha(self, solution):
        """Submit a captcha solution (not implemented yet).

        :raises CaptchaNotRequired: when no challenge is pending.
        """
        # NOTE(review): nothing in this class assigns a non-None value to
        # self.challenge, so as written this always raises.
        if not self.challenge:
            raise CaptchaNotRequired()
        # TODO: do the stuff here

    def _doLockedCommand(self, url, data):
        """POST ``data`` to ``url`` once the command lock has been acquired.

        The request is only sent if the bot is still CONNECTED when the
        lock is obtained; otherwise the returned Deferred fires ``None``.
        The lock is released when the request finishes, fails, or cannot
        be started.
        """
        def gotLock(lock):
            if self.status != CONNECTED:
                lock.release()
                return None

            def releaseLock(r):
                lock.release()
                return r

            try:
                d = self.getPage(url, data=data)
            except Exception:
                # Without this the lock would stay held forever and block
                # every later command.
                lock.release()
                raise
            d.addBoth(releaseLock)
            return d

        return self.lock.acquire().addCallback(gotLock)

    @inlineCallbacks
    def connect(self):
        """Start a new session.

        Fetches the home page to discover the chat server, requests a
        session id from it and starts polling for events.

        :returns: Deferred firing ``(id, server)``.
        :raises AlreadyRunningError: when the bot is not DISCONNECTED.
        :raises ValueError: when no server is found in the home page.

        NOTE: when a step fails the status is left at CONNECTING; call
        disconnect() to reset the bot before connecting again.
        """
        if self.status != DISCONNECTED:
            raise AlreadyRunningError()

        self.userAgent = getRandomUserAgent()
        self.status = CONNECTING

        homePage = yield self.getPage('http://omegle.com/')
        match = self._serverRegex.search(homePage)
        if not match:
            raise ValueError("Could not find a server to connect to!")
        self.server = match.group(1)

        response = yield self.getPage('start?rcs=1&spid=')
        self.id = json_decode(response)
        self.status = WAITING
        self.doEvents()
        returnValue((self.id, self.server))

    def doEvents(self):
        """Poll the server for events and dispatch them.

        Each event is a sequence whose first item is the event name and
        whose remaining items are its parameters; it is passed to the
        method ``EVENT_<name>`` when one exists.  After a batch has been
        handled the next poll is started.  Does nothing unless the bot is
        WAITING or CONNECTED.
        """
        if self.status not in (CONNECTED, WAITING):
            return

        def gotEvents(response):
            events = json_decode(response)
            if events is None:
                self.disconnect()
                return
            for event in events:
                name, params = event[0], event[1:]
                handler = getattr(self, 'EVENT_%s' % name, None)
                if handler:
                    handler(params)
            # No-op if one of the handlers disconnected the bot.
            self.doEvents()

        def gotError(error):
            # Cancellation is the expected result of disconnect().
            if not isinstance(error.value, CancelledError):
                self.disconnect()
                self.onError(error)

        # Chained (rather than addCallbacks) so that an exception raised
        # while handling a response also reaches gotError instead of
        # silently stopping the polling.
        d = self.getPage('events', data={'id': self.id})
        return d.addCallback(gotEvents).addErrback(gotError)

    def EVENT_waiting(self, params):
        """Handle the 'waiting' event: enter WAITING, fire waitingCallback."""
        self.status = WAITING
        self.runCallback(self.waitingCallback)

    def EVENT_connected(self, params):
        """Handle the 'connected' event: enter CONNECTED, fire connectCallback."""
        self.status = CONNECTED
        self.runCallback(self.connectCallback)

    def EVENT_gotMessage(self, params):
        """Handle the 'gotMessage' event: pass ``params`` to messageCallback."""
        self.runCallback(self.messageCallback, params)

    def EVENT_typing(self, params):
        """Handle the 'typing' event: fire typingCallback."""
        self.runCallback(self.typingCallback)

    def EVENT_stoppedTyping(self, params):
        """Handle the 'stoppedTyping' event: fire stoppedTypingCallback."""
        self.runCallback(self.stoppedTypingCallback)

    def EVENT_strangerDisconnected(self, params):
        """Handle the 'strangerDisconnected' event by disconnecting."""
        self.disconnect()

    def EVENT_recaptchaRequired(self, params):
        """Handle 'recaptchaRequired': pass ``params`` to recaptchaRequiredCallback."""
        # Same output as the statement form ``print 'captcha', params``.
        print('captcha %s' % (params,))
        self.runCallback(self.recaptchaRequiredCallback, params)

    def EVENT_recaptchaRejected(self, params):
        """Handle 'recaptchaRejected': pass ``params`` to recaptchaFailedCallback."""
        # Same output as the statement form ``print 'badcaptcha', params``.
        print('badcaptcha %s' % (params,))
        self.runCallback(self.recaptchaFailedCallback, params)

    def onDisconnect(self):
        """Fire disconnectCallback; called at the end of disconnect()."""
        self.runCallback(self.disconnectCallback)

    def onError(self, error):
        """Print the detailed traceback of a failed events request."""
        error.printDetailedTraceback()

    def runCallback(self, callback, params=None):
        """Invoke ``callback(self, params)`` if a callback is set.

        Exceptions raised by the callback are printed as a brief traceback
        and not propagated, so a faulty callback cannot break the bot.
        """
        if callback is None:
            return
        try:
            callback(self, params)
        except Exception:
            from twisted.python import failure
            failure.Failure().printBriefTraceback()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment