Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
af65c15
It might work but the code is bad
savannahostrowski Nov 3, 2025
ec28f88
Account for function doing CPU work before/after spawning workers
savannahostrowski Nov 3, 2025
1e01766
Merge branch 'main' into async-tachyon
savannahostrowski Nov 3, 2025
2a2e197
Code cleanup
savannahostrowski Nov 3, 2025
61dc0bb
WIP
savannahostrowski Nov 3, 2025
c9c34a5
Merge branch 'main' into async-tachyon
savannahostrowski Nov 13, 2025
cc9e9ab
Remove depth
savannahostrowski Nov 13, 2025
9b22f1e
Make keyword only
savannahostrowski Nov 13, 2025
890474d
Fix tests
savannahostrowski Nov 13, 2025
563ecff
Bruuuh, it worked
savannahostrowski Nov 13, 2025
112ce73
Simplify algo
pablogsal Nov 14, 2025
2beed97
Fix multiple parents
pablogsal Nov 14, 2025
7315953
Good shit
pablogsal Nov 14, 2025
f8e9d72
Deque, deduplicate yields, propagate thread_id
savannahostrowski Nov 14, 2025
9a4875f
📜🤖 Added by blurb_it.
blurb-it[bot] Nov 14, 2025
ec6fb51
Remove deduplication of leaves to ensure call stacks can be properly …
savannahostrowski Nov 14, 2025
67e1f74
Merge branch 'async-tachyon' of https://github.com/savannahostrowski/…
savannahostrowski Nov 14, 2025
e9ae950
Fix WASI
savannahostrowski Nov 14, 2025
acef9a0
More WASI fixes
savannahostrowski Nov 14, 2025
2953454
Merge main
savannahostrowski Nov 23, 2025
09f5205
Fix tests
savannahostrowski Nov 23, 2025
dc7abae
Fix broken imports
savannahostrowski Nov 24, 2025
36c8b3c
Remove old test file
savannahostrowski Nov 24, 2025
be6d228
Merge remote-tracking branch 'upstream/main' into async-tachyon
pablogsal Nov 24, 2025
64ccb1a
Fixes
pablogsal Nov 24, 2025
fca9c88
fixup! Fixes
pablogsal Nov 25, 2025
394069d
Merge main
savannahostrowski Dec 1, 2025
3d9d2fb
Fix test error
savannahostrowski Dec 1, 2025
1134431
Fix quotations for consistency
savannahostrowski Dec 1, 2025
f0242e1
Merge remote-tracking branch 'upstream/main' into async-tachyon
pablogsal Dec 6, 2025
56661dc
Update to latest main
pablogsal Dec 6, 2025
ff983d8
Fix tests
pablogsal Dec 6, 2025
2203021
Fix tests
pablogsal Dec 6, 2025
e6eaa2c
CLI update
pablogsal Dec 6, 2025
47ebc11
Small fixes
pablogsal Dec 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Lib/profiling/sampling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
call stack rather than tracing every function call.
"""

from .collector import Collector
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
from .gecko_collector import GeckoCollector
from .string_table import StringTable
# Profiling requires the _remote_debugging C extension.
try:
import _remote_debugging # noqa: F401
except ImportError:
__all__ = ()
else:
from .collector import Collector
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
from .gecko_collector import GeckoCollector
from .string_table import StringTable

__all__ = ("Collector", "PstatsCollector", "CollapsedStackCollector", "GeckoCollector", "StringTable")
__all__ = ("Collector", "PstatsCollector", "CollapsedStackCollector", "GeckoCollector", "StringTable")
12 changes: 12 additions & 0 deletions Lib/profiling/sampling/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ def _add_sampling_options(parser):
dest="gc",
help='Don\'t include artificial "<GC>" frames to denote active garbage collection',
)
sampling_group.add_argument(
"--async-aware",
choices=["running", "all"],
default=None,
metavar="MODE",
help='Enable async-aware profiling: "running" (only running task) '
'or "all" (all tasks including waiting)',
)


def _add_mode_options(parser):
Expand Down Expand Up @@ -557,6 +565,7 @@ def _handle_attach(args):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_aware,
native=args.native,
gc=args.gc,
)
Expand Down Expand Up @@ -605,6 +614,7 @@ def _handle_run(args):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_aware,
native=args.native,
gc=args.gc,
)
Expand Down Expand Up @@ -647,6 +657,7 @@ def _handle_live_attach(args, pid):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_aware,
native=args.native,
gc=args.gc,
)
Expand Down Expand Up @@ -687,6 +698,7 @@ def _handle_live_run(args):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_aware,
native=args.native,
gc=args.gc,
)
Expand Down
96 changes: 95 additions & 1 deletion Lib/profiling/sampling/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
from .constants import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
THREAD_STATUS_UNKNOWN,
THREAD_STATUS_GIL_REQUESTED,
THREAD_STATUS_UNKNOWN,
)

try:
from _remote_debugging import FrameInfo
except ImportError:
# Fallback definition if _remote_debugging is not available
FrameInfo = None

class Collector(ABC):
@abstractmethod
def collect(self, stack_frames):
Expand Down Expand Up @@ -33,6 +39,94 @@ def _iter_all_frames(self, stack_frames, skip_idle=False):
if frames:
yield frames, thread_info.thread_id

def _iter_async_frames(self, awaited_info_list):
# Phase 1: Index tasks and build parent relationships with pre-computed selection
task_map, child_to_parent, all_task_ids, all_parent_ids = self._build_task_graph(awaited_info_list)

# Phase 2: Find leaf tasks (tasks not awaited by anyone)
leaf_task_ids = self._find_leaf_tasks(all_task_ids, all_parent_ids)

# Phase 3: Build linear stacks from each leaf to root (optimized - no sorting!)
yield from self._build_linear_stacks(leaf_task_ids, task_map, child_to_parent)

def _build_task_graph(self, awaited_info_list):
task_map = {}
child_to_parent = {} # Maps child_id -> (selected_parent_id, parent_count)
all_task_ids = set()
all_parent_ids = set() # Track ALL parent IDs for leaf detection

for awaited_info in awaited_info_list:
thread_id = awaited_info.thread_id
for task_info in awaited_info.awaited_by:
task_id = task_info.task_id
task_map[task_id] = (task_info, thread_id)
all_task_ids.add(task_id)

# Pre-compute selected parent and count for optimization
if task_info.awaited_by:
parent_ids = [p.task_name for p in task_info.awaited_by]
parent_count = len(parent_ids)
# Track ALL parents for leaf detection
all_parent_ids.update(parent_ids)
# Use min() for O(n) instead of sorted()[0] which is O(n log n)
selected_parent = min(parent_ids) if parent_count > 1 else parent_ids[0]
child_to_parent[task_id] = (selected_parent, parent_count)

return task_map, child_to_parent, all_task_ids, all_parent_ids

def _find_leaf_tasks(self, all_task_ids, all_parent_ids):
# Leaves are tasks that are not parents of any other task
return all_task_ids - all_parent_ids

def _build_linear_stacks(self, leaf_task_ids, task_map, child_to_parent):
for leaf_id in leaf_task_ids:
frames = []
visited = set()
current_id = leaf_id
thread_id = None

# Follow the single parent chain from leaf to root
while current_id is not None:
# Cycle detection
if current_id in visited:
break
visited.add(current_id)

# Check if task exists in task_map
if current_id not in task_map:
break

task_info, tid = task_map[current_id]

# Set thread_id from first task
if thread_id is None:
thread_id = tid

# Add all frames from all coroutines in this task
if task_info.coroutine_stack:
for coro_info in task_info.coroutine_stack:
for frame in coro_info.call_stack:
frames.append(frame)

# Get pre-computed parent info (no sorting needed!)
parent_info = child_to_parent.get(current_id)

# Add task boundary marker with parent count annotation if multiple parents
task_name = task_info.task_name or "Task-" + str(task_info.task_id)
if parent_info:
selected_parent, parent_count = parent_info
if parent_count > 1:
task_name = f"{task_name} ({parent_count} parents)"
frames.append(FrameInfo(("<task>", 0, task_name)))
current_id = selected_parent
else:
# Root task - no parent
frames.append(FrameInfo(("<task>", 0, task_name)))
current_id = None

# Yield the complete stack if we collected any frames
if frames and thread_id is not None:
yield frames, thread_id, leaf_id
def _is_gc_frame(self, frame):
if isinstance(frame, tuple):
funcname = frame[2] if len(frame) >= 3 else ""
Expand Down
10 changes: 8 additions & 2 deletions Lib/profiling/sampling/pstats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ def _process_frames(self, frames):
self.callers[callee][caller] += 1

def collect(self, stack_frames):
for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=self.skip_idle):
self._process_frames(frames)
if stack_frames and hasattr(stack_frames[0], "awaited_by"):
# Async frame processing
for frames, thread_id, task_id in self._iter_async_frames(stack_frames):
self._process_frames(frames)
else:
# Regular frame processing
for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=self.skip_idle):
self._process_frames(frames)

def export(self, filename):
self.create_stats()
Expand Down
13 changes: 10 additions & 3 deletions Lib/profiling/sampling/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MOD
self.total_samples = 0
self.realtime_stats = False

def sample(self, collector, duration_sec=10):
def sample(self, collector, duration_sec=10, *, async_aware=False):
sample_interval_sec = self.sample_interval_usec / 1_000_000
running_time = 0
num_samples = 0
Expand All @@ -68,7 +68,12 @@ def sample(self, collector, duration_sec=10):
current_time = time.perf_counter()
if next_time < current_time:
try:
stack_frames = self.unwinder.get_stack_trace()
if async_aware == "all":
stack_frames = self.unwinder.get_all_awaited_by()
elif async_aware == "running":
stack_frames = self.unwinder.get_async_stack_trace()
else:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
duration_sec = current_time - start_time
Expand Down Expand Up @@ -191,6 +196,7 @@ def sample(
all_threads=False,
realtime_stats=False,
mode=PROFILING_MODE_WALL,
async_aware=None,
native=False,
gc=True,
):
Expand Down Expand Up @@ -233,7 +239,7 @@ def sample(
profiler.realtime_stats = realtime_stats

# Run the sampling
profiler.sample(collector, duration_sec)
profiler.sample(collector, duration_sec, async_aware=async_aware)

return collector

Expand All @@ -246,6 +252,7 @@ def sample_live(
all_threads=False,
realtime_stats=False,
mode=PROFILING_MODE_WALL,
async_aware=None,
native=False,
gc=True,
):
Expand Down
16 changes: 12 additions & 4 deletions Lib/profiling/sampling/stack_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ def __init__(self, sample_interval_usec, *, skip_idle=False):
self.skip_idle = skip_idle

def collect(self, stack_frames, skip_idle=False):
for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=skip_idle):
if not frames:
continue
self.process_frames(frames, thread_id)
if stack_frames and hasattr(stack_frames[0], "awaited_by"):
# Async-aware mode: process async task frames
for frames, thread_id, task_id in self._iter_async_frames(stack_frames):
if not frames:
continue
self.process_frames(frames, thread_id)
else:
# Sync-only mode
for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=skip_idle):
if not frames:
continue
self.process_frames(frames, thread_id)

def process_frames(self, frames, thread_id):
pass
Expand Down
35 changes: 35 additions & 0 deletions Lib/test/test_profiling/test_sampling_profiler/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,38 @@ def __init__(self, interpreter_id, threads):

def __repr__(self):
return f"MockInterpreterInfo(interpreter_id={self.interpreter_id}, threads={self.threads})"


class MockCoroInfo:
"""Mock CoroInfo for testing async tasks."""

def __init__(self, task_name, call_stack):
self.task_name = task_name # In reality, this is the parent task ID
self.call_stack = call_stack

def __repr__(self):
return f"MockCoroInfo(task_name={self.task_name}, call_stack={self.call_stack})"


class MockTaskInfo:
"""Mock TaskInfo for testing async tasks."""

def __init__(self, task_id, task_name, coroutine_stack, awaited_by=None):
self.task_id = task_id
self.task_name = task_name
self.coroutine_stack = coroutine_stack # List of CoroInfo objects
self.awaited_by = awaited_by or [] # List of CoroInfo objects (parents)

def __repr__(self):
return f"MockTaskInfo(task_id={self.task_id}, task_name={self.task_name})"


class MockAwaitedInfo:
"""Mock AwaitedInfo for testing async tasks."""

def __init__(self, thread_id, awaited_by):
self.thread_id = thread_id
self.awaited_by = awaited_by # List of TaskInfo objects

def __repr__(self):
return f"MockAwaitedInfo(thread_id={self.thread_id}, awaited_by={len(self.awaited_by)} tasks)"
Loading
Loading