Last active
March 19, 2020 15:45
-
-
Save limitedeternity/8b26c6c9d78c4b1447cd4cf4b3b74e95 to your computer and use it in GitHub Desktop.
WellDungeon Auto-Buy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
from collections import deque | |
from copy import deepcopy | |
from datetime import datetime, timedelta | |
from random import getrandbits | |
from re import search | |
from signal import signal, SIGINT, SIGTERM | |
from sys import exit | |
from time import sleep | |
from threading import Thread | |
import vk_api | |
class Call:
    """Deferred invocation: a callable bundled with its arguments.

    Invoking the instance runs the stored callable with the stored
    positional and keyword arguments and returns whatever it returns.
    Arguments supplied at invocation time are discarded, so an instance can
    be handed to APIs that call back with arguments of their own.
    """

    def __init__(self, fn, *args, **kwargs):
        """Remember ``fn`` together with the arguments to call it with."""
        self.fn, self.args, self.kwargs = fn, args, kwargs

    def __call__(self, *ignored_args, **ignored_kwargs):
        """Run the stored call; call-time arguments are deliberately unused."""
        target = self.fn
        return target(*self.args, **self.kwargs)
class TimedExecutionQueue:
    """Rate-limited FIFO of callables drained by a background daemon thread.

    The worker runs queued callables in order. After ``times`` calls it
    sleeps for ``interval`` seconds before running more; a partially used
    allowance is forgotten once the queue has been idle for ``interval``
    seconds. Callers that need a result enqueue with
    ``assign_state_id=True`` and pass the returned id to :meth:`awt`.
    """

    def __init__(self, times, interval):
        """Create the queue and start its worker thread.

        Args:
            times: Number of calls allowed before a pause is forced.
            interval: Length of the forced pause, in seconds.
        """
        self.queue = deque()
        self.times = times
        self.interval = interval
        # Ring of result slots indexed by the ids that append() hands out.
        # None means "no result yet"; a finished call stores a
        # (succeeded, payload) pair, so a callable returning None is still
        # distinguishable from a pending one.
        self.teq_state = [None] * 1000
        self.teq_state_id = 0

        t = Thread(target=self._monitor)
        t.daemon = True  # never keep the interpreter alive on exit
        t.start()

    def append(self, fn, assign_state_id=False):
        """Enqueue ``fn`` (a zero-argument callable) for execution.

        Args:
            fn: Callable invoked by the worker with no arguments.
            assign_state_id: When true, reserve a result slot for the call.

        Returns:
            The slot id to pass to :meth:`awt` when ``assign_state_id`` is
            true, otherwise ``None``.
        """
        if not assign_state_id:
            self.queue.append([fn, None])
            return None

        state_id = self.teq_state_id
        # Wrap with modulo so every slot of the ring, including the last
        # one, is used before ids start over.
        self.teq_state_id = (state_id + 1) % len(self.teq_state)
        # Clear whatever an earlier lap left unread in this slot so the new
        # awaiter cannot pick up a stale result.
        self.teq_state[state_id] = None
        self.queue.append([fn, state_id])
        return state_id

    def awt(self, state_id):
        """Block until the call behind ``state_id`` finished; return its result.

        The slot is polled every 0.1 s and released once read.

        Returns:
            A deep copy of the value the callable returned.

        Raises:
            Exception: Whatever exception the callable raised is re-raised
                in the waiting thread.
        """
        while self.teq_state[state_id] is None:
            sleep(0.1)

        succeeded, payload = self.teq_state[state_id]
        self.teq_state[state_id] = None
        if not succeeded:
            raise payload
        return deepcopy(payload)

    def _monitor(self):
        """Worker loop: run queued callables while enforcing the rate limit."""
        # Local import: only needed here and not imported at file level.
        from traceback import print_exc

        calls_in_window = 0
        last_call_at = datetime.now()

        while True:
            # A partially used allowance expires after a full idle interval.
            idle_for = (datetime.now() - last_call_at).total_seconds()
            if 0 < calls_in_window < self.times and idle_for >= self.interval:
                calls_in_window = 0

            # Iterate a snapshot: producers may append while calls run.
            for fn, state_id in list(self.queue):
                try:
                    outcome = (True, fn())
                except Exception as error:
                    # A failing call must not kill the worker, otherwise
                    # every later awt() would wait forever.
                    outcome = (False, error)
                    if state_id is None:
                        # Nobody will collect this failure; report it here.
                        print_exc()

                if state_id is not None:
                    self.teq_state[state_id] = outcome

                self.queue.popleft()
                calls_in_window += 1
                last_call_at = datetime.now()

                if calls_in_window == self.times:
                    sleep(self.interval)
                    calls_in_window = 0

            sleep(0.1)
def auth_handler():
    """Prompt on stdin for an authentication code.

    Returns:
        A ``(code, remember_device)`` tuple: the entered code with
        surrounding whitespace removed, and ``True``.
    """
    code = input("Enter authentication code: ")
    return code.strip(), True
def main():
    """Log in to VK and auto-buy wanted goods offered in a board topic.

    After authenticating, the function polls the latest 20 comments of
    topic 40112239 in group 182985865. For each comment not seen before it
    looks for an offer of the form ``[<n>* ]<good name> - <price> золота``
    and, when the price does not exceed ``upper_price_bound * n`` for that
    good, queues a message containing the matched offer text to peer
    -182985865. API calls go through a queue limited to 3 calls per second;
    purchases are additionally released at most once every 12 seconds.

    Returns early (after printing the error) when authentication fails.
    Runs until interrupted; SIGINT and SIGTERM exit with status 0.
    """
    # Local import: the file-level import only exposes ``search``.
    import re

    login, password = "логин", "пароль"
    vk_session = vk_api.VkApi(
        login,
        password,
        app_id=2685278,
        scope=339968,
        auth_handler=auth_handler
    )

    try:
        vk_session.auth()
    except vk_api.AuthError as error_msg:
        print(error_msg)
        return

    vk = vk_session.get_api()

    API_QUEUE = TimedExecutionQueue(times=3, interval=1)
    BUY_QUEUE = TimedExecutionQueue(times=1, interval=12)
    COMMENTS_SEEN = []
    GOODS_NEEDED = [{"name": "Пещерный корень", "upper_price_bound": 100}]

    # Compile one pattern per good up front. The name is escaped so that a
    # good containing regex metacharacters is still matched literally.
    offer_patterns = [
        (good, re.compile(rf"(\d+\* )?{re.escape(good['name'].lower())} - (\d+) золота"))
        for good in GOODS_NEEDED
    ]

    # Signal handlers are called with (signum, frame); Call ignores both.
    signal(SIGINT, Call(exit, 0))
    signal(SIGTERM, Call(exit, 0))

    while True:
        try:
            # Keep the seen-list bounded by dropping the oldest entries.
            if len(COMMENTS_SEEN) > 100:
                COMMENTS_SEEN = COMMENTS_SEEN[20:]

            request_id = API_QUEUE.append(
                Call(vk.board.getComments, group_id=182985865, topic_id=40112239, count=20, sort="desc"),
                assign_state_id=True
            )
            # Requested newest-first; reverse to process oldest-first.
            comments = API_QUEUE.awt(request_id)["items"][::-1]
            fresh_comments = [c for c in comments if c not in COMMENTS_SEEN]
            COMMENTS_SEEN += fresh_comments

            for comment in fresh_comments:
                text = comment["text"].lower()

                for good, pattern in offer_patterns:
                    offer = pattern.search(text)
                    if offer is None:
                        continue

                    # Group 1 is the optional "<n>* " prefix; default to 1.
                    amount = int((offer[1] or "1* ")[:-2])
                    price = int(offer[2])

                    if good["upper_price_bound"] * amount >= price:
                        # The buy queue paces purchases; when a purchase is
                        # released it is forwarded to the API queue, which
                        # performs the actual send.
                        BUY_QUEUE.append(
                            Call(
                                API_QUEUE.append,
                                Call(
                                    vk.messages.send,
                                    peer_id=-182985865,
                                    random_id=getrandbits(64),
                                    message=offer[0]
                                )
                            )
                        )

                    # Only the first good that matches a comment is considered.
                    break

            sleep(0.1)

        except KeyboardInterrupt:
            exit(0)
# Entry-point guard: start the bot only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
vk-api==11.7.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment