[feat] add shrink_to_fit() to Sender<T> and Receiver<T> #148
[feat] add shrink_to_fit() to Sender<T> and Receiver<T> #148rakbladsvalsen wants to merge 8 commits into
Conversation
zesterer
left a comment
There was a problem hiding this comment.
I'm a bit undecided how I feel about this PR. On the one hand, I recognise that this is a problem. On the other, this feels like it exposes far too much of the implementation in a public API.
One alternative might be to check, in the pull path, whether capacity > length * 2 + some constant and if so, shrink the internal queue automatically.
| /// Discards excess capacity in the internal queue. | ||
| pub fn shrink_to_fit(&self) { | ||
| self.shared.shrink_to_fit(); | ||
| } | ||
|
|
||
| /// Returns the number of elements the internal queue can hold without reallocating. | ||
| pub fn queue_capacity(&self) -> usize { | ||
| self.shared.queue_capacity() | ||
| } |
There was a problem hiding this comment.
I don't think that queue_capacity should be externally exposed, the internal queue is very much an implementation detail.
| pub fn shrink_to_fit(&self) { | ||
| self.shared.shrink_to_fit(); | ||
| } |
There was a problem hiding this comment.
I think a less precise name might be better here to account for implementation changes: there's no saying whether we'll want to continue to use std's queues in the future.
There was a problem hiding this comment.
I completely agree. Perhaps shrink_internal_queue() is a better name?
Hey @zesterer, thanks for the review and sorry for the late response. I don't see how this could be solved without exposing a little bit of implementation details without adding either some sort of periodic task that shrinks the channels every now and then or what you mentioned. In my opinion, instead of adding the capacity checks, shrinking operations should be delegated to the user to mimic rust's collections behavior. That's how |
|
Regarding the I think it might also be useful to expose a method like |
|
@zesterer hey bud. Please review this whenever you get the chance. I've been using my own fork of flume in some projects but I'd very much appreciate to have this merged upstream. BTW, you probably want to consider running |
My reluctance about all of this is that 'a vector with capacity' is not even the right mental model to use for an MPMC channel (unlike, say, a vector or a hashmap). That it happens to be implemented with one right now is largely incidental: it could just as easily be implemented with a Out of interest, what specific use-case are you looking for? Would a more general |
I completely agree that the underlying storage structure could be basically anything, perhaps something as simple as
I use flume to asynchronously process events in long-running axum apps (e.g. save results to DB, send webhooks events, etc). More often than not I've seen memory usage go up and never go back again. I did some debugging and found out that something as common as a network fluctuation could cause events to pile up in the queue, causing While having an |
The problem with this is that there's no neat relationship between the channel capacity and the capacity of the underlying I think a reasonable approach here is to just have flume itself shrink the internal queue if its capacity exceeds its length by some load factor + a small constant to avoid overzealous reallocation for small sizes. This also doesn't require the user to have to think about this problem either. |
I fully agree. I hadn't taken a good chance to look at flume's codebase (even though it's small), but I just noticed there are about 3 queues. I think this issue can be potentially solved with a proper name? Perhaps something such as If the other two queues (
That sounds good, but I think that takes away some of the user's power and visibility over the queue implementation. No matter how small or big the constant is, that's most likely going to cause problems: if it's too small, and the queue gets filled up frequently, it'll hurt performance because of the constant de/allocation. If it's too big, the queue will never deallocate unused memory (which is what happens today). What I think would work best would be your I know, I know... the underlying implementation could change, and the naming issue needs to be worked out, but as far as impl changes go, that's something very unlikely, considering flume is pretty much stable at this point. |
@zesterer I agree with the author of the PR @rakbladsvalsen that the shrinking of the queue should be delegated to the user based on their requirements. Other channel implementations (std, crossbeam-channel) always allocate on |
I agree, although that would break the flume API and most, if not all channel libraries only provide a single flavor that either uses lockfree queues or good ol' Personally, I'd prefer to see a But at this point I've long given up on flume and replaced it with async-channel. It's maintained by the smol team, offers pretty much the same features and uses lockfree queues. I don't know if it performs any better than flume, but it doesn't "leak" memory 🥲 |
Fixes: #147