Last active
March 3, 2022 05:00
-
-
Save henryiii/ddeda112ceeb3f61b0e7e332a2b34662 to your computer and use it in GitHub Desktop.
Example of a package builder (pyodide)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from graphlib import TopologicalSorter | |
import asyncio | |
import dataclasses | |
import yaml | |
from pathlib import Path | |
import functools | |
from typing import NoReturn | |
@functools.total_ordering | |
@dataclasses.dataclass(kw_only=True, frozen=True) | |
class Package: | |
name: str | |
deps: tuple[str, ...] = () | |
def __lt__(self, other) -> bool: | |
return len(self.deps) > len(other.deps) | |
def __eq__(self, other) -> bool: | |
return self.name == other.name | |
async def build(self) -> None: | |
print(f"Starting {self.name} ->", *self.deps) | |
proc = await asyncio.create_subprocess_shell(f"sleep {len(self.deps)}") | |
await proc.wait() | |
print(f"Finished {self.name}") | |
WorkQueue = asyncio.PriorityQueue[Package] | |
FinishedQueue = asyncio.Queue[Package] | |
Graph = dict[Package, set[Package]] | |
async def worker(tasks: WorkQueue, results: FinishedQueue) -> NoReturn: | |
while True: | |
package = await tasks.get() | |
await package.build() | |
results.put_nowait(package) | |
async def add_nodes( | |
graph: Graph, tasks: WorkQueue, finished_tasks: FinishedQueue | |
) -> None: | |
topological_sorter = TopologicalSorter(graph) | |
topological_sorter.prepare() | |
while topological_sorter.is_active(): | |
for package in topological_sorter.get_ready(): | |
tasks.put_nowait(package) | |
package = await finished_tasks.get() | |
topological_sorter.done(package) | |
async def build_packages() -> None: | |
metas_paths = list(Path("packages").glob("*/meta.yaml")) | |
packages = {"distutils": Package(name="distutils", deps=())} | |
for p in metas_paths: | |
with p.open() as f: | |
full = yaml.safe_load(f) | |
name = full["package"]["name"] | |
deps = tuple(full.get("requirements", {}).get("run", ())) | |
packages[name] = Package(name=name, deps=deps) | |
graph = {p: {packages[s] for s in p.deps} for p in packages.values()} | |
tasks = WorkQueue() | |
finished_tasks = FinishedQueue() | |
workers = [asyncio.create_task(worker(tasks, finished_tasks)) for _ in range(4)] | |
producer = asyncio.create_task(add_nodes(graph, tasks, finished_tasks)) | |
await asyncio.wait({producer, *workers}, return_when=asyncio.FIRST_COMPLETED) | |
if __name__ == "__main__": | |
asyncio.run(build_packages()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from graphlib import TopologicalSorter | |
import asyncio | |
from typing import NoReturn | |
WorkQueue = asyncio.Queue[str] | |
FinishedQueue = asyncio.Queue[str] | |
Graph = dict[str, set[str]] | |
GRAPH: Graph = { | |
"numpy": set(), | |
"matplotlib": {"numpy"}, | |
"pandas": {"numpy", "matplotlib"}, | |
"requests": set(), | |
"uncertainties": {"numpy"}, | |
"scipy": {"numpy"}, | |
} | |
async def build(package: str) -> None: | |
deps = GRAPH[package] | |
print(f"Starting {package} for {len(deps)+1} s ->", *deps) | |
proc = await asyncio.create_subprocess_shell(f"sleep {len(deps)+1}") | |
await proc.wait() | |
print(f"Finished {package}") | |
async def worker(tasks: WorkQueue, results: FinishedQueue) -> NoReturn: | |
while True: | |
package = await tasks.get() | |
await build(package) | |
results.put_nowait(package) | |
async def add_nodes(tasks: WorkQueue, finished_tasks: FinishedQueue) -> None: | |
topological_sorter = TopologicalSorter(GRAPH) | |
topological_sorter.prepare() | |
while topological_sorter.is_active(): | |
for package in topological_sorter.get_ready(): | |
tasks.put_nowait(package) | |
package = await finished_tasks.get() | |
topological_sorter.done(package) | |
async def main() -> None: | |
tasks = WorkQueue() | |
finished_tasks = FinishedQueue() | |
workers = [asyncio.create_task(worker(tasks, finished_tasks)) for _ in range(4)] | |
producer = asyncio.create_task(add_nodes(tasks, finished_tasks)) | |
await asyncio.wait({producer, *workers}, return_when=asyncio.FIRST_COMPLETED) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from graphlib import TopologicalSorter | |
import asyncio | |
import dataclasses | |
from typing import NoReturn | |
@dataclasses.dataclass(kw_only=True, order=True, frozen=True) | |
class Package: | |
priority: int | |
name: str | |
async def build(self) -> None: | |
print(f"Starting {self.name} for {self.priority} s") | |
proc = await asyncio.create_subprocess_shell(f"sleep {self.priority}") | |
await proc.wait() | |
print(f"Finished {self.name}") | |
WorkQueue = asyncio.PriorityQueue[Package] | |
FinishedQueue = asyncio.Queue[Package] | |
Graph = dict[Package, set[Package]] | |
NUMPY = Package(name="numpy", priority=5) | |
MATPLOTLIB = Package(name="matplotlib", priority=3) | |
PANDAS = Package(name="pandas", priority=4) | |
REQUESTS = Package(name="requests", priority=1) | |
UNCERTAINTIES = Package(name="uncertainties", priority=1) | |
SCIPY = Package(name="scipy", priority=7) | |
GRAPH: Graph = { | |
NUMPY: set(), | |
MATPLOTLIB: {NUMPY}, | |
PANDAS: {NUMPY, MATPLOTLIB}, | |
REQUESTS: set(), | |
UNCERTAINTIES: {NUMPY}, | |
SCIPY: {NUMPY}, | |
} | |
async def worker(tasks: WorkQueue, results: FinishedQueue) -> NoReturn: | |
while True: | |
package = await tasks.get() | |
await package.build() | |
results.put_nowait(package) | |
async def add_nodes(tasks: WorkQueue, finished_tasks: FinishedQueue) -> None: | |
topological_sorter = TopologicalSorter(GRAPH) | |
topological_sorter.prepare() | |
while topological_sorter.is_active(): | |
for package in topological_sorter.get_ready(): | |
tasks.put_nowait(package) | |
package = await finished_tasks.get() | |
topological_sorter.done(package) | |
async def main() -> None: | |
tasks = WorkQueue() | |
finished_tasks = FinishedQueue() | |
workers = [asyncio.create_task(worker(tasks, finished_tasks)) for _ in range(4)] | |
producer = asyncio.create_task(add_nodes(tasks, finished_tasks)) | |
await asyncio.wait({producer, *workers}, return_when=asyncio.FIRST_COMPLETED) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment