Skip to content

Instantly share code, notes, and snippets.

@thalesmg
Last active March 16, 2019 17:04
Show Gist options
  • Save thalesmg/1c97392b1ff4b1127f5b61a2b69a554e to your computer and use it in GitHub Desktop.
Save thalesmg/1c97392b1ff4b1127f5b61a2b69a554e to your computer and use it in GitHub Desktop.
Update all git repositories in a folder in parallel using Python asyncio

Running

$ ./atualizar.py
$ env max_jobs=10 ./atualizar.py

Quick & dirty benchmarks (4 cores, 1 run)

$ ./bench.py
1 job: 197.70166158676147 s
2 jobs: 96.07654476165771 s
3 jobs: 61.44223690032959 s
4 jobs: 45.68111753463745 s
5 jobs: 37.100176095962524 s
6 jobs: 34.44160318374634 s
7 jobs: 25.20595669746399 s
8 jobs: 25.38742423057556 s
9 jobs: 24.885375499725342 s
10 jobs: 26.666881322860718 s
#!/usr/bin/env python3
import asyncio
from asyncio import Semaphore
from asyncio.subprocess import Process, PIPE
import os
from typing import List, Tuple
_N = 10
async def print_errors(repo: str, cmd: str, proc: Process):
    """Print the captured stderr of *proc* if it exited with a non-zero code.

    Nothing is printed when ``proc.returncode`` is 0.  Otherwise a
    ``[ERROR] <repo> <cmd>`` header is printed, followed by every line read
    from ``proc.stderr``, indented by four spaces.

    Args:
        repo: Repository label shown in the header.
        cmd: Name of the command that failed, shown in the header.
        proc: Process whose ``returncode`` and ``stderr`` stream are inspected.
    """
    if proc.returncode == 0:
        return
    print("[ERROR]", repo, cmd)
    if proc.stderr is None:
        # The process was started without a stderr pipe: nothing to show.
        return
    async for raw_line in proc.stderr:
        # Lines arrive with their own line terminator; strip it so print()
        # does not emit a blank line after every message.  Undecodable bytes
        # are replaced rather than crashing the error report itself.
        text = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
        print(" " * 4 + text)
async def run(repo: str, cmd: List[str]) -> Process:
    """Start *cmd* as a subprocess with *repo* as its working directory.

    Both stdout and stderr of the child are connected to pipes.  The process
    is returned as soon as it has been spawned; this function does not wait
    for it to exit.

    Args:
        repo: Directory used as the child's working directory.
        cmd: Program followed by its arguments.

    Returns:
        The spawned ``asyncio.subprocess.Process``.
    """
    captured_streams = {"stdout": PIPE, "stderr": PIPE}
    child = await asyncio.create_subprocess_exec(
        *cmd, cwd=repo, **captured_streams
    )
    return child
async def update(repo: str) -> Tuple[Process, Process]:
    """Run ``git fetch -pa`` and then ``git pull --rebase`` inside *repo*.

    The two commands run one after the other; the pull is started once the
    fetch has exited, whatever its exit code was.

    Args:
        repo: Directory of the repository to update.

    Returns:
        A ``(fetch, pull)`` tuple of the two finished processes, so callers
        can inspect their return codes and piped output.
    """
    git_commands = (
        ["git", "fetch", "-pa"],
        ["git", "pull", "--rebase"],
    )
    finished = []
    for argv in git_commands:
        proc = await run(repo, argv)
        # NOTE(review): the asyncio docs warn that wait() can deadlock when
        # the child is piped and fills the OS pipe buffer.  communicate()
        # avoids that, but it would consume the stderr that print_errors()
        # reads from these processes afterwards.
        await proc.wait()
        finished.append(proc)
    fetch_proc, pull_proc = finished
    return (fetch_proc, pull_proc)
async def update_with_semaphore(repo: str, sem: Semaphore, fuel: int = 3):
    """Update *repo* while holding *sem*, retrying up to *fuel* times.

    Each attempt calls ``update(repo)``.  The first attempt in which both the
    fetch and the pull exit with code 0 prints ``"<repo>: OK!"`` and ends the
    retries.  If every attempt fails, the stderr of the last failed fetch and
    pull is reported through ``print_errors``.

    Args:
        repo: Directory of the repository to update.
        sem: Semaphore held for the whole duration of the attempts.
        fuel: Maximum number of attempts.
    """
    async with sem:
        last_failure = None
        for _attempt in range(fuel):
            fetch_proc, pull_proc = await update(repo)
            if fetch_proc.returncode != 0 or pull_proc.returncode != 0:
                # Remember the failing pair; only the last one is reported.
                last_failure = (fetch_proc, pull_proc)
                continue
            print(f"{repo}: OK!")
            return
        if last_failure is None:
            # No attempt was made at all (fuel <= 0): nothing to report.
            return
        failed_fetch, failed_pull = last_failure
        await print_errors(repo, "fetch", failed_fetch)
        await print_errors(repo, "pull", failed_pull)
async def main(n: int):
    """Update every git repository found directly under the current directory.

    An entry of the current directory counts as a repository when it is a
    directory whose listing contains ``.git``.  All updates are scheduled at
    once and share one semaphore of size *n*.

    Args:
        n: Initial value of the semaphore shared by all update coroutines.
    """
    limiter = Semaphore(n)
    pending = []
    for entry in os.listdir("."):
        if not os.path.isdir(entry):
            continue
        if ".git" not in os.listdir(entry):
            continue
        pending.append(update_with_semaphore(entry, limiter))
    print(f"Updating {len(pending)} repos...")
    await asyncio.gather(*pending)
def _parse_max_jobs(raw, default):
    """Return *raw* parsed as a positive job count, or *default* if unusable.

    *default* is returned when *raw* is ``None``, is not a valid integer, or
    is smaller than 1.  A count of 0 would create a semaphore that can never
    be acquired, and a negative count makes ``Semaphore`` raise.
    """
    try:
        jobs = int(raw)
    except (TypeError, ValueError):
        return default
    return jobs if jobs >= 1 else default


if __name__ == "__main__":
    # Concurrency limit comes from the ``max_jobs`` environment variable,
    # falling back to the module default when it is unset or invalid.
    N = _parse_max_jobs(os.getenv("max_jobs"), _N)
    # asyncio.run() creates a fresh event loop, runs main() to completion,
    # shuts down async generators and closes the loop.  It replaces the
    # manual get_event_loop()/run_until_complete()/close() sequence, which
    # relies on an implicitly created loop.
    asyncio.run(main(N))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment