Skip to content

Instantly share code, notes, and snippets.

@lun-4
Created June 20, 2017 23:46
Show Gist options
  • Save lun-4/08de5328ec3a7b5ccd221fbca21351d9 to your computer and use it in GitHub Desktop.
Save lun-4/08de5328ec3a7b5ccd221fbca21351d9 to your computer and use it in GitHub Desktop.
Raw bot that works with python 3.5, requires aiohttp and websockets. Made with <3 by Memework™
#!/usr/bin/env python3.5
# vi suck my dick vim is better
import websockets
import asyncio
import aiohttp
import logging
import json
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
# yes we are doing global variables
# i dont want to do classes fuck off
seq = None
# User received in the thing called ready
user = None
# the best token
# why the fuck cant u shift+intert
TOKEN = None
with open('token', 'r') as f:
TOKEN = f.read().strip()
API_BASE = 'https://discordapp.com/api'
# fuck classes
session = aiohttp.ClientSession()
HEADERS = {
"Authorization": "Bot {}".format(TOKEN),
"User-Agent": "DiscordBot (LunaDies, 0.0.1)",
"Content-Type": "application/json",
}
class OP:
DISPATCH = 0
HEARTBEAT = 1
IDENTIFY = 2
HENLO = 10
HEARTBEAT_ACK = 11
IDENTIFY_DATA = {
'token': TOKEN,
'properties': {
'$os': 'linux',
'$browser': 'luna_dies [Real] [Not ClickBait] [not Fake]',
'$device': 'luna_dies [Real] [Not ClickBait] [not Fake]',
'$referrer': '',
'$referring_domain': '',
},
'compress': False,
'large_threshold': 50,
'shard': [0, 1],
}
async def sendjson(ws, obj):
await ws.send(json.dumps(obj))
async def recvjson(ws):
raw = await ws.recv()
log.debug('Websocket receive: {}'.format(raw))
try:
j = json.loads(raw)
return j
except Exception as err:
log.warning('fuck json')
return None
async def heartbeater(ws, period):
"""Send OP 1 Heartbeat packets"""
global seq
#print("HEARTBEAT A A A A A A")
try:
while True:
await sendjson(ws, {'op': 1, 'd': seq})
await asyncio.sleep(period/1000)
except asyncio.CancelledError:
log.warning('Heartbeater got cancelled')
async def http_post(url, data):
log.info('Requesting {}/{}'.format(API_BASE, url))
print(url, HEADERS, json.dumps(data))
async with session.post("{}/{}".format(API_BASE, url), headers=HEADERS, data=json.dumps(data)) as r:
if r.status == 200:
log.info('All good')
return r
else:
log.info('not good: {}'.format(repr(r)))
async def handle_message(payload):
"""Processes a MESSAGE_CREATE payload form the gateway."""
data = payload['d']
# data is a message object
content = data['content']
if content == '!ping':
await http_post('channels/{}/messages'.format(data['channel_id']), {'content': 'pong'})
async def _handle(payload):
global user
op = payload['op']
if op == OP.DISPATCH:
evt_name = payload['t']
data = payload['d']
if evt_name == 'READY':
user = data['user']
log.info('We are READY! {}#{}'.format(user['username'], user['discriminator']))
elif evt_name == 'MESSAGE_CREATE':
await handle_message(payload)
else:
log.info('Unhandled event: {}'.format(evt_name))
else:
log.info('Unhandled OP {}'.format(op))
async def handle_packet(loop, ws):
"""Start an infinite loop waiting for packets from the gateway."""
global seq
# Identify
await sendjson(ws, {
'op': OP.IDENTIFY,
'd': IDENTIFY_DATA
})
while True:
payload = await recvjson(ws)
op = payload['op']
if 's' in payload:
seq = payload['s']
await _handle(payload)
async def main_entry(loop):
"""Main entry point for this gateway handler
This will call necessary Discord endpoints, get a gateway URL
and start heartbeating.
"""
log.info('Starting')
gateway_url = None
async with session.get("{}/gateway".format(API_BASE)) as resp:
j = await resp.json()
log.debug('gateway URL: {}'.format(j))
gateway_url = j['url']
gateway_url = '{}?v=5&encoding=json'.format(gateway_url)
log.info('Connecting to gateway: {}'.format(gateway_url))
async with websockets.connect(gateway_url) as ws:
# main WS logic goes here
log.info('Waiting for OP 10 Hello')
henlo = await recvjson(ws)
if henlo['op'] != OP.HENLO: log.warning("fuck discord")
hb_interval = henlo['d']['heartbeat_interval']
log.info('Creating heartbeater')
loop.create_task(heartbeater(ws, hb_interval))
# we should keep the loop busy so that we can FUCKING HEARTBEAT
await handle_packet(loop, ws)
def main():
loop = asyncio.get_event_loop()
loop.create_task(main_entry(loop))
loop.run_forever()
if __name__ == '__main__':
main()
@relative2
Copy link

fuck

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment