Skip to content

Instantly share code, notes, and snippets.

@ripmeep
Last active November 6, 2021 20:26
Show Gist options
  • Save ripmeep/4679a7da5d6bd2a88d0b104d33039f21 to your computer and use it in GitHub Desktop.
Save ripmeep/4679a7da5d6bd2a88d0b104d33039f21 to your computer and use it in GitHub Desktop.
Python 3 Async/Threaded loading animation generator
#
# Author: ripmeep
# Instagram: @rip.meep
# GitHub: https://github.com/ripmeep
#
import threading
import asyncio
import sys
import time
class Loader(object):
    """Threaded terminal spinner that redraws one status line in place.

    ``start_loading`` launches a background thread that cycles through
    ``symbols`` and rewrites the current terminal line (using a leading
    carriage return) every ``delay`` seconds. ``stop_loading`` ends the
    animation and prints a final line with ``finish_symbol`` or
    ``fail_symbol``.

    Public attributes (may be changed at any time):
        format: template containing the ``{SYMBOL}`` / ``{STATUS}`` placeholders.
        is_loading: True while the spinner thread should keep drawing.
        delay: seconds to wait between frames.
        symbols: animation frames, drawn in order and repeated.
        finish_symbol / fail_symbol: symbol used for the final line.
        current_status: status text currently shown next to the spinner.
        current_status_output: last frame written (including the leading '\\r').
    """

    def __init__(self):
        self.format: str = "[{SYMBOL}] {STATUS}"
        self.is_loading: bool = False
        # Was annotated ``int or float``, which as an expression is just ``int``.
        self.delay: float = 0.2
        self.symbols: list[str] = ['/', '-', '\\', '|']
        self.finish_symbol = '✓'
        self.fail_symbol = 'x'
        self.current_status = None
        self.current_status_output = None
        # Spinner thread created by start_loading(); None while idle.
        self._thread = None

    @staticmethod
    def FormatStatus(format: str, symbol: str, status: str, padding: int=0, padding_char: str=' '):
        """Fill ``{SYMBOL}`` and ``{STATUS}`` in *format* and left-justify the result.

        The result is padded on the right with *padding_char* up to *padding*
        characters; no padding is added when the text is already that long.
        """
        return format.replace('{SYMBOL}', symbol).replace('{STATUS}', status).ljust(padding, padding_char)

    @staticmethod
    def HideCursor():
        """Write the ANSI sequence ``ESC[?25l`` (hide cursor) to stdout."""
        sys.stdout.write('\x1b[?25l')

    @staticmethod
    def ShowCursor():
        """Write the ANSI sequence ``ESC[?25h`` (show cursor) to stdout."""
        sys.stdout.write('\x1b[?25h')

    def _line_width(self) -> int:
        """Return the length of the last frame written, or 0 if there is none."""
        return len(self.current_status_output) if self.current_status_output else 0

    def __show_loader(self):
        """Thread target: draw frames until ``is_loading`` becomes false."""
        while self.is_loading:
            # Iterate over a snapshot so callers may edit ``symbols`` meanwhile.
            frames = list(self.symbols)
            if not frames:
                # Nothing to draw; sleep instead of spinning at full CPU.
                time.sleep(self.delay)
                continue
            for s in frames:
                # Re-check between frames so a stop request takes effect
                # within one ``delay`` instead of after a whole cycle.
                if not self.is_loading:
                    return
                self.current_status_output = '\r' + Loader.FormatStatus(self.format, s, self.current_status)
                sys.stdout.write(self.current_status_output)
                sys.stdout.flush()
                time.sleep(self.delay)

    def start_loading(self, status: str, hide_cursor=True):
        """Start the spinner thread showing *status*.

        If a spinner thread is already running, only the status text is
        updated; no second thread is started.
        """
        self.current_status = status
        first_symbol = self.symbols[0] if self.symbols else ''
        self.current_status_output = '\r' + Loader.FormatStatus(self.format, first_symbol, self.current_status)
        if hide_cursor:
            Loader.HideCursor()
        self.is_loading = True
        if self._thread is not None and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self.__show_loader)
        self._thread.start()

    def update_status(self, new_status: str):
        """Change the status text drawn next to the spinner."""
        self.current_status = new_status

    def update_progress(self, message: str):
        """Print *message* on its own line, overwriting the current spinner line.

        The message is padded to the width of the last frame so no leftover
        characters remain. Safe to call before ``start_loading``.
        """
        sys.stdout.write('\r' + message.ljust(self._line_width(), ' ') + '\n')
        sys.stdout.flush()

    def stop_loading(self, failed: bool=False, status: str=None):
        """Stop the spinner and print the final status line.

        Args:
            failed: use ``fail_symbol`` instead of ``finish_symbol``.
            status: final text; defaults to the current status.
        """
        self.is_loading = False
        worker, self._thread = self._thread, None
        # Wait for the spinner thread to really finish so it cannot draw a
        # frame after the final line. (Previously: sleep for a full cycle.)
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        symbol = self.fail_symbol if failed else self.finish_symbol
        if status is None:
            status = self.current_status if self.current_status is not None else ''
        sys.stdout.write('\r' + Loader.FormatStatus(self.format, symbol, status, padding=self._line_width()) + '\n')
        Loader.ShowCursor()
        sys.stdout.flush()
class LoaderAsync(Loader):
    """asyncio variant of ``Loader``: the spinner runs as a task, not a thread.

    The ``*_async`` coroutines do the work. The synchronous methods
    (``start_loading``, ``update_status``, ``update_progress``,
    ``stop_loading``) schedule the matching coroutine as a task on the
    running event loop, so they must be called while a loop is running.
    """

    def __init__(self):
        super().__init__()
        # Strong references to fire-and-forget tasks: the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._tasks = set()

    def _spawn(self, coro):
        """Schedule *coro* on the running loop and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def start_loading_async(self, status: str, hide_cursor=True):
        """Draw spinner frames for *status* until ``is_loading`` becomes false."""
        self.is_loading = True
        self.current_status = status
        if hide_cursor:
            Loader.HideCursor()
        while self.is_loading:
            # Iterate over a snapshot so callers may edit ``symbols`` meanwhile.
            frames = list(self.symbols)
            if not frames:
                # Still yield to the event loop; a loop body without an
                # await would block every other task forever.
                await asyncio.sleep(self.delay)
                continue
            for s in frames:
                # Re-check between frames so no frame is drawn after the
                # final status line has been written.
                if not self.is_loading:
                    return
                self.current_status_output = '\r' + Loader.FormatStatus(self.format, s, self.current_status)
                sys.stdout.write(self.current_status_output)
                sys.stdout.flush()
                await asyncio.sleep(self.delay)

    async def update_status_async(self, new_status: str):
        """Change the status text, then yield for one ``delay``."""
        super().update_status(new_status)
        await asyncio.sleep(self.delay)

    async def update_progress_async(self, message: str):
        """Print *message* on its own line, then yield for one ``delay``."""
        super().update_progress(message)
        await asyncio.sleep(self.delay)

    async def stop_loading_async(self, failed: bool=False, status: str=None):
        """Stop the spinner and print the final status line.

        Args:
            failed: use ``fail_symbol`` instead of ``finish_symbol``.
            status: final text; defaults to the current status.
        """
        self.is_loading = False
        symbol = self.fail_symbol if failed else self.finish_symbol
        if status is None:
            status = self.current_status if self.current_status is not None else ''
        # No frame may have been drawn yet (e.g. stop right after start).
        width = len(self.current_status_output) if self.current_status_output else 0
        sys.stdout.write('\r' + Loader.FormatStatus(self.format, symbol, status, padding=width) + '\n')
        Loader.ShowCursor()
        sys.stdout.flush()
        await asyncio.sleep(self.delay)

    def start_loading(self, status: str, hide_cursor=True):
        """Schedule ``start_loading_async``; same parameters as ``Loader.start_loading``."""
        self._spawn(self.start_loading_async(status, hide_cursor=hide_cursor))

    def update_status(self, new_status: str):
        """Schedule ``update_status_async`` on the running loop."""
        self._spawn(self.update_status_async(new_status))

    def update_progress(self, message: str):
        """Schedule ``update_progress_async`` on the running loop."""
        self._spawn(self.update_progress_async(message))

    def stop_loading(self, failed: bool=False, status: str=None):
        """Schedule ``stop_loading_async`` on the running loop."""
        self._spawn(self.stop_loading_async(failed=failed, status=status))

    def abort(self, status: str=None):
        """Print a failure line, whether or not an event loop is running.

        Without a running loop (e.g. after ``asyncio.run`` was interrupted)
        the stop coroutine is driven by a fresh ``asyncio.run``. Inside a
        running loop ``asyncio.run`` would raise, so it is scheduled as a
        task instead.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self.stop_loading_async(failed=True, status=status))
        else:
            self._spawn(self.stop_loading_async(failed=True, status=status))
import loader
import time
# CREATE THREADED LOADER
thread_loader = loader.Loader()
thread_loader.delay = 0.15
thread_loader.format = "{SYMBOL} {STATUS} {SYMBOL}"


# EXAMPLE FUNCTIONS
def example_one(n):
    """Print the same progress line *n* times, pausing 1.5 s after each."""
    for _ in range(n):
        thread_loader.update_progress("Doing some important things in background...")
        time.sleep(1.5)


