-
Notifications
You must be signed in to change notification settings - Fork 22
Make unbounded fanout messages spillable #711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Make unbounded fanout messages spillable #711
Conversation
Signed-off-by: niranda perera <[email protected]>
|
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
…ded-fanout-state-spillable
| std::unique_lock item_lock(item->mutex); | ||
| RAPIDSMPF_EXPECTS( | ||
| item->message.has_value(), | ||
| "empty message " + std::to_string(mid), | ||
| std::out_of_range | ||
| ); | ||
| auto const& msg = *item->message; | ||
| auto res = br->reserve_or_fail(msg.copy_cost(), msg.spillable_memory_types()); | ||
| return msg.copy(res); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure if this is good because every time we make a copy, we would have to lock-unlock two mutexes.
Can we keep a copy in the fanout and make copies there, rather than copying messages from spillable messages? @madsbk WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a good idea, but we only want a Message to be at one location at a time!
I have submitted a small PR that introduces SpillableMessages::copy: #713.
I think that is useful here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@madsbk I'm just seeing this. I'm a little opposed to taking a reservation here. Because then, we need to have another method in SpillableMessages to extract the content_descriptions for a message ID to determine the size of the allocation. I think this makes things unnecessarily complicated
| * @return Span of memory types that this message can be spilled to. | ||
| */ | ||
| [[nodiscard]] constexpr std::span<const MemoryType> | ||
| spillable_memory_types() const noexcept { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a good idea, but could we generalise it and move it into memory_type.hpp?
Something like:
constexpr std::span<MemoryType const> memory_types_lower_than(MemoryType mem_type) noexcept;There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we havent finalized on the assigning an order to memory types 😇 I think I have this on #601
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have included some generalization in #718
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking at it now
```c++
/**
* @brief Create a deep copy of a message without removing it.
*
* This method duplicates the message identified by `mid` while leaving the
* original message intact inside the container. The returned message is a
* full deep copy of the payload. If the message is currently being spilled
* by another thread, this call waits until spilling completes.
*
* @param mid Message identifier.
* @param reservation Memory reservation used for allocating buffers during
* the deep copy. The reservation also determines the memory type of the
* returned message.
*
* @return A deep copy of the referenced `Message`.
*
* @throws std::out_of_range If the message has already been extracted.
* @throws std::runtime_error If required memory cannot be allocated using
* the provided reservation.
*/
[[nodiscard]] Message copy(MessageId mid, MemoryReservation& reservation);
```
Based on #711
Authors:
- Mads R. B. Kristensen (https://github.com/madsbk)
Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)
URL: #713
…ded-fanout-state-spillable Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
…ded-fanout-state-spillable
…ded-fanout-state-spillable Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
…ded-fanout-state-spillable Signed-off-by: niranda perera <[email protected]>
ab0a1e5 to
feb04a8
Compare
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
| /** | ||
| * @brief Get the lower memory types than or equal to the @p mem_type . | ||
| * | ||
| * @param mem_type The memory type. | ||
| * @return A span of the lower memory types than the given memory type. | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /** | |
| * @brief Get the lower memory types than or equal to the @p mem_type . | |
| * | |
| * @param mem_type The memory type. | |
| * @return A span of the lower memory types than the given memory type. | |
| */ | |
| /** | |
| * @brief Get the memory types with preference lower than or equal to @p mem_type. | |
| * | |
| * The returned span reflects the predefined ordering used in @ref MEMORY_TYPES, | |
| * which lists memory types in decreasing order of preference. | |
| * | |
| * @param mem_type The memory type used as the starting point. | |
| * @return A span of memory types whose preference is lower than or equal to | |
| * the given type. | |
| */ |
| * @return A coroutine that evaluates to the message id. If the channel is shut down, | ||
| * the message id will be `SpillableMessages::InvalidMessageId`. | ||
| */ | ||
| coro::task<SpillableMessages::MessageId> receive_message_id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the use of MessageId inside a Channel should remain an implementation detail. We should avoid exposing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead, insert the received message with ctx->spillable_messages()->insert(std::move(msg)) and use the returned MessageId. This keeps the Channel free of implementation details, and allows us to change the Channel internals or swap out the spillable_messages implementation later without requiring changes outside Channel.
Closes #675