Skip to content

Commit a7ee0f8

Browse files
authored
Merge pull request #796 from tiran/build-parallel-redesign
Redesign parallel build system
2 parents d64de03 + fe9600f commit a7ee0f8

File tree

2 files changed

+66
-99
lines changed

2 files changed

+66
-99
lines changed

e2e/test_build_parallel.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ if ! grep -q "cython-3.1.1: ready to build" "$log"; then
4444
echo "Did not find message indicating build of cython would start" 1>&2
4545
pass=false
4646
fi
47-
if ! grep -q "cython: requires exclusive build" "$log"; then
47+
if ! grep -q "cython-3.1.1: requires exclusive build" "$log"; then
4848
echo "Did not find message indicating build of cython would run on its own" 1>&2
4949
pass=false
5050
fi

src/fromager/commands/build.py

Lines changed: 65 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ def dict_factory(x):
5858
}
5959

6060

61+
BuildSequenceEntryFuture: typing.TypeAlias = concurrent.futures.Future[
62+
BuildSequenceEntry
63+
]
64+
DependencyNodeSet: typing.TypeAlias = set[dependency_graph.DependencyNode]
65+
66+
6167
@click.command()
6268
@click.option(
6369
"--wheel-server-url",
@@ -550,6 +556,10 @@ def _build_parallel(
550556
)
551557

552558

559+
def _nodes_to_string(nodes: typing.Iterable[dependency_graph.DependencyNode]) -> str:
560+
return ", ".join(sorted(node.key for node in nodes))
561+
562+
553563
@click.command()
554564
@click.option(
555565
"-f",
@@ -607,123 +617,80 @@ def build_parallel(
607617

608618
# Load the dependency graph
609619
logger.info("reading dependency graph from %s", graph_file)
610-
graph: dependency_graph.DependencyGraph
611620
graph = dependency_graph.DependencyGraph.from_file(graph_file)
612-
613-
# Track what has been built
614-
built_node_keys: set[str] = set()
615-
616-
# Get all nodes that need to be built (excluding prebuilt ones and the root node)
617-
# Sort the nodes to build by their key one time to avoid
618-
# redoing the sort every iteration and to make the output deterministic.
619-
nodes_to_build: DependencyNodeList = sorted(
620-
(n for n in graph.nodes.values() if n.key != dependency_graph.ROOT),
621-
key=lambda n: n.key,
621+
logger.info("found %i packages to build", len(graph))
622+
623+
topo = graph.get_build_topology(context=wkctx)
624+
topo.prepare()
625+
logger.info(
626+
"build topology has %i exclusive nodes: %s",
627+
len(topo.exclusive_nodes),
628+
_nodes_to_string(topo.exclusive_nodes),
629+
)
630+
logger.info(
631+
"build topology has %i build requirements: %s",
632+
len(topo.dependency_nodes),
633+
_nodes_to_string(topo.dependency_nodes),
622634
)
623-
logger.info("found %d packages to build", len(nodes_to_build))
624635

625-
# A node can be built when all of its build dependencies are built
626-
entries: list[BuildSequenceEntry] = []
636+
future2node: dict[BuildSequenceEntryFuture, dependency_graph.DependencyNode] = {}
637+
built_entries: list[BuildSequenceEntry] = []
638+
rounds: int = 0
627639

628-
with progress.progress_context(total=len(nodes_to_build)) as progressbar:
640+
with (
641+
progress.progress_context(total=len(graph)) as progressbar,
642+
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor,
643+
):
629644

630645
def update_progressbar_cb(future: concurrent.futures.Future) -> None:
631-
"""Immediately update the progress when when a task is done"""
646+
"""Immediately update the progress and mark node as done"""
632647
progressbar.update()
633648

634-
while nodes_to_build:
635-
# Find nodes that can be built (all build dependencies are built)
636-
buildable_nodes: DependencyNodeList = []
637-
for node in nodes_to_build:
638-
with req_ctxvar_context(
639-
Requirement(node.canonicalized_name), node.version
640-
):
641-
# Get all build dependencies (build-system, build-backend, build-sdist)
642-
build_deps: DependencyNodeList = [
643-
edge.destination_node
644-
for edge in node.children
645-
if edge.req_type.is_build_requirement
646-
]
647-
# A node can be built when all of its build dependencies are built
648-
unbuilt_deps: set[str] = set(
649-
dep.key for dep in build_deps if dep.key not in built_node_keys
650-
)
651-
if not unbuilt_deps:
652-
logger.info(
653-
"ready to build, have all build dependencies: %s",
654-
sorted(set(dep.key for dep in build_deps)),
655-
)
656-
buildable_nodes.append(node)
657-
else:
658-
logger.info(
659-
"waiting for build dependencies: %s",
660-
sorted(unbuilt_deps),
661-
)
662-
663-
if not buildable_nodes:
664-
# If we can't build anything but still have nodes, we have a cycle
665-
remaining: list[str] = [n.key for n in nodes_to_build]
666-
logger.info("have already built: %s", sorted(built_node_keys))
667-
raise ValueError(f"Circular dependency detected among: {remaining}")
668-
649+
while topo.is_active():
650+
rounds += 1
651+
nodes_to_build = topo.get_available()
669652
logger.info(
670-
"ready to build: %s",
671-
sorted(n.key for n in buildable_nodes),
653+
"round %i: starting to build %i node(s): %s",
654+
rounds,
655+
len(nodes_to_build),
656+
_nodes_to_string(nodes_to_build),
672657
)
658+
for node in nodes_to_build:
659+
with req_ctxvar_context(node.requirement, node.version):
660+
if node in topo.exclusive_nodes:
661+
logger.info("requires exclusive build")
662+
logger.info("ready to build")
673663

674-
# Check if any buildable node requires exclusive build (exclusive_build == True)
675-
exclusive_nodes: DependencyNodeList = [
676-
node
677-
for node in buildable_nodes
678-
if wkctx.settings.package_build_info(
679-
node.canonicalized_name
680-
).exclusive_build
681-
]
682-
if exclusive_nodes:
683-
# Only build the first exclusive node this round
684-
buildable_nodes = [exclusive_nodes[0]]
685-
logger.info(
686-
f"{exclusive_nodes[0].canonicalized_name}: requires exclusive build, running it alone this round."
687-
)
688-
689-
# Build up to max_workers nodes concurrently (or all if max_workers is None)
690-
with concurrent.futures.ThreadPoolExecutor(
691-
max_workers=max_workers
692-
) as executor:
693-
futures: list[concurrent.futures.Future[tuple[pathlib.Path, bool]]] = []
694-
reqs: list[Requirement] = []
695-
logger.info(
696-
"starting to build: %s", sorted(n.key for n in buildable_nodes)
664+
future = executor.submit(
665+
_build_parallel,
666+
wkctx=wkctx,
667+
resolved_version=node.version,
668+
req=node.requirement,
669+
source_download_url=node.download_url,
670+
force=force,
671+
cache_wheel_server_url=cache_wheel_server_url,
697672
)
698-
for node in buildable_nodes:
699-
req = Requirement(f"{node.canonicalized_name}=={node.version}")
700-
reqs.append(req)
701-
future = executor.submit(
702-
_build_parallel,
703-
wkctx=wkctx,
704-
resolved_version=node.version,
705-
req=req,
706-
source_download_url=node.download_url,
707-
force=force,
708-
cache_wheel_server_url=cache_wheel_server_url,
709-
)
710-
future.add_done_callback(update_progressbar_cb)
711-
futures.append(future)
673+
future.add_done_callback(update_progressbar_cb)
674+
future2node[future] = node
712675

713-
# Wait for all builds to complete
714-
for node, future in zip(buildable_nodes, futures, strict=True):
676+
# Wait for all builds to complete
677+
for future in concurrent.futures.as_completed(future2node):
678+
node = future2node.pop(future)
679+
with req_ctxvar_context(node.requirement, node.version):
715680
try:
716681
entry = future.result()
717-
entries.append(entry)
718-
built_node_keys.add(node.key)
719-
nodes_to_build.remove(node)
720-
# progress bar is updated in callback
721682
except Exception as e:
722-
logger.error(f"Failed to build {node.key}: {e}")
683+
logger.error("failed to build: %s", e)
723684
raise
685+
else:
686+
# success
687+
built_entries.append(entry)
688+
finally:
689+
# mark node as done, progress bar is updated in callback.
690+
topo.done(node)
724691

725692
metrics.summarize(wkctx, "Building in parallel")
726-
_summary(wkctx, entries)
693+
_summary(wkctx, built_entries)
727694

728695

729696
build_parallel._fromager_show_build_settings = True # type: ignore

0 commit comments

Comments
 (0)