Skip to content

Conversation

@nirandaperera
Copy link
Contributor

Closes #675

Signed-off-by: niranda perera <[email protected]>
@copy-pr-bot
Copy link

copy-pr-bot bot commented Dec 4, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@nirandaperera nirandaperera added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Dec 4, 2025
Comment on lines 37 to 45
std::unique_lock item_lock(item->mutex);
RAPIDSMPF_EXPECTS(
item->message.has_value(),
"empty message " + std::to_string(mid),
std::out_of_range
);
auto const& msg = *item->message;
auto res = br->reserve_or_fail(msg.copy_cost(), msg.spillable_memory_types());
return msg.copy(res);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure if this is good because every time we make a copy, we would have to lock-unlock two mutexes.
Can we keep a copy in the fanout and make copies there, rather than copying messages from spillable messages? @madsbk WDYT?

Copy link
Member

@madsbk madsbk Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good idea, but we only want a Message to be at one location at a time!

I have submitted a small PR that introduces SpillableMessages::copy: #713.
I think that is useful here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madsbk I'm just seeing this. I'm a little opposed to taking a reservation here. Because then, we need to have another method in SpillableMessages to extract the content_descriptions for a message ID to determine the size of the allocation. I think this makes things unnecessarily complicated

* @return Span of memory types that this message can be spilled to.
*/
[[nodiscard]] constexpr std::span<const MemoryType>
spillable_memory_types() const noexcept {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good idea, but could we generalise it and move it into memory_type.hpp?
Something like:

constexpr std::span<MemoryType const> memory_types_lower_than(MemoryType mem_type) noexcept;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we havent finalized on the assigning an order to memory types 😇 I think I have this on #601

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have included some generalization in #718

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at it now

rapids-bot bot pushed a commit that referenced this pull request Dec 5, 2025
```c++
    /**
     * @brief Create a deep copy of a message without removing it.
     *
     * This method duplicates the message identified by `mid` while leaving the
     * original message intact inside the container. The returned message is a
     * full deep copy of the payload. If the message is currently being spilled
     * by another thread, this call waits until spilling completes.
     *
     * @param mid Message identifier.
     * @param reservation Memory reservation used for allocating buffers during
     * the deep copy. The reservation also determines the memory type of the
     * returned message.
     *
     * @return A deep copy of the referenced `Message`.
     *
     * @throws std::out_of_range If the message has already been extracted.
     * @throws std::runtime_error If required memory cannot be allocated using
     * the provided reservation.
     */
    [[nodiscard]] Message copy(MessageId mid, MemoryReservation& reservation);
```

Based on #711

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #713
@nirandaperera nirandaperera marked this pull request as ready for review December 8, 2025 16:11
@nirandaperera nirandaperera requested a review from a team as a code owner December 8, 2025 16:11
@nirandaperera nirandaperera requested a review from madsbk December 8, 2025 18:40
…ded-fanout-state-spillable

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
…ded-fanout-state-spillable

Signed-off-by: niranda perera <[email protected]>
@nirandaperera nirandaperera force-pushed the Make-unbounded-fanout-state-spillable branch from ab0a1e5 to feb04a8 Compare December 8, 2025 23:20
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
@nirandaperera nirandaperera requested a review from a team as a code owner December 9, 2025 00:17
Signed-off-by: niranda perera <[email protected]>
Comment on lines +38 to +43
/**
* @brief Get the lower memory types than or equal to the @p mem_type .
*
* @param mem_type The memory type.
* @return A span of the lower memory types than the given memory type.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* @brief Get the lower memory types than or equal to the @p mem_type .
*
* @param mem_type The memory type.
* @return A span of the lower memory types than the given memory type.
*/
/**
* @brief Get the memory types with preference lower than or equal to @p mem_type.
*
* The returned span reflects the predefined ordering used in @ref MEMORY_TYPES,
* which lists memory types in decreasing order of preference.
*
* @param mem_type The memory type used as the starting point.
* @return A span of memory types whose preference is lower than or equal to
* the given type.
*/

* @return A coroutine that evaluates to the message id. If the channel is shut down,
* the message id will be `SpillableMessages::InvalidMessageId`.
*/
coro::task<SpillableMessages::MessageId> receive_message_id();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the use of MessageId inside a Channel should remain an implementation detail. We should avoid exposing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, insert the received message with ctx->spillable_messages()->insert(std::move(msg)) and use the returned MessageId. This keeps the Channel free of implementation details, and allows us to change the Channel internals or swap out the spillable_messages implementation later without requiring changes outside Channel.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

improvement Improves an existing functionality non-breaking Introduces a non-breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Make unbounded fanout state spillable

2 participants