def example_two():
    """Print four additions as progress lines, pausing 1.5 s after each."""
    nums = [[200, 100], [150, 120], [600, 305], [300, 450]]
    for a, b in nums:
        thread_loader.update_progress(f"{a} + {b} = {a + b}")
        time.sleep(1.5)


try:
    thread_loader.start_loading("Running example one...")
    example_one(5)
    thread_loader.update_progress("Finished running example one!")
    thread_loader.update_status("Running example two...")
    example_two()
    thread_loader.update_progress("Finished running example two!")
    thread_loader.stop_loading(status="Completed the examples")
except KeyboardInterrupt:
    thread_loader.stop_loading(failed=True, status="User requested shutdown (CTRL+C)")
except Exception as exc:
    # Any other failure must stop the spinner too: its thread is not a
    # daemon, so it would otherwise keep drawing (with the cursor hidden)
    # and keep the process alive after the traceback.
    thread_loader.stop_loading(failed=True, status=f"Failed: {exc}")
    raise
import loader
import asyncio
# CREATE ASYNC LOADER
async_loader = loader.LoaderAsync()
async_loader.format = "{SYMBOL} {STATUS} {SYMBOL}"
async_loader.delay = 0.15


# EXAMPLE FUNCTIONS ASYNC
async def example_one(n):
    """Emit *n* numbered progress lines, awaiting 1.5 s after each one."""
    for step in range(1, n + 1):
        async_loader.update_progress(f"Doing stuff asynchronously #{step}...")
        await asyncio.sleep(1.5)


async def example_two():
    """Emit one progress line per addition, awaiting 1.5 s after each one."""
    pairs = ((200, 100), (150, 120), (600, 305), (300, 450))
    for left, right in pairs:
        total = left + right
        async_loader.update_progress(f"{left} + {right} = {total}")
        await asyncio.sleep(1.5)


# MAIN ASYNC
async def main_async():
    """Run both examples in sequence while the async spinner is active."""
    async_loader.start_loading("Running example one...")
    await example_one(5)
    async_loader.update_progress("Finished running example one!")

    async_loader.update_status("Running example two...")
    await example_two()
    async_loader.update_progress("Finished running example two!")

    async_loader.stop_loading(status='Completed the examples')


if __name__ == '__main__':
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        # The event loop is gone at this point; abort() prints the failure line.
        async_loader.abort(status="User requested shutdown (CTRL+C)")
import loader
import time
# CREATE THREADED LOADER
thread_loader = loader.Loader()
thread_loader.delay = 0.15
thread_loader.format = "{SYMBOL} {STATUS} {SYMBOL}"
# Wrap the final symbols in bold green / bold red ANSI colour codes.
thread_loader.finish_symbol = f"\x1b[01;32m{thread_loader.finish_symbol}\x1b[0m"
thread_loader.fail_symbol = f"\x1b[01;31m{thread_loader.fail_symbol}\x1b[0m"

# Replace the frames (in place) with one full symbol cycle per brightness
# step, each frame wrapped in a 24-bit foreground colour with equal red and
# green components, fading up to 255 and back down.
symbols = thread_loader.symbols.copy()
color_strengths = [50, 100, 150, 200, 255, 200, 150, 100, 50]
thread_loader.symbols.clear()
thread_loader.symbols.extend(
    "\x1b[38;2;{};{};0m{}\x1b[0m".format(cs, cs, s)
    for cs in color_strengths
    for s in symbols
)


# EXAMPLE FUNCTIONS
def example_one(n):
    """Print the same progress line *n* times, pausing 1.5 s after each."""
    for _ in range(n):
        thread_loader.update_progress("Doing some important things in background...")
        time.sleep(1.5)


def example_two():
    """Print four additions as progress lines, pausing 1.5 s after each."""
    nums = [[200, 100], [150, 120], [600, 305], [300, 450]]
    for a, b in nums:
        thread_loader.update_progress(f"{a} + {b} = {a + b}")
        time.sleep(1.5)


try:
    thread_loader.start_loading("Running example one...")
    example_one(5)
    thread_loader.update_progress("Finished running example one!")
    thread_loader.update_status("Running example two...")
    example_two()
    thread_loader.update_progress("Finished running example two!")
    thread_loader.stop_loading(status="Completed the examples")
except KeyboardInterrupt:
    thread_loader.stop_loading(failed=True, status="User requested shutdown (CTRL+C)")
except Exception as exc:
    # Any other failure must stop the spinner too: its thread is not a
    # daemon, so it would otherwise keep drawing (with the cursor hidden)
    # and keep the process alive after the traceback.
    thread_loader.stop_loading(failed=True, status=f"Failed: {exc}")
    raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment