Last active
May 26, 2021 17:42
-
-
Save 4Kaylum/d32ce0e4e8bee4f7a84bb869f0398641 to your computer and use it in GitHub Desktop.
A small UI for the Python Discord server's pixels event
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import multiprocessing
import sys
import threading
import time
import typing
from datetime import datetime as dt, timedelta
from tkinter.colorchooser import askcolor

import pygame
import requests
# API token handed to PixelsClient, which sends it as the bearer credential.
# Left blank here; fill it in before running the script.
TOKEN: str = ""
class RateLimit(object):
    """
    Tracks the request quota of a single endpoint.

    Attributes:
        limit: Number of requests allowed per window.
        remaining: Number of requests still available in the current window.
        reset: Naive UTC datetime at which the current window ends (it is
            compared against `datetime.utcnow()`).
    """

    def __init__(self, limit, remaining, reset):
        self.limit = limit
        self.remaining = remaining
        self.reset = reset

    def decrement(self):
        """
        Record that one request has been spent.

        If the window has already ended, the quota is refilled to `limit`
        before the request is counted.
        """
        if self.time_expired:
            self.remaining = self.limit
        self.remaining -= 1

    @property
    def time_expired(self):
        """
        Whether the reset time has been reached, i.e. the window is over.
        """
        # The window is over once "now" has caught up with the reset time.
        return dt.utcnow() >= self.reset

    @property
    def limit_expired(self):
        """
        Whether the quota for the current window has been used up.
        """
        return self.remaining <= 0

    def wait_until_reset(self):
        """
        Block until the reset time, but only when the quota is used up.

        Returns immediately if requests are still available or the reset
        time has already passed.
        """
        # Nothing to wait for while requests are still available.
        if not self.limit_expired:
            return
        remaining_time = (self.reset - dt.utcnow()).total_seconds()
        if remaining_time <= 0:
            return
        print(f"Sleeping for {remaining_time} seconds")
        time.sleep(remaining_time)

    @classmethod
    def from_request(cls, request, add_one: bool = True):
        """
        Build a rate limit from the headers of an HTTP response.

        Args:
            request: Any object with a `headers` mapping that supports `.get`.
            add_one: When true, one is added to the reported remaining count,
                so that a following `decrement()` call lands on the value the
                server reported.

        Returns:
            A new `RateLimit`, or `None` when the `requests-limit`,
            `requests-remaining` or reset header (`requests-reset`, falling
            back to `cooldown-reset`) is missing.
        """
        headers = request.headers
        limit = headers.get("requests-limit")
        if limit is None:
            return None
        remaining = headers.get("requests-remaining")
        if remaining is None:
            return None
        reset_seconds = headers.get("requests-reset", headers.get("cooldown-reset"))
        if reset_seconds is None:
            return None
        return cls(
            int(limit),
            int(remaining) + int(add_one),
            # The header holds a number of seconds from now, not a timestamp.
            dt.utcnow() + timedelta(seconds=float(reset_seconds)),
        )

    def __repr__(self):
        return f"RateLimit(limit={self.limit}, remaining={self.remaining}, reset={self.reset})"
class Pixel(object):
    """
    A single pixel of the board.

    Attributes:
        x: Column of the pixel.
        y: Row of the pixel.
        colour: The colour, either as a hex string such as "ff8000" or as
            the equivalent integer (0xff8000).
    """

    def __init__(self, x: int, y: int, colour: typing.Union[int, str]):
        self.x = x
        self.y = y
        self.colour = colour

    @property
    def rgb(self):
        """
        The colour split into an (r, g, b) tuple of 0-255 integers.
        """
        # Hex strings are parsed; integers are used as they are, since
        # int(value, 16) raises TypeError for a non-string value.
        if isinstance(self.colour, int):
            all_colour = self.colour
        else:
            all_colour = int(self.colour, 16)
        return (
            (all_colour >> 16) & 0xff,
            (all_colour >> 8) & 0xff,
            all_colour & 0xff,
        )

    @property
    def location(self):
        """
        The (x, y) position of the pixel.
        """
        return (self.x, self.y,)

    def __repr__(self):
        return f"Pixel(x={self.x}, y={self.y}, colour={self.colour})"
class PixelsClient(object): | |
BASE = "https://pixels.pythondiscord.com" | |
def __init__(self, token: str): | |
self.token = token | |
self._size = None | |
self._pixels = list() | |
self._rate_limit = dict() | |
@property | |
def headers(self): | |
return { | |
"Authorization": f"Bearer {self.token}" | |
} | |
def request(self, method, path, **kwargs): | |
self.wait_for_rate_limit(method, path) | |
meth = getattr(requests, method.lower()) | |
site = meth(self.BASE + path, **kwargs, headers=self.headers) | |
self._update_ratelimit(method, path, site) | |
print(f"{method.upper()} {path} - {site.status_code} ({len(site.content):,} bytes)") | |
if site.status_code == 429: | |
return self.request(method, path, **kwargs) | |
return site.content | |
def request_json(self, *args, **kwargs): | |
data = self.request(*args, **kwargs) | |
return json.loads(data) | |
def wait_for_rate_limit(self, method, path): | |
rate_limit = self._rate_limit.get((method.upper(), path)) | |
if not rate_limit: | |
return | |
if rate_limit.time_expired: | |
return | |
if rate_limit.limit_expired: | |
rate_limit.wait_until_reset() | |
def _update_ratelimit(self, method, path, request): | |
rate_limit = self._rate_limit.get((method.upper(), path)) | |
if rate_limit is None or rate_limit.time_expired: | |
rate_limit = self._rate_limit.setdefault( | |
(method.upper(), path), | |
RateLimit.from_request(request), | |
) | |
if rate_limit is None: | |
return | |
rate_limit.decrement() | |
def get_size(self) -> typing.Tuple[int]: | |
""" | |
Get the size of the board. | |
""" | |
if self._size: | |
return self._size | |
data = self.request_json("GET", "/get_size") | |
self._size = [data['width'], data['height']] | |
print(f"Size found - {self._size}") | |
return self._size | |
def get_pixel(self, x: int, y: int) -> Pixel: | |
""" | |
Get a pixel from the board | |
""" | |
params = {"x": x, "y": y} | |
data = self.request_json("GET", "/get_pixel", params=params) | |
colour = data.pop("rgb") | |
return Pixel(**data, colour=colour) | |
def get_all_pixels(self) -> typing.List[Pixel]: | |
""" | |
Get a pixel from the board | |
""" | |
# Run our requests | |
size = self.get_size() | |
content = self.request("GET", "/get_pixels") | |
# Parse the string | |
x, y = 0, 0 | |
pixels = [] | |
while content: | |
colour = "".join([format(hex(i)[2:], "0>2") for i in content[:3]]) | |
pixel = Pixel(x, y, colour) | |
pixels.append(pixel) | |
x += 1 | |
if x >= size[0]: | |
x = 0 | |
y += 1 | |
content = content[3:] | |
self._pixels = pixels | |
return pixels | |
def set_pixel(self, x: int, y: int, colour: int) -> str: | |
""" | |
Sets the colour of a pixel on the board. | |
""" | |
json = {"x": x, "y": y, "rgb": format(hex(colour)[2:], "0>6")} | |
data = self.request_json("POST", "/set_pixel", json=json) | |
print(data) | |
def ask_colour_set_pixel(client, mouse_location):
    """
    Open a colour chooser and paint the board pixel under `mouse_location`.

    `mouse_location` is a window position; each board pixel is a 10x10
    square on screen, so the position is divided by 10 to find the pixel.
    Nothing is sent when the chooser yields no colour.
    """
    board_x = mouse_location[0] // 10
    board_y = mouse_location[1] // 10
    pixel_location = (board_x, board_y)

    # The chooser gives back a pair; only the second item (the "#rrggbb"
    # string) is used.
    _rgb, picked = askcolor()
    if picked is None:
        print("Cancelled colour set")
        return

    colour = picked.lstrip("#")
    print(f"Setting pixel at {pixel_location} to {colour}")
    client.set_pixel(board_x, board_y, int(colour, 16))
def main():
    """
    Show the board in a pygame window and run the event loop.

    Space reloads the board, a left click opens a colour chooser for the
    clicked pixel, and closing the window exits the program.
    """
    # Every board pixel is drawn as a square of this many screen pixels.
    # ask_colour_set_pixel divides by the same value (10).
    cell = 10

    # Make a pixel client
    client = PixelsClient(TOKEN)
    size = client.get_size()

    # Init Pygame
    pygame.init()
    screen = pygame.display.set_mode((size[0] * cell, size[1] * cell))
    clock = pygame.time.Clock()

    # Main window loop
    client.get_all_pixels()  # This'll get all of the pixels and store them in :attr:`_pixels`.
    get_new_pixels_thread: threading.Thread = None
    set_pixel_process: multiprocessing.Process = None
    mouse_location = None
    while True:

        # Handle events
        for event in pygame.event.get():

            # See if the user wants to quit
            if event.type == pygame.QUIT:
                # The refresh thread is a daemon, so it ends with the program;
                # only the colour chooser process has to be stopped by hand.
                if set_pixel_process:
                    set_pixel_process.terminate()
                sys.exit(0)

            # See if they want to refresh the grid
            elif event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
                if get_new_pixels_thread and get_new_pixels_thread.is_alive():
                    print("Already waiting on grabbing new pixels")
                else:
                    print("Space detected - grabbing new pixels")
                    # A thread rather than a process: a child process would
                    # only update its own copy of the client, so the pixels
                    # drawn by this loop would never change.
                    get_new_pixels_thread = threading.Thread(
                        target=client.get_all_pixels, daemon=True,
                    )
                    get_new_pixels_thread.start()

            # See if they want to set a new pixel
            elif event.type == pygame.MOUSEBUTTONDOWN and event.button == 1:
                # Use the position of the click itself; mouse_location is
                # still None if the mouse has not moved yet.
                set_pixel_process = multiprocessing.Process(
                    None, ask_colour_set_pixel, args=(client, event.pos),
                )
                set_pixel_process.start()

            # Mousemove - highlight hovered over pixel
            elif event.type == pygame.MOUSEMOTION:
                mouse_location = pygame.mouse.get_pos()

        # Get pixels to show on screen
        for p in client._pixels:
            rect = pygame.Rect(p.x * cell, p.y * cell, cell, cell)
            pygame.draw.rect(screen, p.rgb, rect)

        # Draw the mouse highlight box
        if mouse_location:
            rect = pygame.Rect(
                (mouse_location[0] // cell) * cell,
                (mouse_location[1] // cell) * cell,
                cell,
                cell,
            )
            pygame.draw.rect(screen, (255, 255, 255), rect, 2)

        # Update display
        pygame.display.flip()

        # Tell the window to tick the FPS counter
        clock.tick(60)
# Only start the UI when run as a script. The guard also keeps processes
# started through multiprocessing from launching the UI again if they
# import this module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment