Skip to content

Instantly share code, notes, and snippets.

@ivarref
Last active June 22, 2026 11:30
Show Gist options
  • Select an option

  • Save ivarref/3fb81f3171faa3457352547309d52e36 to your computer and use it in GitHub Desktop.

Select an option

Save ivarref/3fb81f3171faa3457352547309d52e36 to your computer and use it in GitHub Desktop.
progress.py
import contextlib
import os
import sys
import time
from datetime import datetime
from functools import partial
from typing import Any, Callable, Generator
# Based on:
# https://mdk.fr/blog/how-apt-does-its-fancy-progress-bar.html
def _current_milli_time() -> int:
return time.time_ns() // 1_000_000
def _write(s: str) -> None:
print(s, end="") # noqa T201
def _no_op(_cnt: int) -> None:
pass
def curr_time_str() -> str:
current_time = datetime.now()
formatted_time = current_time.strftime("%H:%M:%S")
return formatted_time
def _ms_to_eta(milliseconds: float) -> str:
seconds = milliseconds // 1000
(days, seconds) = divmod(seconds, int(24 * 3600))
(hours, seconds) = divmod(seconds, 3600)
(minutes, seconds) = divmod(seconds, 60)
if days > 0:
return (
f"{int(days):} days, {hours:02.0f}:{minutes:02.0f}:{seconds:02.0f}"
)
elif hours > 0:
return f"{hours:02.0f}:{minutes:02.0f}:{seconds:02.0f}"
else:
return f"{minutes:02.0f}:{seconds:02.0f}"
def _reserve_margin(lines: int) -> None:
_write("\n") # Ensure the last line is available.
_write("\0337") # Save cursor position
_write(f"\033[0;{lines - 1}r") # Reserve the bottom line
_write("\0338") # Restore the cursor position
_write("\033[1A") # Move up one line
sys.stdout.flush()
def _release_margin(lines: int) -> None:
_write("\0337") # Save cursor position
_write(f"\033[0;{lines}r") # Drop margin reservation
# _write(f"\033[{lines};0f") # Move the cursor to the bottom line
# _write("\033[0K") # Clear entire line
_write("\0338") # Restore cursor position
def _final_msg_drop_margin(lines: int, msg: str) -> None:
_write(msg)
_write("\n")
_write("\0337") # Save cursor position
_write(f"\033[0;{lines}r") # Drop margin reservation
_write(f"\033[{lines};0f") # Move the cursor to the bottom line
_write("\033[0K") # Clear entire line
_write("\0338") # Restore cursor position
sys.stdout.flush()
def _show_progress(lines: int, msg: str) -> None:
_write("\0337") # Save cursor position
_write(f"\033[{lines};0f") # Move cursor to the bottom margin
_write(msg)
_write("\033[K") # Clear to end of line
_write("\0338") # Restore cursor position
sys.stdout.flush()
def _maybe_tick(
msg: str,
total_count: int,
start_ms: int,
state: list[int],
cnt: int,
) -> None:
now_ms = _current_milli_time()
diff_ms = now_ms - state[0]
state[2] += cnt
lines = state[3]
if state[1]:
spent_ms_so_far = now_ms - start_ms
processed_count = state[2]
speed = int(processed_count / max(1, spent_ms_so_far // 1000))
m = (
f"{curr_time_str()} {msg}. {total_count:_} items. "
+ f"{speed:_} /s. Done in {_ms_to_eta(spent_ms_so_far)}."
)
_final_msg_drop_margin(lines, m)
elif diff_ms >= 1000:
_, lines2 = os.get_terminal_size()
if lines2 != state[3]:
_release_margin(lines)
state[3] = lines
lines = lines2
_reserve_margin(lines)
state[0] = now_ms
spent_ms_so_far = now_ms - start_ms
processed_count = state[2]
speed = int(processed_count / max(1, spent_ms_so_far // 1000))
ms_per_row = spent_ms_so_far / processed_count
remaining_ms = ms_per_row * (total_count - processed_count)
percent = (100.0 * processed_count) / total_count
m = (
f"{msg}. "
+ f"{speed:_} /s. "
+ f"{_ms_to_eta(spent_ms_so_far)} elapsed. "
+ f"{percent:.1f}%. "
+ f"ETA: {_ms_to_eta(remaining_ms)}"
)
_show_progress(lines, m)
@contextlib.contextmanager
def progress_total(
msg: str, total_count: int, enabled: bool
) -> Generator[partial[None] | Callable[..., None], Any, None]:
assert isinstance(enabled, bool)
assert isinstance(msg, str)
if not enabled:
yield _no_op
else:
assert isinstance(total_count, int)
assert total_count >= 1
columns, lines = os.get_terminal_size()
state = [0, False, 0, lines]
start_ms = _current_milli_time()
_reserve_margin(lines)
f = partial(_maybe_tick, msg, total_count, start_ms, state)
try:
print( # noqa T201
f"{curr_time_str()} "
+ f'Start "{msg}" for {total_count:_} items ...',
flush=True,
)
yield f
finally:
state[1] = True
f(0)
processed = state[2]
if processed != total_count:
print( # noqa T201
f'{curr_time_str()} Done "{msg}" for {processed:_} '
+ f"(expected {total_count:_}) items.",
flush=True,
)
else:
print( # noqa T201
f"{curr_time_str()} "
+ f'Done "{msg}" for {total_count:_} items.',
flush=True,
)
@contextlib.contextmanager
def measure_speed(
msg: str, total_count: int, enabled: bool
) -> Generator[partial[None] | Callable[..., None], Any, None]:
assert isinstance(enabled, bool)
assert isinstance(msg, str)
if not enabled:
yield _no_op
else:
assert isinstance(total_count, int)
assert total_count >= 1
start_ms = _current_milli_time()
try:
print( # noqa T201
f"{curr_time_str()} "
+ f'Start "{msg}" for {total_count:_} items ...',
flush=True,
)
yield _no_op
finally:
spent_ms = max(1, _current_milli_time() - start_ms)
speed = int(total_count / max(1, spent_ms // 1000))
m = (
f'{curr_time_str()} Done "{msg}" for {total_count:_} items. '
+ f"{speed:_} /s. "
+ f"Done in {_ms_to_eta(spent_ms)}. "
)
print(m, flush=True) # noqa T201
def _maybe_tick_no_count(
msg: str,
columns: int,
lines: int,
start_ms: int,
state: list[int],
cnt: int,
) -> None:
now_ms = _current_milli_time()
diff_ms = now_ms - state[0]
state[2] += cnt
if state[1]:
spent_ms_so_far = now_ms - start_ms
processed_count = state[2]
speed = int(processed_count / max(1, spent_ms_so_far // 1000))
m = (
f"{curr_time_str()} {msg}. "
+ f"{speed:_} /s. Done in {_ms_to_eta(spent_ms_so_far)}."
)
_final_msg_drop_margin(lines, m)
elif diff_ms >= 1000:
spent_ms_so_far = now_ms - start_ms
state[0] = now_ms
processed_count = state[2]
speed = int(processed_count / max(1, spent_ms_so_far // 1000))
m = f"{msg}. " + f"{speed:_} /s. {_ms_to_eta(spent_ms_so_far)} elapsed."
_show_progress(lines, m)
@contextlib.contextmanager
def progress(
msg: str,
enabled: bool,
) -> Generator[Callable[..., None] | partial[None], Any, None]:
if not enabled:
yield _no_op
else:
columns, lines = os.get_terminal_size()
state = [0, False, 0]
start_ms = _current_milli_time()
_reserve_margin(lines)
f = partial(_maybe_tick_no_count, msg, columns, lines, start_ms, state)
try:
yield f
finally:
state[1] = True
f(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment