@@ -36,8 +36,86 @@ def _iter_all_frames(self, stack_frames, skip_idle=False):
3636 yield frames , thread_info .thread_id
3737
3838 def _iter_async_frames (self , awaited_info_list ):
39- """Iterate over linear stacks for all leaf tasks (hot path optimized)."""
40- # Build adjacency graph (O(n))
39+ """
40+ Reconstruct linear stacks for async task trees with multiple parent support.
41+
42+ This implements a leaf-to-root BFS algorithm that handles task trees where
43+ a single task can have multiple parents (diamond dependency pattern).
44+
45+ Task Tree Structure:
46+ ===================
47+ In asyncio, tasks form a tree where edges represent "awaits" relationships:
48+
49+ Task-1 (root)
50+ |
51+ main()
52+ / \
53+ Worker-A Worker-B <-- Normal case: 1 parent each
54+ | |
55+ worker_a() worker_b()
56+ | |
57+ LeafA-1 LeafB-1 <-- Leaves: no children
58+ | |
59+ leaf_work() leaf_work()
60+
61+ However, we can also have diamond dependencies (multiple parents):
62+
63+ Task-1 (root)
64+ |
65+ main()
66+ / \\
67+ Parent-1 Parent-2 <-- Both await SharedChild
68+ | |
69+ parent_1() parent_2()
70+ \ /
71+ SharedChild <-- Diamond: 2 parents!
72+ |
73+ shared_child()
74+
75+ Output Generation:
76+ ==================
77+ For each LEAF task, we generate linear stacks showing all paths to the root.
78+
79+ Normal case (single parent chain):
80+ -----------------------------------
81+ LeafA-1 generates ONE stack:
82+ leaf_work > LeafA-1 > worker_a > Worker-A > main > Task-1
83+
84+ Diamond case (multiple parents):
85+ ---------------------------------
86+ SharedChild generates TWO stacks (one per parent path):
87+ Path 1: shared_child > SharedChild > parent_1 > Parent-1 > main > Task-1
88+ Path 2: shared_child > SharedChild > parent_2 > Parent-2 > main > Task-1
89+
90+ Three-Phase Algorithm:
91+ ======================
92+ 1. Index: Build task_map and parent relationships (O(n))
93+ 2. Find Leaves: Identify tasks not awaited by anyone (O(n))
94+ 3. BFS Traversal: For each leaf, explore all paths to root (O(n × depth × paths))
95+
96+ Performance: O(n × depth × num_paths)
97+ - n = number of tasks
98+ - depth = max task nesting depth (typically 3-10)
99+ - num_paths = number of parent paths (typically 1, rarely >1)
100+ """
101+ # Phase 1: Index tasks and build parent relationships
102+ task_map , child_to_parents , all_task_ids = self ._build_task_graph (awaited_info_list )
103+
104+ # Phase 2: Find leaf tasks (tasks not awaited by anyone)
105+ leaf_task_ids = self ._find_leaf_tasks (child_to_parents , all_task_ids )
106+
107+ # Phase 3: Build linear stacks via BFS from each leaf to root
108+ yield from self ._build_linear_stacks (leaf_task_ids , task_map , child_to_parents )
109+
110+ def _build_task_graph (self , awaited_info_list ):
111+ """
112+ Build task graph from awaited info.
113+
114+ Returns:
115+ task_map: {task_id: (TaskInfo, thread_id)}
116+ child_to_parents: {child_id: [parent_id, ...]}
117+ all_task_ids: set of all task IDs
118+ """
41119 task_map = {}
42120 child_to_parents = {}
43121 all_task_ids = set ()
@@ -48,70 +126,97 @@ def _iter_async_frames(self, awaited_info_list):
48126 task_id = task_info .task_id
49127 task_map [task_id ] = (task_info , thread_id )
50128 all_task_ids .add (task_id )
129+
130+ # Store parent task IDs (not frames - those are in task_info.coroutine_stack)
51131 if task_info .awaited_by :
52- # Store all parent coroutines, not just [0]
53- child_to_parents [task_id ] = task_info .awaited_by
132+ child_to_parents [task_id ] = [p .task_name for p in task_info .awaited_by ]
54133
55- # Identify leaf tasks (O(n))
56- # Collect all parent task IDs from all coroutines
57- all_parent_ids = set ()
58- for parent_coros in child_to_parents .values ():
59- for parent_coro in parent_coros :
60- all_parent_ids .add (parent_coro .task_name )
61- leaf_task_ids = all_task_ids - all_parent_ids
134+ return task_map , child_to_parents , all_task_ids
135+
136+ def _find_leaf_tasks (self , child_to_parents , all_task_ids ):
137+ """
138+ Find leaf tasks (tasks not awaited by anyone).
139+
140+ Leaf tasks are identified by set difference:
141+ leaves = all_tasks - tasks_that_have_parents
62142
63- # Build linear stacks for each leaf (O(n × depth × num_paths))
64- # For tasks with multiple parents, we generate one stack per parent path
143+ Example:
144+ Tasks: {Task-1, Parent-1, Parent-2, SharedChild}
145+ Parents: {SharedChild: [Parent-1, Parent-2],
146+ Parent-1: [Task-1],
147+ Parent-2: [Task-1]}
148+ Tasks with parents: {SharedChild, Parent-1, Parent-2}
149+ Leaves: {Task-1} <- Not awaited by anyone
150+
151+ Returns:
152+ set of leaf task IDs
153+ """
154+ all_parent_ids = set ()
155+ for parent_ids in child_to_parents .values ():
156+ all_parent_ids .update (parent_ids )
157+ return all_task_ids - all_parent_ids
158+
159+ def _build_linear_stacks (self , leaf_task_ids , task_map , child_to_parents ):
160+ """
161+ Build linear stacks by BFS from each leaf to root.
162+
163+ BFS Strategy:
164+ =============
165+ For each leaf task, we perform a breadth-first search walking UP the tree
166+ from leaf to root. At each node, we accumulate frames and enqueue parents.
167+
168+ When a task has MULTIPLE parents, we enqueue EACH parent separately with
169+ the accumulated frames. This creates separate paths that will eventually
170+ yield separate complete stacks.
171+
172+ Queue Item Format:
173+ ------------------
174+ (task_id, frames_accumulated_so_far, visited_tasks_on_this_path)
175+
176+ Yields:
177+ -------
178+ (frames, thread_id, leaf_task_id) for each complete path from leaf to root
179+ """
65180 for leaf_id in leaf_task_ids :
66- # Use BFS to explore all paths from leaf to root
67- # Queue items: (current_task_id, frames_accumulated)
68- queue = [(leaf_id , [])]
69- visited = set ()
181+ # BFS queue: (current_task_id, frames_so_far, path_for_cycle_detection)
182+ queue = [(leaf_id , [], frozenset ())]
70183
71184 while queue :
72- current_id , frames = queue .pop (0 )
185+ current_id , frames , path = queue .pop (0 )
73186
74- # Avoid processing the same task twice in this path
75- if current_id in visited :
187+ # Cycle detection
188+ if current_id in path :
76189 continue
77- visited .add (current_id )
78190
191+ # End of path (parent ID not in task_map)
79192 if current_id not in task_map :
80- # Reached end of path - yield if we have frames
81193 if frames :
82194 _ , thread_id = task_map [leaf_id ]
83195 yield frames , thread_id , leaf_id
84196 continue
85197
198+ # Process current task
86199 task_info , tid = task_map [current_id ]
87-
88- # Add this task's frames
89200 new_frames = list (frames )
201+ new_path = path | {current_id }
202+
203+ # Add all frames from all coroutines in this task
90204 if task_info .coroutine_stack :
91- for frame in task_info .coroutine_stack [0 ].call_stack :
92- new_frames .append (frame )
205+ for coro_info in task_info .coroutine_stack :
206+ for frame in coro_info .call_stack :
207+ new_frames .append (frame )
93208
94209 # Add task boundary marker
95210 task_name = task_info .task_name or "Task-" + str (task_info .task_id )
96211 new_frames .append (FrameInfo (("<task>" , 0 , task_name )))
97212
98- # Get parent coroutines
99- parent_coros = child_to_parents .get (current_id )
100- if not parent_coros :
101- # No parents - this is the root, yield the complete stack
102- yield new_frames , tid , leaf_id
103- continue
104-
105- # For each parent coroutine, add its await frames and continue to parent task
106- for parent_coro in parent_coros :
107- parent_task_id = parent_coro .task_name
108-
109- # Add the parent's await-site frames (where parent awaits this task)
110- path_frames = list (new_frames )
111- for frame in parent_coro .call_stack :
112- path_frames .append (frame )
213+ # Get parent task IDs
214+ parent_ids = child_to_parents .get (current_id , [])
113215
114- # Continue BFS with parent task
115- # Note: parent_coro.call_stack contains the frames from the parent task,
116- # so we should NOT add parent task's coroutine_stack again
117- queue .append ((parent_task_id , path_frames ))
216+ if not parent_ids :
217+ # Root task - yield complete stack
218+ yield new_frames , tid , leaf_id
219+ else :
220+ # Continue to each parent (creates multiple paths if >1 parent)
221+ for parent_id in parent_ids :
222+ queue .append ((parent_id , new_frames , new_path ))
0 commit comments