Last active
June 22, 2026 11:30
-
-
Save ivarref/3fb81f3171faa3457352547309d52e36 to your computer and use it in GitHub Desktop.
progress.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import contextlib | |
| import os | |
| import sys | |
| import time | |
| from datetime import datetime | |
| from functools import partial | |
| from typing import Any, Callable, Generator | |
| # Based on: | |
| # https://mdk.fr/blog/how-apt-does-its-fancy-progress-bar.html | |
def _current_milli_time() -> int:
    """Return the current wall-clock time in whole milliseconds since the epoch."""
    nanos_per_milli = 1_000_000
    return time.time_ns() // nanos_per_milli
def _write(s: str) -> None:
    """Emit ``s`` on standard output without appending a newline.

    No explicit flush is performed here.
    """
    print(s, end="")  # noqa T201
def _no_op(_cnt: int) -> None:
    """Accept a tick count and deliberately do nothing with it."""
    return None
def curr_time_str() -> str:
    """Return the current time of day formatted as ``HH:MM:SS``."""
    return datetime.now().strftime("%H:%M:%S")
def _ms_to_eta(milliseconds: float) -> str:
    """Format a duration given in milliseconds as a compact clock string.

    The result is ``MM:SS`` for durations under an hour, ``HH:MM:SS`` for
    durations under a day, and ``N days, HH:MM:SS`` otherwise. Anything
    below one second is truncated.

    Negative durations are clamped to zero. Without the clamp, floor
    division wraps them around (``-5000`` would render as ``23:59:55``).
    """
    # Clamp first: floor division and divmod on a negative value would
    # otherwise borrow from the larger units and produce a bogus clock.
    seconds = max(0, milliseconds) // 1000
    days, seconds = divmod(seconds, 24 * 3600)
    hours, seconds = divmod(seconds, 3600)
    minutes, seconds = divmod(seconds, 60)
    # ``.0f`` keeps the formatting valid for both int and float inputs.
    clock = f"{minutes:02.0f}:{seconds:02.0f}"
    if days > 0:
        return f"{int(days)} days, {hours:02.0f}:{clock}"
    if hours > 0:
        return f"{hours:02.0f}:{clock}"
    return clock
def _reserve_margin(lines: int) -> None:
    """Reserve the bottom terminal row by shrinking the scrolling region.

    ``lines`` is the terminal height in rows. The scrolling region is set
    to end at row ``lines - 1`` and standard output is flushed afterwards.
    """
    setup_sequence = (
        "\n",  # Ensure the last line is available.
        "\0337",  # Save cursor position.
        f"\033[0;{lines - 1}r",  # Reserve the bottom line.
        "\0338",  # Restore the cursor position.
        "\033[1A",  # Move up one line.
    )
    for chunk in setup_sequence:
        _write(chunk)
    sys.stdout.flush()
def _release_margin(lines: int) -> None:
    """Reset the scrolling region to span all ``lines`` rows.

    The cursor position is saved before and restored after the change.
    The bottom row is not cleared and standard output is not flushed here.
    """
    for chunk in (
        "\0337",  # Save cursor position.
        f"\033[0;{lines}r",  # Drop margin reservation.
        "\0338",  # Restore cursor position.
    ):
        _write(chunk)
def _final_msg_drop_margin(lines: int, msg: str) -> None:
    """Print ``msg`` followed by a newline, then give the bottom row back.

    After the message, the scrolling region is reset to all ``lines`` rows,
    the bottom row is cleared, the cursor is put back where it was and
    standard output is flushed.
    """
    teardown_sequence = (
        msg,
        "\n",
        "\0337",  # Save cursor position.
        f"\033[0;{lines}r",  # Drop margin reservation.
        f"\033[{lines};0f",  # Move the cursor to the bottom line.
        "\033[0K",  # Clear entire line.
        "\0338",  # Restore cursor position.
    )
    for chunk in teardown_sequence:
        _write(chunk)
    sys.stdout.flush()
def _show_progress(lines: int, msg: str) -> None:
    """Draw ``msg`` on row ``lines`` without disturbing the cursor.

    The cursor position is saved, the message is written on the bottom
    row, the rest of that row is cleared, and the cursor is restored
    before standard output is flushed.
    """
    redraw_sequence = (
        "\0337",  # Save cursor position.
        f"\033[{lines};0f",  # Move cursor to the bottom margin.
        msg,
        "\033[K",  # Clear to end of line.
        "\0338",  # Restore cursor position.
    )
    for chunk in redraw_sequence:
        _write(chunk)
    sys.stdout.flush()
def _maybe_tick(
    msg: str,
    total_count: int,
    start_ms: int,
    state: list[int],
    cnt: int,
) -> None:
    """Record ``cnt`` processed items and refresh the status line if due.

    ``state`` is a mutable list shared between calls:

    * ``state[0]`` -- timestamp (ms) of the last status-line refresh,
    * ``state[1]`` -- truthy to request the final summary,
    * ``state[2]`` -- running total of processed items,
    * ``state[3]`` -- terminal height the bottom margin is reserved for.

    When ``state[1]`` is truthy, a final summary is printed and the margin
    is dropped. Otherwise the status line is redrawn, but only if at least
    one second has passed since the previous redraw.
    """
    now_ms = _current_milli_time()
    since_refresh_ms = now_ms - state[0]
    state[2] += cnt
    finished = state[1]
    if not finished and since_refresh_ms < 1000:
        return

    processed_count = state[2]
    spent_ms_so_far = now_ms - start_ms
    speed = int(processed_count / max(1, spent_ms_so_far // 1000))

    if finished:
        summary = (
            f"{curr_time_str()} {msg}. {total_count:_} items. "
            + f"{speed:_} /s. Done in {_ms_to_eta(spent_ms_so_far)}."
        )
        _final_msg_drop_margin(state[3], summary)
        return

    _, current_lines = os.get_terminal_size()
    if current_lines != state[3]:
        # The terminal was resized: move the reserved row to the new bottom.
        _release_margin(state[3])
        # Remember the NEW height. Storing the old one back would make
        # every later refresh look like a resize and re-reserve the margin
        # (emitting an extra newline) each time.
        state[3] = current_lines
        _reserve_margin(current_lines)
    state[0] = now_ms

    if processed_count > 0:
        ms_per_row = spent_ms_so_far / processed_count
        # Never report negative remaining work when more items were
        # processed than announced.
        remaining_items = max(0, total_count - processed_count)
        eta = _ms_to_eta(ms_per_row * remaining_items)
    else:
        # Nothing processed yet, so there is no rate to extrapolate from
        # (and dividing by the count would raise ZeroDivisionError).
        eta = "--:--"
    percent = (100.0 * processed_count) / total_count
    status = (
        f"{msg}. "
        + f"{speed:_} /s. "
        + f"{_ms_to_eta(spent_ms_so_far)} elapsed. "
        + f"{percent:.1f}%. "
        + f"ETA: {eta}"
    )
    _show_progress(current_lines, status)
@contextlib.contextmanager
def progress_total(
    msg: str, total_count: int, enabled: bool
) -> Generator[partial[None] | Callable[..., None], Any, None]:
    """Context manager that shows a bottom-row progress line with an ETA.

    Yields a callable taking the number of items processed since the last
    call. When ``enabled`` is false the yielded callable does nothing and
    no output is produced. Otherwise a start line is printed on entry and,
    on exit (including exit via an exception), a final summary plus a
    "Done" line that also reports the processed count if it differs from
    ``total_count``.
    """
    assert isinstance(enabled, bool)
    assert isinstance(msg, str)
    if not enabled:
        yield _no_op
        return

    assert isinstance(total_count, int)
    assert total_count >= 1
    _, lines = os.get_terminal_size()
    # Layout: [last refresh ms, finished flag, processed items, terminal rows]
    state = [0, False, 0, lines]
    start_ms = _current_milli_time()
    _reserve_margin(lines)
    tick = partial(_maybe_tick, msg, total_count, start_ms, state)
    try:
        print(  # noqa T201
            f'{curr_time_str()} Start "{msg}" for {total_count:_} items ...',
            flush=True,
        )
        yield tick
    finally:
        # Flag completion, then tick once more to emit the final summary.
        state[1] = True
        tick(0)
        processed = state[2]
        if processed == total_count:
            outcome = (
                f'{curr_time_str()} Done "{msg}" for {total_count:_} items.'
            )
        else:
            outcome = (
                f'{curr_time_str()} Done "{msg}" for {processed:_} '
                + f"(expected {total_count:_}) items."
            )
        print(outcome, flush=True)  # noqa T201
@contextlib.contextmanager
def measure_speed(
    msg: str, total_count: int, enabled: bool
) -> Generator[partial[None] | Callable[..., None], Any, None]:
    """Context manager that times a block and reports its throughput.

    The yielded callable always ignores its argument; the reported speed
    is derived from ``total_count`` and the elapsed time. When ``enabled``
    is false nothing is printed. Otherwise a start line is printed on
    entry and a "Done" line with speed and duration on exit (including
    exit via an exception).
    """
    assert isinstance(enabled, bool)
    assert isinstance(msg, str)
    if not enabled:
        yield _no_op
        return

    assert isinstance(total_count, int)
    assert total_count >= 1
    start_ms = _current_milli_time()
    try:
        print(  # noqa T201
            f'{curr_time_str()} Start "{msg}" for {total_count:_} items ...',
            flush=True,
        )
        yield _no_op
    finally:
        elapsed_ms = max(1, _current_milli_time() - start_ms)
        # Speed is computed against whole elapsed seconds, at least one.
        whole_seconds = max(1, elapsed_ms // 1000)
        speed = int(total_count / whole_seconds)
        report = (
            f'{curr_time_str()} Done "{msg}" for {total_count:_} items. '
            + f"{speed:_} /s. "
            + f"Done in {_ms_to_eta(elapsed_ms)}. "
        )
        print(report, flush=True)  # noqa T201
def _maybe_tick_no_count(
    msg: str,
    columns: int,
    lines: int,
    start_ms: int,
    state: list[int],
    cnt: int,
) -> None:
    """Record ``cnt`` processed items and refresh the status line if due.

    ``state`` is a mutable list shared between calls: ``state[0]`` is the
    timestamp (ms) of the last refresh, ``state[1]`` is truthy to request
    the final summary, and ``state[2]`` is the running item total.
    ``columns`` is accepted but not used.

    When ``state[1]`` is truthy, a final summary is printed and the margin
    is dropped. Otherwise the status line is redrawn, but only if at least
    one second has passed since the previous redraw.
    """
    now_ms = _current_milli_time()
    since_refresh_ms = now_ms - state[0]
    state[2] += cnt
    finished = state[1]
    if not finished and since_refresh_ms < 1000:
        return

    elapsed_ms = now_ms - start_ms
    processed = state[2]
    speed = int(processed / max(1, elapsed_ms // 1000))

    if finished:
        summary = (
            f"{curr_time_str()} {msg}. "
            + f"{speed:_} /s. Done in {_ms_to_eta(elapsed_ms)}."
        )
        _final_msg_drop_margin(lines, summary)
        return

    state[0] = now_ms
    status = f"{msg}. " + f"{speed:_} /s. {_ms_to_eta(elapsed_ms)} elapsed."
    _show_progress(lines, status)
@contextlib.contextmanager
def progress(
    msg: str,
    enabled: bool,
) -> Generator[Callable[..., None] | partial[None], Any, None]:
    """Context manager that shows a bottom-row progress line without a total.

    Yields a callable taking the number of items processed since the last
    call. When ``enabled`` is false the yielded callable does nothing.
    Otherwise the terminal size is read once on entry, the bottom row is
    reserved, and a final summary is emitted on exit (including exit via
    an exception).
    """
    if not enabled:
        yield _no_op
        return

    columns, lines = os.get_terminal_size()
    # Layout: [last refresh ms, finished flag, processed items]
    state = [0, False, 0]
    start_ms = _current_milli_time()
    _reserve_margin(lines)
    tick = partial(_maybe_tick_no_count, msg, columns, lines, start_ms, state)
    try:
        yield tick
    finally:
        # Flag completion, then tick once more to emit the final summary.
        state[1] = True
        tick(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment