Skip to content

Instantly share code, notes, and snippets.

@limitedeternity
Last active March 19, 2020 15:45
Show Gist options
  • Save limitedeternity/8b26c6c9d78c4b1447cd4cf4b3b74e95 to your computer and use it in GitHub Desktop.
Save limitedeternity/8b26c6c9d78c4b1447cd4cf4b3b74e95 to your computer and use it in GitHub Desktop.
WellDungeon Auto-Buy
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections import deque
from copy import deepcopy
from datetime import datetime, timedelta
from random import getrandbits
from re import search
from signal import signal, SIGINT, SIGTERM
from sys import exit
from time import sleep
from threading import Thread
import vk_api
class Call:
    """Deferred call: a callable bundled with the arguments to run it with.

    Invoking the instance runs ``fn(*args, **kwargs)`` using the arguments
    bound at construction time. Any arguments supplied at invocation time are
    accepted and discarded, so an instance can be handed to an API that calls
    back with arguments of its own (this file registers instances as signal
    handlers, which are invoked with ``signum, frame``).
    """

    def __init__(self, fn, *args, **kwargs):
        """Bind ``fn`` to the positional and keyword arguments given."""
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def __call__(self, *ignored_args, **ignored_kwargs):
        """Run the bound callable and return its result.

        Invocation-time arguments are deliberately ignored.
        """
        return self.fn(*self.args, **self.kwargs)

    def __repr__(self):
        """Show the bound callable and arguments, for readable debug output."""
        parts = [repr(self.fn)]
        parts.extend(repr(arg) for arg in self.args)
        parts.extend(f"{key}={value!r}" for key, value in self.kwargs.items())
        return f"{type(self).__name__}({', '.join(parts)})"
class TimedExecutionQueue:
    """FIFO queue whose callables are run by a background thread at a capped rate.

    A daemon thread started by the constructor drains the queue. It runs at
    most ``times`` callables per window and sleeps ``interval`` seconds once a
    window is full. A partially used window is reopened after ``interval``
    seconds without any call.

    A caller that needs the result of a callable requests a slot id from
    ``append`` and collects the outcome with ``awt``.
    """

    def __init__(self, times, interval, poll_interval=0.1):
        """Create the queue and start its worker thread.

        Args:
            times: Maximum number of calls per window.
            interval: Window length / cool-down, in seconds.
            poll_interval: Delay, in seconds, between checks of the queue by
                the worker and of a result slot by ``awt``.
        """
        self.queue = deque()
        self.times = times
        self.interval = interval
        self.poll_interval = poll_interval
        # Result slots. None means "empty / still pending"; a finished call
        # stores a (succeeded, payload) tuple so that a None return value is
        # distinguishable from "not finished yet".
        self.teq_state = [None] * 1000
        self.teq_state_id = 0
        t = Thread(target=self._monitor)
        t.daemon = True
        t.start()

    def append(self, fn, assign_state_id=False):
        """Queue ``fn`` (a zero-argument callable) for execution.

        Args:
            fn: Callable to run on the worker thread.
            assign_state_id: When true, reserve a result slot for the call.

        Returns:
            The slot id to pass to ``awt`` when ``assign_state_id`` is true,
            otherwise ``None``.
        """
        if not assign_state_id:
            self.queue.append([fn, None])
            return None
        state_id = self.teq_state_id
        # Wrap around over the whole slot table instead of a hard-coded bound.
        self.teq_state_id = (state_id + 1) % len(self.teq_state)
        # Drop any result that a previous user of this slot never collected.
        self.teq_state[state_id] = None
        self.queue.append([fn, state_id])
        return state_id

    def awt(self, state_id):
        """Block until the call holding slot ``state_id`` finishes.

        Returns:
            A deep copy of the value the callable returned.

        Raises:
            Exception: Whatever the callable raised on the worker thread is
                re-raised here instead of leaving the caller waiting forever.
        """
        while self.teq_state[state_id] is None:
            sleep(self.poll_interval)
        succeeded, payload = self.teq_state[state_id]
        self.teq_state[state_id] = None  # free the slot for reuse
        if not succeeded:
            raise payload
        return deepcopy(payload)

    def _monitor(self):
        """Worker loop: run queued callables while honouring the rate limit."""
        # Local import: the file-level imports do not include traceback.
        from traceback import print_exc

        function_calls = 0
        last_call_at = datetime.now() - timedelta(seconds=self.interval)
        while True:
            idle_for = (datetime.now() - last_call_at).total_seconds()
            if 0 < function_calls < self.times and idle_for >= self.interval:
                # The partially used window has expired; start a fresh one.
                function_calls = 0
            while self.queue:
                fn, state_id = self.queue.popleft()
                try:
                    outcome = (True, fn())
                except Exception as error:
                    # A failing callable must not kill the worker thread.
                    outcome = (False, error)
                    if state_id is None:
                        # Nobody will collect this error, so report it here.
                        print_exc()
                if state_id is not None:
                    self.teq_state[state_id] = outcome
                function_calls += 1
                last_call_at = datetime.now()
                if function_calls == self.times:
                    # Window exhausted: cool down before running anything else.
                    sleep(self.interval)
                    function_calls = 0
            sleep(self.poll_interval)
def auth_handler():
    """Ask the user for an authentication code on standard input.

    Returns:
        A ``(code, remember_device)`` pair: the entered text with surrounding
        whitespace removed, and a "remember device" flag that is always True.
    """
    code = input("Enter authentication code: ")
    return code.strip(), True
def main():
    """Watch a VK board topic and reply to offers that are cheap enough.

    Logs in with the credentials hard-coded below, then polls the newest 20
    comments of topic 40112239 in group 182985865. For each unseen comment
    containing ``[<amount>* ]<good name> - <price> золота`` for a wanted good,
    where the price does not exceed ``upper_price_bound * amount``, the
    matched text is queued to be sent as a message to peer -182985865.

    Requests go through two rate-limited queues: one for all API calls
    (3 per second) and one for outgoing messages (1 per 12 seconds).
    Returns early if authentication fails; otherwise runs until the process
    is terminated.
    """
    # Local import: only `search` is imported from `re` at file level.
    from re import escape

    login, password = "логин", "пароль"
    vk_session = vk_api.VkApi(
        login,
        password,
        app_id=2685278,
        scope=339968,
        auth_handler=auth_handler,
    )
    try:
        vk_session.auth()
    except vk_api.AuthError as error_msg:
        print(error_msg)
        return
    vk = vk_session.get_api()

    api_queue = TimedExecutionQueue(times=3, interval=1)
    buy_queue = TimedExecutionQueue(times=1, interval=12)
    comments_seen = []
    goods_needed = [{"name": "Пещерный корень", "upper_price_bound": 100}]
    # Build each offer pattern once. The name is escaped so that a good whose
    # name contains regex metacharacters is still matched literally.
    offer_patterns = [
        (good, rf"(\d+\* )?{escape(good['name'].lower())} - (\d+) золота")
        for good in goods_needed
    ]

    # Handlers receive (signum, frame); Call discards them and exits cleanly.
    signal(SIGINT, Call(exit, 0))
    signal(SIGTERM, Call(exit, 0))

    while True:
        try:
            # Keep the "seen" history bounded by dropping the oldest entries.
            if len(comments_seen) > 100:
                del comments_seen[:20]

            request_id = api_queue.append(
                Call(
                    vk.board.getComments,
                    group_id=182985865,
                    topic_id=40112239,
                    count=20,
                    sort="desc",
                ),
                assign_state_id=True,
            )
            # The API returns newest first; reverse to process oldest first.
            comments = api_queue.awt(request_id)["items"][::-1]
            fresh_comments = [c for c in comments if c not in comments_seen]
            comments_seen.extend(fresh_comments)

            for comment in fresh_comments:
                text = comment["text"].lower()
                for good, pattern in offer_patterns:
                    offer = search(pattern, text)
                    if offer:
                        break
                else:
                    continue  # no wanted good is offered in this comment

                # Group 1 is e.g. "3* "; when absent the amount is one unit.
                amount = int(offer[1][:-2]) if offer[1] else 1
                price = int(offer[2])
                if good["upper_price_bound"] * amount < price:
                    continue  # too expensive

                # The buy queue throttles how often a message is released;
                # the released message is then sent through the API queue.
                buy_queue.append(
                    Call(
                        api_queue.append,
                        Call(
                            vk.messages.send,
                            peer_id=-182985865,
                            random_id=getrandbits(64),
                            message=offer[0],
                        ),
                    )
                )
            sleep(0.1)
        except KeyboardInterrupt:
            exit(0)
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment