Last active
October 27, 2018 08:43
-
-
Save JrooTJunior/7f21527238bf06570f5978b8eb671065 to your computer and use it in GitHub Desktop.
Stickers download bot. License: MIT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Requirements: | |
aiogram>=2.0.rc1 | |
Pillow | |
emoji | |
""" | |
import asyncio | |
import io | |
import logging | |
import os | |
import zipfile | |
from PIL import Image | |
from aiogram import Bot, Dispatcher, executor, md, types | |
from aiogram.contrib.middlewares.logging import LoggingMiddleware | |
from aiogram.types import ChatActions, ParseMode | |
from aiogram.utils.emoji import demojize | |
from aiogram.utils.exceptions import BotBlocked, InvalidStickersSet | |
# --- Configuration ---------------------------------------------------------
TOKEN = os.environ['TOKEN']  # read from the environment; a missing variable raises KeyError
TIMEOUT = 15  # seconds slept between two checks in monitoring()

# Keys under which the running counters are stored on the bot object.
ACTIVE = 'active_downloads'
FAILED = 'failed_downloads'
COMPLETED = 'success_downloads'
UPLOADING = 'uploading'

# --- Logging ---------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
log = logging.getLogger('bot')

# --- Bot and dispatcher ----------------------------------------------------
loop = asyncio.get_event_loop()
bot = Bot(token=TOKEN, loop=loop, parse_mode=ParseMode.HTML)
dp = Dispatcher(bot)
dp.middleware.setup(LoggingMiddleware(log))

# --- Counters --------------------------------------------------------------
# Every counter starts at zero; handlers increment/decrement them in place.
bot[ACTIVE] = 0
bot[FAILED] = 0
bot[COMPLETED] = 0
bot[UPLOADING] = 0
def webp_to_png(image: io.BytesIO) -> io.BytesIO:
    """Re-encode an image stream as PNG.

    :param image: binary stream holding an image Pillow can open
    :return: a new in-memory stream with the PNG data, rewound to the start
    """
    # Convert to RGBA before saving as PNG.
    rgba_image = Image.open(image).convert('RGBA')

    png_buffer = io.BytesIO()
    rgba_image.save(png_buffer, "png")

    # Rewind so the caller can read the PNG bytes straight away.
    png_buffer.seek(0)
    return png_buffer
async def get_sticker_as_png(sticker) -> io.BytesIO:
    """Download a sticker into memory and return it converted to PNG.

    :param sticker: object exposing an awaitable ``download(destination, chunk_size=...)``
    :return: in-memory stream with the PNG data, as produced by ``webp_to_png``
    """
    webp_buffer = io.BytesIO()
    await sticker.download(webp_buffer, chunk_size=2048)

    # Give other tasks one turn on the event loop before the synchronous conversion.
    await asyncio.sleep(0)

    png_buffer = webp_to_png(webp_buffer)
    return png_buffer
@dp.message_handler(types.ChatType.is_private, content_types=types.ContentType.STICKER)
async def sticker_handler(message: types.Message):
    """Answer a sticker sent in a private chat with a ZIP of its whole set as PNG files.

    Steps:

    1. Resolve the sticker set of the received sticker; reply with an error and stop
       when the sticker has no set name or the set lookup raises ``InvalidStickersSet``.
    2. Download every sticker of the set, convert it to PNG and add it to an
       in-memory ZIP archive as ``<index>_<emoji name>.png``.
    3. Upload the archive as a reply to the original message.

    The ``ACTIVE``, ``UPLOADING``, ``COMPLETED`` and ``FAILED`` counters stored on the
    bot object are updated along the way.

    :param message: incoming message carrying the sticker
    """
    # NOTE(review): the 'π' characters in the user-facing strings below look like
    # mis-encoded emoji -- confirm against the original source before changing them.
    set_name = message.sticker.set_name
    if not set_name:
        # The Bot API marks the set name as optional; without it there is nothing to look up.
        return await message.reply('Invalid stickers set!')

    try:
        stickers_set = await bot.get_sticker_set(set_name)
    except InvalidStickersSet:
        return await message.reply('Invalid stickers set!')

    await message.reply('Downloading stickers set... \nπ It can take some time.')

    archive_zip = io.BytesIO()
    bot[ACTIVE] += 1
    try:
        with zipfile.ZipFile(archive_zip, "a", zipfile.ZIP_DEFLATED, False) as archive:
            for index, sticker in enumerate(stickers_set.stickers):
                # The emoji is optional as well; fall back to the bare index in that case.
                if sticker.emoji:
                    sticker_name = demojize(sticker.emoji).replace('::', '_').replace(':', '')
                else:
                    sticker_name = ''
                filename = f"{index:03}_{sticker_name}" if sticker_name else f"{index:03}"

                log.info(f"Download sticker '{sticker.file_id}' from '{stickers_set.name}' as '{filename}'")
                sticker_png = await get_sticker_as_png(sticker)
                archive.writestr(f"{filename}.png", sticker_png.read())
    except Exception:
        # Record the failure first: if the reply below fails too (for example the
        # user blocked the bot), the traceback and the counter are not lost.
        log.exception('Failed to download stickers')
        bot[FAILED] += 1
        await message.reply('Something went wrong! π')
    else:
        log.info(f"Start uploading '{stickers_set.name}' for {message.from_user.id}")
        await ChatActions.upload_document()

        archive_zip.seek(0)
        caption = (
            'π ' +
            md.hbold('Stickers set:') + ' ' + md.hcode(stickers_set.title) + '\n' +
            md.hbold('Stickers in set:') + ' ' + md.hcode(len(stickers_set.stickers))
        )

        bot[UPLOADING] += 1
        try:
            await message.reply_document(
                types.InputFile(archive_zip, filename=f'{stickers_set.name}.zip'),
                caption,
            )
        except BotBlocked:
            bot[FAILED] += 1
            return log.warning(f"Can't upload file for {message.from_user.id} because the bot was blocked by the user.")
        else:
            bot[COMPLETED] += 1
        finally:
            bot[UPLOADING] -= 1
    finally:
        # Runs on every exit path, including the early return above.
        bot[ACTIVE] -= 1
        if bot[ACTIVE]:
            log.info(f"Sticker set '{stickers_set.name}' completed. Active downloads count: {bot[ACTIVE]}")
        else:
            log.info('All downloads finished!')
@dp.message_handler(commands=['status'], commands_prefix='!')
async def cmd_status(message: types.Message):
    """Reply to the ``!status`` command with the current values of the four counters.

    :param message: incoming command message
    """
    # Read each counter once, then format the report in a single expression.
    active_sets = bot[ACTIVE]
    active_uploads = bot[UPLOADING]
    completed = bot[COMPLETED]
    failed = bot[FAILED]

    report = (
        f'Active sticker sets: {active_sets}\n'
        f'Active uploads: {active_uploads}\n'
        f'Completed: {completed}\n'
        f'Failed: {failed}'
    )
    await message.reply(report)
def collect_counters():
    """Return a snapshot of the counters as ``(active, uploading, completed, failed)``."""
    counter_keys = (ACTIVE, UPLOADING, COMPLETED, FAILED)
    return tuple(bot[key] for key in counter_keys)
async def monitoring(timeout):
    """Log the counters whenever they differ from the previously seen values.

    Runs for as long as the module-level event loop reports that it is running,
    sleeping ``timeout`` seconds between two checks.

    :param timeout: delay between two consecutive checks, passed to ``asyncio.sleep``
    """
    last_seen = collect_counters()
    while loop.is_running():
        snapshot = collect_counters()
        changed = snapshot != last_seen
        if changed:
            last_seen = snapshot
            # No await happens between the snapshot and this point, so the
            # unpacked values match what the bot object currently holds.
            active_sets, active_uploads, completed, failed = snapshot
            log.info(f'Active sticker sets: {active_sets}. '
                     f'Active uploads: {active_uploads}. '
                     f'Completed: {completed}. '
                     f'Failed: {failed}.')
        await asyncio.sleep(timeout)
if __name__ == '__main__':
    # Schedule the periodic counter logger on the shared loop, then hand
    # control to aiogram's polling executor.
    monitoring_coro = monitoring(TIMEOUT)
    loop.create_task(monitoring_coro)
    executor.start_polling(dp, skip_updates=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment