flume/
lib.rs

1//! # Flume
2//!
3//! A blazingly fast multi-producer, multi-consumer channel.
4//!
5//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
6//!
7//! ## Why Flume?
8//!
9//! - **Featureful**: Unbounded, bounded and rendezvous queues
10//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
11//! - **Safe**: No `unsafe` code anywhere in the codebase!
12//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
13//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
14//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
15//! - **Simple**: Few dependencies, minimal codebase, fast to compile
16//! - **Asynchronous**: `async` support, including mix 'n match with sync code
17//! - **Ergonomic**: Powerful `select`-like interface
18//!
19//! ## Example
20//!
21//! ```
22//! let (tx, rx) = flume::unbounded();
23//!
24//! tx.send(42).unwrap();
25//! assert_eq!(rx.recv().unwrap(), 42);
26//! ```
27
28#![cfg_attr(docsrs, feature(doc_cfg))]
29#![deny(missing_docs)]
30
31#[cfg(feature = "async")]
32pub mod r#async;
33#[cfg(feature = "select")]
34pub mod select;
35
36mod signal;
37
38// Reexports
39#[cfg(feature = "select")]
40pub use select::Selector;
41
42use crate::signal::{Signal, SyncSignal};
43#[cfg(feature = "spin")]
44use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
45use std::fmt::Formatter;
46use std::{
47    collections::VecDeque,
48    fmt,
49    marker::PhantomData,
50    sync::{
51        atomic::{AtomicBool, AtomicUsize, Ordering},
52        Arc, Weak,
53    },
54    thread,
55    time::{Duration, Instant},
56};
57
/// The error produced by [`Sender::send`] when the message could not be delivered because
/// every receiver has been dropped. The undelivered message is carried in the public field.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Unwrap the error and hand back the message that could not be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

impl<T> fmt::Debug for SendError<T> {
    /// Formats a fixed placeholder instead of the payload, so no `T: Debug` bound is needed.
    /// The placeholder goes through `str`'s `Debug` impl and is therefore printed in quotes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt("SendError(..)", f)
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Formats the human-readable description; width/alignment flags on `f` are honoured.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt("sending on a closed channel", f)
    }
}

impl<T> std::error::Error for SendError<T> {}
83
/// The error produced by [`Sender::try_send`]: the message could not be queued, either because
/// the channel was full or because every receiver had been dropped.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel has a finite capacity and had no free space at the time of the attempt.
    Full(T),
    /// Every receiver was dropped, so there is nobody left to receive the message.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Unwrap the error and hand back the message that could not be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(msg) => msg,
            TrySendError::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    /// Formats the variant name with an elided payload, so no `T: Debug` bound is needed.
    /// The label goes through `str`'s `Debug` impl and is therefore printed in quotes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Full(_) => "Full(..)",
            Self::Disconnected(_) => "Disconnected(..)",
        };
        fmt::Debug::fmt(label, f)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    /// Formats the human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full(_) => "sending on a full channel",
            Self::Disconnected(_) => "sending on a closed channel",
        };
        fmt::Display::fmt(text, f)
    }
}

impl<T> std::error::Error for TrySendError<T> {}
122
123impl<T> From<SendError<T>> for TrySendError<T> {
124    fn from(err: SendError<T>) -> Self {
125        match err {
126            SendError(item) => Self::Disconnected(item),
127        }
128    }
129}
130
/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
/// the send operation times out or all receivers are dropped.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
    /// A timeout occurred when attempting to send the message.
    Timeout(T),
    /// All channel receivers were dropped and so the message has nobody to receive it.
    Disconnected(T),
}

impl<T> SendTimeoutError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for SendTimeoutError<T> {
    /// Formats the variant name with an elided payload, so no `T: Debug` bound is needed.
    ///
    /// Naming the variant (rather than a single constant string for both) lets the output of
    /// `unwrap()`/`expect()` panics distinguish a timeout from a disconnected channel, and
    /// matches how `TrySendError` formats itself.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            SendTimeoutError::Timeout(..) => "Timeout(..)".fmt(f),
            SendTimeoutError::Disconnected(..) => "Disconnected(..)".fmt(f),
        }
    }
}

impl<T> fmt::Display for SendTimeoutError<T> {
    /// Formats the human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
            SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
        }
    }
}

impl<T> std::error::Error for SendTimeoutError<T> {}
166
167impl<T> From<SendError<T>> for SendTimeoutError<T> {
168    fn from(err: SendError<T>) -> Self {
169        match err {
170            SendError(item) => Self::Disconnected(item),
171        }
172    }
173}
174
/// Private union of every way a send can fail. The public `try_send`, `send` and
/// deadline/timeout send methods each translate the variants they can observe into their own
/// public error type. Every variant hands the unsent message back.
enum TrySendTimeoutError<T> {
    /// The channel was bounded and full and the caller asked not to block.
    Full(T),
    /// The channel was disconnected.
    Disconnected(T),
    /// The deadline passed before the message was taken.
    Timeout(T),
}
180
/// The error produced by a blocking receive when no message can ever arrive: every sender has
/// been dropped and the channel holds no more messages.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
    /// Every sender was dropped and the channel is drained, so nothing further can be received.
    Disconnected,
}

impl fmt::Display for RecvError {
    /// Formats the human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Disconnected => "receiving on a closed channel",
        };
        fmt::Display::fmt(text, f)
    }
}

impl std::error::Error for RecvError {}
198
/// The error produced by a non-blocking receive when no message is available. When the channel
/// is empty *and* every sender has been dropped, `TryRecvError::Disconnected` is reported
/// instead of `TryRecvError::Empty`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// No message was queued at the time of the attempt.
    Empty,
    /// Every sender was dropped and the channel is drained, so nothing further can be received.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    /// Formats the human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Empty => "receiving on an empty channel",
            Self::Disconnected => "channel is empty and closed",
        };
        fmt::Display::fmt(text, f)
    }
}

impl std::error::Error for TryRecvError {}
220
221impl From<RecvError> for TryRecvError {
222    fn from(err: RecvError) -> Self {
223        match err {
224            RecvError::Disconnected => Self::Disconnected,
225        }
226    }
227}
228
/// The error produced by a receive with a timeout or deadline: either the time limit was hit,
/// or every sender has been dropped and the channel holds no more values.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// The time limit elapsed before a message arrived.
    Timeout,
    /// Every sender was dropped and the channel is drained, so nothing further can be received.
    Disconnected,
}

impl fmt::Display for RecvTimeoutError {
    /// Formats the human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Timeout => "timed out waiting on a channel",
            Self::Disconnected => "channel is empty and closed",
        };
        fmt::Display::fmt(text, f)
    }
}

impl std::error::Error for RecvTimeoutError {}
250
251impl From<RecvError> for RecvTimeoutError {
252    fn from(err: RecvError) -> Self {
253        match err {
254            RecvError::Disconnected => Self::Disconnected,
255        }
256    }
257}
258
/// Private union of every way a receive can fail. The public receive methods translate the
/// variants they can observe into their own public error type.
enum TryRecvTimeoutError {
    /// No message was available and the caller asked not to block.
    Empty,
    /// The deadline passed before a message arrived.
    Timeout,
    /// The channel was disconnected and no message was left to take.
    Disconnected,
}
264
265// TODO: Investigate some sort of invalidation flag for timeouts
266#[cfg(feature = "spin")]
267struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
268
269#[cfg(not(feature = "spin"))]
270struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
271
272#[cfg(feature = "spin")]
273impl<T, S: ?Sized + Signal> Hook<T, S> {
274    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
275    where
276        S: Sized,
277    {
278        Arc::new(Self(Some(Spinlock::new(msg)), signal))
279    }
280
281    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
282        self.0.as_ref().map(|s| s.lock())
283    }
284}
285
286#[cfg(not(feature = "spin"))]
287impl<T, S: ?Sized + Signal> Hook<T, S> {
288    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
289    where
290        S: Sized,
291    {
292        Arc::new(Self(Some(Mutex::new(msg)), signal))
293    }
294
295    fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
296        self.0.as_ref().map(|s| s.lock().unwrap())
297    }
298}
299
300impl<T, S: ?Sized + Signal> Hook<T, S> {
301    pub fn fire_recv(&self) -> (T, &S) {
302        let msg = self.lock().unwrap().take().unwrap();
303        (msg, self.signal())
304    }
305
306    pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
307        let ret = match self.lock() {
308            Some(mut lock) => {
309                *lock = Some(msg);
310                None
311            }
312            None => Some(msg),
313        };
314        (ret, self.signal())
315    }
316
317    pub fn is_empty(&self) -> bool {
318        self.lock().map(|s| s.is_none()).unwrap_or(true)
319    }
320
321    pub fn try_take(&self) -> Option<T> {
322        self.lock().unwrap().take()
323    }
324
325    pub fn trigger(signal: S) -> Arc<Self>
326    where
327        S: Sized,
328    {
329        Arc::new(Self(None, signal))
330    }
331
332    pub fn signal(&self) -> &S {
333        &self.1
334    }
335
336    pub fn fire_nothing(&self) -> bool {
337        self.signal().fire()
338    }
339}
340
341impl<T> Hook<T, SyncSignal> {
342    pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
343        loop {
344            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
345            let msg = self.lock().unwrap().take();
346            if let Some(msg) = msg {
347                break Some(msg);
348            } else if disconnected {
349                break None;
350            } else {
351                self.signal().wait()
352            }
353        }
354    }
355
356    // Err(true) if timeout
357    pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
358        loop {
359            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
360            let msg = self.lock().unwrap().take();
361            if let Some(msg) = msg {
362                break Ok(msg);
363            } else if disconnected {
364                break Err(false);
365            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
366                self.signal().wait_timeout(dur);
367            } else {
368                break Err(true);
369            }
370        }
371    }
372
373    pub fn wait_send(&self, abort: &AtomicBool) {
374        loop {
375            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
376            if disconnected || self.lock().unwrap().is_none() {
377                break;
378            }
379
380            self.signal().wait();
381        }
382    }
383
384    // Err(true) if timeout
385    pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
386        loop {
387            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
388            if self.lock().unwrap().is_none() {
389                break Ok(());
390            } else if disconnected {
391                break Err(false);
392            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
393                self.signal().wait_timeout(dur);
394            } else {
395                break Err(true);
396            }
397        }
398    }
399}
400
/// Acquire the spinlock, backing off instead of spinning hot.
///
/// On unix/windows targets: try the lock ten times with a `yield_now` after each failed
/// attempt, then sleep and repeat. The sleep starts at 2^4 ns and doubles each round up to
/// 2^20 ns. On every other target the lock's own blocking `lock()` is used.
#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<'_, T> {
    // `thread::sleep` is not supported on some targets (e.g. `wasm32-unknown-unknown` on the
    // main thread of a web browser), so it is only used where it is known to work.
    #[cfg(any(target_family = "unix", target_family = "windows"))]
    {
        let mut backoff = 4;
        loop {
            for _attempt in 0..10 {
                match lock.try_lock() {
                    Some(guard) => return guard,
                    None => thread::yield_now(),
                }
            }
            // Sleep for at most ~1 ms
            thread::sleep(Duration::from_nanos(1 << backoff.min(20)));
            backoff += 1;
        }
    }
    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
    lock.lock()
}
425
/// Acquire the mutex, blocking until it is available. Panics if the mutex is poisoned.
#[cfg(not(feature = "spin"))]
#[inline]
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
    let acquired = lock.lock();
    acquired.unwrap()
}
431
432#[cfg(not(feature = "spin"))]
433use std::sync::{Mutex, MutexGuard};
434
/// Lock type wrapped around the channel state in `Shared`: the `spin1` mutex when the `spin`
/// feature is enabled.
#[cfg(feature = "spin")]
type ChanLock<T> = Spinlock<T>;
/// Lock type wrapped around the channel state in `Shared`: `std::sync::Mutex` when the `spin`
/// feature is disabled.
#[cfg(not(feature = "spin"))]
type ChanLock<T> = Mutex<T>;
439
440type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
441struct Chan<T> {
442    sending: Option<(usize, SignalVec<T>)>,
443    queue: VecDeque<T>,
444    waiting: SignalVec<T>,
445}
446
447impl<T> Chan<T> {
448    fn pull_pending(&mut self, pull_extra: bool) {
449        if let Some((cap, sending)) = &mut self.sending {
450            let effective_cap = *cap + pull_extra as usize;
451
452            while self.queue.len() < effective_cap {
453                if let Some(s) = sending.pop_front() {
454                    let (msg, signal) = s.fire_recv();
455                    signal.fire();
456                    self.queue.push_back(msg);
457                } else {
458                    break;
459                }
460            }
461        }
462    }
463
464    fn try_wake_receiver_if_pending(&mut self) {
465        if !self.queue.is_empty() {
466            while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
467        }
468    }
469}
470
/// State shared by every handle to one channel.
struct Shared<T> {
    /// Message queue and waiter lists, behind the channel lock.
    chan: ChanLock<Chan<T>>,
    /// Set to `true` by `disconnect_all`; nothing in this block ever clears it.
    disconnected: AtomicBool,
    /// Number of live `Sender`s; starts at 1.
    sender_count: AtomicUsize,
    /// Receiver counterpart of `sender_count`; starts at 1. Presumably maintained by
    /// `Receiver`'s `Clone`/`Drop`, which are outside this view.
    receiver_count: AtomicUsize,
}

impl<T> Shared<T> {
    /// Create the shared state for a new channel with one sender and one receiver.
    /// `cap` is `Some(n)` for a bounded channel of capacity `n`, `None` for an unbounded one.
    fn new(cap: Option<usize>) -> Self {
        Self {
            chan: ChanLock::new(Chan {
                sending: cap.map(|cap| (cap, VecDeque::new())),
                queue: VecDeque::new(),
                waiting: VecDeque::new(),
            }),
            disconnected: AtomicBool::new(false),
            sender_count: AtomicUsize::new(1),
            receiver_count: AtomicUsize::new(1),
        }
    }

    /// Core send routine behind the blocking and non-blocking front ends.
    ///
    /// * `msg` - the message to deliver.
    /// * `should_block` - whether to wait when a bounded channel is full.
    /// * `make_signal` - builds the hook that carries `msg` while the sender waits; only called
    ///   on the blocking path.
    /// * `do_block` - called with that hook after the channel lock has been released; its
    ///   return value becomes the result.
    ///
    /// Every other outcome is produced by converting a `Result<(), TrySendTimeoutError<T>>`
    /// into `R`.
    fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
        &self,
        msg: T,
        should_block: bool,
        make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
    ) -> R {
        let mut chan = wait_lock(&self.chan);

        if self.is_disconnected() {
            // The message is handed back inside the error.
            Err(TrySendTimeoutError::Disconnected(msg)).into()
        } else if !chan.waiting.is_empty() {
            // At least one receiver is waiting: try to hand the message over directly.
            let mut msg = Some(msg);

            loop {
                let slot = chan.waiting.pop_front();
                match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
                    // No more waiting receivers and msg in queue, so break out of the loop
                    None if msg.is_none() => break,
                    // No more waiting receivers, so add msg to queue and break out of the loop
                    None => {
                        chan.queue.push_back(msg.unwrap());
                        break;
                    }
                    // The hook has no message slot, so `fire_send` handed the message back.
                    Some((Some(m), signal)) => {
                        if signal.fire() {
                            // Was async and a stream, so didn't acquire the message. Wake another
                            // receiver, and do not yet push the message.
                            msg.replace(m);
                            continue;
                        } else {
                            // Was async and not a stream, so it did acquire the message. Push the
                            // message to the queue for it to be received.
                            chan.queue.push_back(m);
                            drop(chan);
                            break;
                        }
                    }
                    // The message was stored in the hook's slot; release the channel lock
                    // before waking the receiver.
                    Some((None, signal)) => {
                        drop(chan);
                        signal.fire();
                        break; // Was sync, so it has acquired the message
                    }
                }
            }

            Ok(()).into()
        } else if chan
            .sending
            .as_ref()
            .map(|(cap, _)| chan.queue.len() < *cap)
            .unwrap_or(true)
        {
            // Unbounded channel, or bounded with room left: just enqueue.
            chan.queue.push_back(msg);
            Ok(()).into()
        } else if should_block {
            // Only bounded from here on
            // Register as a blocked sender, then release the lock before blocking.
            let hook = make_signal(msg);
            chan.sending.as_mut().unwrap().1.push_back(hook.clone());
            drop(chan);

            do_block(hook)
        } else {
            Err(TrySendTimeoutError::Full(msg)).into()
        }
    }

    /// Synchronous send built on `send`.
    ///
    /// `block` selects the blocking mode: `None` never blocks, `Some(None)` blocks with no
    /// deadline, `Some(Some(deadline))` blocks until `deadline` at the latest.
    fn send_sync(
        &self,
        msg: T,
        block: Option<Option<Instant>>,
    ) -> Result<(), TrySendTimeoutError<T>> {
        self.send(
            // msg
            msg,
            // should_block
            block.is_some(),
            // make_signal
            |msg| Hook::slot(Some(msg), SyncSignal::default()),
            // do_block
            |hook| {
                if let Some(deadline) = block.unwrap() {
                    hook.wait_deadline_send(&self.disconnected, deadline)
                        .or_else(|timed_out| {
                            // Reached on timeout (`true`) and on disconnect (`false`).
                            if timed_out {
                                // Remove our signal
                                let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
                                wait_lock(&self.chan)
                                    .sending
                                    .as_mut()
                                    .unwrap()
                                    .1
                                    .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
                            }
                            // A message still in the slot was never taken, so it is returned
                            // in the error; an empty slot means the send succeeded after all.
                            hook.try_take()
                                .map(|msg| {
                                    if self.is_disconnected() {
                                        Err(TrySendTimeoutError::Disconnected(msg))
                                    } else {
                                        Err(TrySendTimeoutError::Timeout(msg))
                                    }
                                })
                                .unwrap_or(Ok(()))
                        })
                } else {
                    hook.wait_send(&self.disconnected);

                    // `wait_send` returns on an emptied slot or on disconnect; a message still
                    // in the slot means it was never taken.
                    match hook.try_take() {
                        Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
                        None => Ok(()),
                    }
                }
            },
        )
    }

    /// Core receive routine behind the blocking and non-blocking front ends.
    ///
    /// * `should_block` - whether to wait when no message is available.
    /// * `make_signal` - builds the hook to register as a waiting receiver; only called on the
    ///   blocking path.
    /// * `do_block` - called with that hook after the channel lock has been released; its
    ///   return value becomes the result.
    ///
    /// Every other outcome is produced by converting a `Result<T, TryRecvTimeoutError>` into
    /// `R`.
    fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
        &self,
        should_block: bool,
        make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
    ) -> R {
        let mut chan = wait_lock(&self.chan);
        // Pull from blocked senders with one extra slot beyond the capacity.
        chan.pull_pending(true);

        if let Some(msg) = chan.queue.pop_front() {
            drop(chan);
            Ok(msg).into()
        } else if self.is_disconnected() {
            // Checked only after the queue, so already-sent messages are still delivered.
            drop(chan);
            Err(TryRecvTimeoutError::Disconnected).into()
        } else if should_block {
            // Register as a waiting receiver, then release the lock before blocking.
            let hook = make_signal();
            chan.waiting.push_back(hook.clone());
            drop(chan);

            do_block(hook)
        } else {
            drop(chan);
            Err(TryRecvTimeoutError::Empty).into()
        }
    }

    /// Synchronous receive built on `recv`.
    ///
    /// `block` selects the blocking mode: `None` never blocks, `Some(None)` blocks with no
    /// deadline, `Some(Some(deadline))` blocks until `deadline` at the latest.
    fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
        self.recv(
            // should_block
            block.is_some(),
            // make_signal
            || Hook::slot(None, SyncSignal::default()),
            // do_block
            |hook| {
                if let Some(deadline) = block.unwrap() {
                    hook.wait_deadline_recv(&self.disconnected, deadline)
                        .or_else(|timed_out| {
                            // Reached on timeout (`true`) and on disconnect (`false`).
                            if timed_out {
                                // Remove our signal
                                let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
                                wait_lock(&self.chan)
                                    .waiting
                                    .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
                            }
                            // Check the slot once more, then fall back to the shared queue
                            // before reporting an error.
                            match hook.try_take() {
                                Some(msg) => Ok(msg),
                                None => {
                                    let disconnected = self.is_disconnected(); // Check disconnect *before* msg
                                    if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
                                        Ok(msg)
                                    } else if disconnected {
                                        Err(TryRecvTimeoutError::Disconnected)
                                    } else {
                                        Err(TryRecvTimeoutError::Timeout)
                                    }
                                }
                            }
                        })
                } else {
                    // `wait_recv` yields `None` only on disconnect; try the shared queue once
                    // more before reporting it.
                    hook.wait_recv(&self.disconnected)
                        .or_else(|| wait_lock(&self.chan).queue.pop_front())
                        .ok_or(TryRecvTimeoutError::Disconnected)
                }
            },
        )
    }

    /// Disconnect anything listening on this channel (this will not prevent receivers receiving
    /// msgs that have already been sent)
    fn disconnect_all(&self) {
        // The flag is set before the channel lock is taken and before any waiter is fired.
        self.disconnected.store(true, Ordering::Relaxed);

        let mut chan = wait_lock(&self.chan);
        // Move messages from blocked senders into the queue, up to the capacity.
        chan.pull_pending(false);
        // Fire every blocked sender still registered...
        if let Some((_, sending)) = chan.sending.as_ref() {
            sending.iter().for_each(|hook| {
                hook.signal().fire();
            })
        }
        // ...and every waiting receiver.
        chan.waiting.iter().for_each(|hook| {
            hook.signal().fire();
        });
    }

    /// Whether `disconnect_all` has been called on this channel.
    fn is_disconnected(&self) -> bool {
        self.disconnected.load(Ordering::SeqCst)
    }

    /// Whether the queue holds no messages (see `len`).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Whether a bounded channel's queue length equals its capacity. Always `false` for an
    /// unbounded channel. Capacity and length are read under two separate lock acquisitions.
    fn is_full(&self) -> bool {
        self.capacity()
            .map(|cap| cap == self.len())
            .unwrap_or(false)
    }

    /// Number of queued messages, after first pulling messages from blocked senders into any
    /// free capacity.
    fn len(&self) -> usize {
        let mut chan = wait_lock(&self.chan);
        chan.pull_pending(false);
        chan.queue.len()
    }

    /// The capacity of a bounded channel, or `None` for an unbounded one.
    fn capacity(&self) -> Option<usize> {
        wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
    }

    /// Current value of the sender counter.
    fn sender_count(&self) -> usize {
        self.sender_count.load(Ordering::Relaxed)
    }

    /// Current value of the receiver counter.
    fn receiver_count(&self) -> usize {
        self.receiver_count.load(Ordering::Relaxed)
    }
}
725
726/// A transmitting end of a channel.
727pub struct Sender<T> {
728    shared: Arc<Shared<T>>,
729}
730
731impl<T> Sender<T> {
732    /// Attempt to send a value into the channel. If the channel is bounded and full, or all
733    /// receivers have been dropped, an error is returned. If the channel associated with this
734    /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
735    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
736        self.shared.send_sync(msg, None).map_err(|err| match err {
737            TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
738            TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
739            _ => unreachable!(),
740        })
741    }
742
743    /// Send a value into the channel, returning an error if all receivers have been dropped.
744    /// If the channel is bounded and is full, this method will block until space is available
745    /// or all receivers have been dropped. If the channel is unbounded, this method will not
746    /// block.
747    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
748        self.shared
749            .send_sync(msg, Some(None))
750            .map_err(|err| match err {
751                TrySendTimeoutError::Disconnected(msg) => SendError(msg),
752                _ => unreachable!(),
753            })
754    }
755
756    fn send_deadline_inner(
757        &self,
758        msg: T,
759        deadline: Option<Instant>,
760    ) -> Result<(), SendTimeoutError<T>> {
761        self.shared
762            .send_sync(msg, Some(deadline))
763            .map_err(|err| match err {
764                TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
765                TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
766                _ => unreachable!(),
767            })
768    }
769
770    /// Send a value into the channel, returning an error if all receivers have been dropped
771    /// or the deadline has passed. If the channel is bounded and is full, this method will
772    /// block until space is available, the deadline is reached, or all receivers have been
773    /// dropped.
774    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
775        self.send_deadline_inner(msg, Some(deadline))
776    }
777
778    /// Send a value into the channel, returning an error if all receivers have been dropped
779    /// or the timeout has expired. If the channel is bounded and is full, this method will
780    /// block until space is available, the timeout has expired, or all receivers have been
781    /// dropped.
782    pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
783        self.send_deadline_inner(msg, Instant::now().checked_add(dur))
784    }
785
786    /// Returns true if all receivers for this channel have been dropped.
787    pub fn is_disconnected(&self) -> bool {
788        self.shared.is_disconnected()
789    }
790
791    /// Returns true if the channel is empty.
792    /// Note: Zero-capacity channels are always empty.
793    pub fn is_empty(&self) -> bool {
794        self.shared.is_empty()
795    }
796
797    /// Returns true if the channel is full.
798    /// Note: Zero-capacity channels are always full.
799    pub fn is_full(&self) -> bool {
800        self.shared.is_full()
801    }
802
803    /// Returns the number of messages in the channel
804    pub fn len(&self) -> usize {
805        self.shared.len()
806    }
807
808    /// If the channel is bounded, returns its capacity.
809    pub fn capacity(&self) -> Option<usize> {
810        self.shared.capacity()
811    }
812
813    /// Get the number of senders that currently exist, including this one.
814    pub fn sender_count(&self) -> usize {
815        self.shared.sender_count()
816    }
817
818    /// Get the number of receivers that currently exist.
819    ///
820    /// Note that this method makes no guarantees that a subsequent send will succeed; it's
821    /// possible that between `receiver_count()` being called and a `send()`, all open receivers
822    /// could drop.
823    pub fn receiver_count(&self) -> usize {
824        self.shared.receiver_count()
825    }
826
827    /// Creates a [`WeakSender`] that does not keep the channel open.
828    ///
829    /// The channel is closed once all `Sender`s are dropped, even if there
830    /// are still active `WeakSender`s.
831    pub fn downgrade(&self) -> WeakSender<T> {
832        WeakSender {
833            shared: Arc::downgrade(&self.shared),
834        }
835    }
836
837    /// Returns whether the senders are belong to the same channel.
838    pub fn same_channel(&self, other: &Sender<T>) -> bool {
839        Arc::ptr_eq(&self.shared, &other.shared)
840    }
841}
842
843impl<T> Clone for Sender<T> {
844    /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
845    /// contents will only be cleaned up when all senders and the receiver have been dropped.
846    fn clone(&self) -> Self {
847        self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
848        Self {
849            shared: self.shared.clone(),
850        }
851    }
852}
853
854impl<T> fmt::Debug for Sender<T> {
855    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
856        f.debug_struct("Sender").finish()
857    }
858}
859
860impl<T> Drop for Sender<T> {
861    fn drop(&mut self) {
862        // Notify receivers that all senders have been dropped if the number of senders drops to 0.
863        if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
864            self.shared.disconnect_all();
865        }
866    }
867}
868
869/// A sender that does not prevent the channel from being closed.
870///
871/// Weak senders do not count towards the number of active senders on the channel. As soon as
872/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
873/// `WeakSender`.
874///
875/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`WeakSender::upgrade`]
876/// method.
877pub struct WeakSender<T> {
878    shared: Weak<Shared<T>>,
879}
880
881impl<T> WeakSender<T> {
882    /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
883    ///
884    /// Returns `None` if the channel was closed already. Note that a `Some` return value
885    /// does not guarantee that the channel is still open.
886    pub fn upgrade(&self) -> Option<Sender<T>> {
887        self.shared
888            .upgrade()
889            // check that there are still live senders
890            .filter(|shared| {
891                shared
892                    .sender_count
893                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
894                        if count == 0 {
895                            // all senders are closed already -> don't increase the sender count
896                            None
897                        } else {
898                            // there is still at least one active sender
899                            Some(count + 1)
900                        }
901                    })
902                    .is_ok()
903            })
904            .map(|shared| Sender { shared })
905    }
906}
907
908impl<T> fmt::Debug for WeakSender<T> {
909    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
910        f.debug_struct("WeakSender").finish()
911    }
912}
913
914impl<T> Clone for WeakSender<T> {
915    /// Clones this [`WeakSender`].
916    fn clone(&self) -> Self {
917        Self {
918            shared: self.shared.clone(),
919        }
920    }
921}
922
923/// The receiving end of a channel.
924///
925/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
926/// Each message will only be received by a single receiver. This is useful for
927/// implementing work stealing for concurrent programs.
928pub struct Receiver<T> {
929    shared: Arc<Shared<T>>,
930}
931
932impl<T> Receiver<T> {
933    /// Attempt to fetch an incoming value from the channel associated with this receiver,
934    /// returning an error if the channel is empty or if all senders have been dropped.
935    pub fn try_recv(&self) -> Result<T, TryRecvError> {
936        self.shared.recv_sync(None).map_err(|err| match err {
937            TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
938            TryRecvTimeoutError::Empty => TryRecvError::Empty,
939            _ => unreachable!(),
940        })
941    }
942
943    /// Wait for an incoming value from the channel associated with this receiver, returning an
944    /// error if all senders have been dropped.
945    pub fn recv(&self) -> Result<T, RecvError> {
946        self.shared.recv_sync(Some(None)).map_err(|err| match err {
947            TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
948            _ => unreachable!(),
949        })
950    }
951
952    fn recv_deadline_inner(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
953        self.shared
954            .recv_sync(Some(deadline))
955            .map_err(|err| match err {
956                TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
957                TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
958                _ => unreachable!(),
959            })
960    }
961
962    /// Wait for an incoming value from the channel associated with this receiver, returning an
963    /// error if all senders have been dropped or the deadline has passed.
964    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
965        self.recv_deadline_inner(Some(deadline))
966    }
967
968    /// Wait for an incoming value from the channel associated with this receiver, returning an
969    /// error if all senders have been dropped or the timeout has expired.
970    pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
971        self.recv_deadline_inner(Instant::now().checked_add(dur))
972    }
973
974    /// Create a blocking iterator over the values received on the channel that finishes iteration
975    /// when all senders have been dropped.
976    ///
977    /// You can also create a self-owned iterator with [`Receiver::into_iter`].
978    pub fn iter(&self) -> Iter<'_, T> {
979        Iter { receiver: self }
980    }
981
982    /// A non-blocking iterator over the values received on the channel that finishes iteration
983    /// when all senders have been dropped or the channel is empty.
984    pub fn try_iter(&self) -> TryIter<'_, T> {
985        TryIter { receiver: self }
986    }
987
988    /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
989    /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
990    /// the function has been called.
991    pub fn drain(&self) -> Drain<'_, T> {
992        let mut chan = wait_lock(&self.shared.chan);
993        chan.pull_pending(false);
994        let queue = std::mem::take(&mut chan.queue);
995
996        Drain {
997            queue,
998            _phantom: PhantomData,
999        }
1000    }
1001
1002    /// Returns true if all senders for this channel have been dropped.
1003    pub fn is_disconnected(&self) -> bool {
1004        self.shared.is_disconnected()
1005    }
1006
1007    /// Returns true if the channel is empty.
1008    /// Note: Zero-capacity channels are always empty.
1009    pub fn is_empty(&self) -> bool {
1010        self.shared.is_empty()
1011    }
1012
1013    /// Returns true if the channel is full.
1014    /// Note: Zero-capacity channels are always full.
1015    pub fn is_full(&self) -> bool {
1016        self.shared.is_full()
1017    }
1018
1019    /// Returns the number of messages in the channel.
1020    pub fn len(&self) -> usize {
1021        self.shared.len()
1022    }
1023
1024    /// If the channel is bounded, returns its capacity.
1025    pub fn capacity(&self) -> Option<usize> {
1026        self.shared.capacity()
1027    }
1028
1029    /// Get the number of senders that currently exist.
1030    pub fn sender_count(&self) -> usize {
1031        self.shared.sender_count()
1032    }
1033
1034    /// Get the number of receivers that currently exist, including this one.
1035    pub fn receiver_count(&self) -> usize {
1036        self.shared.receiver_count()
1037    }
1038
1039    /// Returns whether the receivers are belong to the same channel.
1040    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1041        Arc::ptr_eq(&self.shared, &other.shared)
1042    }
1043}
1044
1045impl<T> Clone for Receiver<T> {
1046    /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
1047    /// channel contents will only be cleaned up when all senders and the receiver have been
1048    /// dropped.
1049    ///
1050    /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
1051    /// Each message will only be received by a single receiver. This is useful for
1052    /// implementing work stealing for concurrent programs.
1053    fn clone(&self) -> Self {
1054        self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
1055        Self {
1056            shared: self.shared.clone(),
1057        }
1058    }
1059}
1060
1061impl<T> fmt::Debug for Receiver<T> {
1062    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1063        f.debug_struct("Receiver").finish()
1064    }
1065}
1066
1067impl<T> Drop for Receiver<T> {
1068    fn drop(&mut self) {
1069        // Notify senders that all receivers have been dropped if the number of receivers drops
1070        // to 0.
1071        if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
1072            self.shared.disconnect_all();
1073        }
1074    }
1075}
1076
1077/// This exists as a shorthand for [`Receiver::iter`].
1078impl<'a, T> IntoIterator for &'a Receiver<T> {
1079    type Item = T;
1080    type IntoIter = Iter<'a, T>;
1081
1082    fn into_iter(self) -> Self::IntoIter {
1083        Iter { receiver: self }
1084    }
1085}
1086
1087impl<T> IntoIterator for Receiver<T> {
1088    type Item = T;
1089    type IntoIter = IntoIter<T>;
1090
1091    /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
1092    fn into_iter(self) -> Self::IntoIter {
1093        IntoIter { receiver: self }
1094    }
1095}
1096
1097/// An iterator over the msgs received from a channel.
1098pub struct Iter<'a, T> {
1099    receiver: &'a Receiver<T>,
1100}
1101
1102impl<'a, T> fmt::Debug for Iter<'a, T> {
1103    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1104        f.debug_struct("Iter")
1105            .field("receiver", &self.receiver)
1106            .finish()
1107    }
1108}
1109
1110impl<'a, T> Iterator for Iter<'a, T> {
1111    type Item = T;
1112
1113    fn next(&mut self) -> Option<Self::Item> {
1114        self.receiver.recv().ok()
1115    }
1116}
1117
1118/// An non-blocking iterator over the msgs received from a channel.
1119pub struct TryIter<'a, T> {
1120    receiver: &'a Receiver<T>,
1121}
1122
1123impl<'a, T> fmt::Debug for TryIter<'a, T> {
1124    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1125        f.debug_struct("TryIter")
1126            .field("receiver", &self.receiver)
1127            .finish()
1128    }
1129}
1130
1131impl<'a, T> Iterator for TryIter<'a, T> {
1132    type Item = T;
1133
1134    fn next(&mut self) -> Option<Self::Item> {
1135        self.receiver.try_recv().ok()
1136    }
1137}
1138
/// A fixed-size iterator over the msgs drained from a channel.
///
/// The iterator owns the drained messages, so its length is known exactly and only shrinks
/// as items are taken from it.
#[derive(Debug)]
pub struct Drain<'a, T> {
    queue: VecDeque<T>,
    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
    /// implementation may change and we don't want to unintentionally constrain it. Removing this
    /// lifetime later is a possibility.
    _phantom: PhantomData<&'a ()>,
}

impl<'a, T> Iterator for Drain<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.queue.pop_front()
    }

    // `ExactSizeIterator` requires `size_hint` to report the exact remaining length; the
    // default `(0, None)` would violate that contract and prevent `collect`/`extend` from
    // reserving the right capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.queue.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    /// Returns the number of drained messages that have not been yielded yet.
    fn len(&self) -> usize {
        self.queue.len()
    }
}
1162
1163/// An owned iterator over the msgs received from a channel.
1164pub struct IntoIter<T> {
1165    receiver: Receiver<T>,
1166}
1167
1168impl<T> fmt::Debug for IntoIter<T> {
1169    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1170        f.debug_struct("IntoIter")
1171            .field("receiver", &self.receiver)
1172            .finish()
1173    }
1174}
1175
1176impl<T> Iterator for IntoIter<T> {
1177    type Item = T;
1178
1179    fn next(&mut self) -> Option<Self::Item> {
1180        self.receiver.recv().ok()
1181    }
1182}
1183
1184/// Create a channel with no maximum capacity.
1185///
1186/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
1187/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1188/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1189/// may be cloned.
1190///
1191/// # Examples
1192/// ```
1193/// let (tx, rx) = flume::unbounded();
1194///
1195/// tx.send(42).unwrap();
1196/// assert_eq!(rx.recv().unwrap(), 42);
1197/// ```
1198pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1199    let shared = Arc::new(Shared::new(None));
1200    (
1201        Sender {
1202            shared: shared.clone(),
1203        },
1204        Receiver { shared },
1205    )
1206}
1207
1208/// Create a channel with a maximum capacity.
1209///
1210/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
1211/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1212/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1213/// may be cloned.
1214///
1215/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
1216/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
1217/// is not desired, [`Sender::try_send`] may be used.
1218///
1219/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
1220/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
1221/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
1222/// perform a handshake and transfer ownership of a value.
1223///
1224/// # Examples
1225/// ```
1226/// let (tx, rx) = flume::bounded(32);
1227///
1228/// for i in 1..33 {
1229///     tx.send(i).unwrap();
1230/// }
1231/// assert!(tx.try_send(33).is_err());
1232///
1233/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
1234/// ```
1235pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1236    let shared = Arc::new(Shared::new(Some(cap)));
1237    (
1238        Sender {
1239            shared: shared.clone(),
1240        },
1241        Receiver { shared },
1242    )
1243}