#!/usr/bin/env python3
"""Update every git repository found directly under the current directory.

For each sub-directory that contains a ``.git`` entry the script runs
``git fetch -pa`` followed by ``git pull --rebase``, retrying a few times
on failure.  The number of repositories processed concurrently is taken
from the ``max_jobs`` environment variable (default: ``_N``).
"""
import asyncio
from asyncio import Semaphore
from asyncio.subprocess import Process, PIPE
import os
from typing import List, Tuple
import weakref

# Default number of repositories updated at the same time.
_N = 10

# stderr bytes captured by update(), keyed by the process that produced them.
# Weak keys let an entry disappear together with its Process object.
_CAPTURED_STDERR = weakref.WeakKeyDictionary()


async def print_errors(repo: str, cmd: str, proc: Process):
    """Print the stderr of *proc* if it did not exit with status 0.

    Args:
        repo: Repository name, used only to label the report.
        cmd: Short command label ("fetch", "pull", ...), used only for output.
        proc: The process to report on.  If its stderr was already captured
            by ``update`` that copy is used; otherwise ``proc.stderr`` is
            read to EOF here.
    """
    if proc.returncode == 0:
        return
    print("[ERROR]", repo, cmd)
    captured = _CAPTURED_STDERR.get(proc)
    if captured is None and proc.stderr is not None:
        captured = await proc.stderr.read()
    # splitlines() drops the line terminators so print() does not emit a
    # blank line after each message; errors="replace" keeps the report
    # going even when git emits bytes that are not valid UTF-8.
    for line in (captured or b"").splitlines():
        print(" " * 4 + line.decode("utf-8", errors="replace"))


async def run(repo: str, cmd: List[str]) -> Process:
    """Start *cmd* inside *repo* with stdout and stderr connected to pipes.

    Returns the started process without waiting for it to finish.
    """
    return await asyncio.create_subprocess_exec(
        *cmd, cwd=repo, stdout=PIPE, stderr=PIPE
    )


async def _finish(proc: Process) -> None:
    """Wait for *proc* while draining its pipes, remembering its stderr.

    ``Process.wait()`` can deadlock when the child fills a pipe nobody is
    reading, so the pipes are consumed with ``communicate()`` instead.
    """
    _, err = await proc.communicate()
    _CAPTURED_STDERR[proc] = err or b""


async def update(repo: str) -> Tuple[Process, Process]:
    """Run ``git fetch -pa`` and then ``git pull --rebase`` in *repo*.

    Both commands are always run, one after the other, and both finished
    processes are returned so the caller can inspect their return codes.
    """
    # NOTE(review): for `git fetch`, `-a` means `--append`, not `--all`;
    # confirm that this is the intended flag.
    fetch = await run(repo, ["git", "fetch", "-pa"])
    await _finish(fetch)
    pull = await run(repo, ["git", "pull", "--rebase"])
    await _finish(pull)
    return (fetch, pull)


async def update_with_semaphore(repo: str, sem: Semaphore, fuel: int = 3):
    """Update *repo* while holding *sem*, retrying up to *fuel* times.

    Prints ``"<repo>: OK!"`` on the first attempt where both git commands
    succeed.  If every attempt fails, the stderr of the last attempt's
    failing commands is printed.  With ``fuel <= 0`` nothing is attempted.
    """
    async with sem:
        last_failure = None
        for _ in range(fuel):
            fetch, pull = await update(repo)
            if fetch.returncode == 0 and pull.returncode == 0:
                print(f"{repo}: OK!")
                return
            last_failure = (fetch, pull)
        if last_failure is not None:
            await print_errors(repo, "fetch", last_failure[0])
            await print_errors(repo, "pull", last_failure[1])


def _is_git_repo(path: str) -> bool:
    """Return True if *path* is a directory that contains a ``.git`` entry."""
    # A direct existence check avoids listing the whole directory and does
    # not raise on directories that cannot be read.
    return os.path.isdir(path) and os.path.exists(os.path.join(path, ".git"))


async def main(n: int):
    """Update all repositories under the current directory, *n* at a time."""
    sem = Semaphore(n)
    repos = [entry for entry in os.listdir(".") if _is_git_repo(entry)]
    print(f"Updating {len(repos)} repos...")
    await asyncio.gather(*(update_with_semaphore(repo, sem) for repo in repos))


def _max_jobs(default: int = _N) -> int:
    """Read the concurrency limit from the ``max_jobs`` environment variable.

    Falls back to *default* when the variable is unset, not an integer, or
    not positive (a limit of 0 would block every task forever and a
    negative one is rejected by ``Semaphore``).
    """
    raw = os.getenv("max_jobs")
    if raw is None:
        return default
    try:
        jobs = int(raw)
    except ValueError:
        return default
    return jobs if jobs > 0 else default


if __name__ == "__main__":
    # asyncio.run creates the loop, finalizes async generators and closes
    # the loop, replacing the manual get_event_loop()/shutdown sequence.
    asyncio.run(main(_max_jobs()))