1#![cfg_attr(docsrs, feature(doc_cfg))]
29#![deny(missing_docs)]
30
31#[cfg(feature = "async")]
32pub mod r#async;
33#[cfg(feature = "select")]
34pub mod select;
35
36mod signal;
37
38#[cfg(feature = "select")]
40pub use select::Selector;
41
42use crate::signal::{Signal, SyncSignal};
43#[cfg(feature = "spin")]
44use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
45use std::fmt::Formatter;
46use std::{
47 collections::VecDeque,
48 fmt,
49 marker::PhantomData,
50 sync::{
51 atomic::{AtomicBool, AtomicUsize, Ordering},
52 Arc, Weak,
53 },
54 thread,
55 time::{Duration, Instant},
56};
57
58#[derive(Copy, Clone, PartialEq, Eq)]
61pub struct SendError<T>(pub T);
62
63impl<T> SendError<T> {
64 pub fn into_inner(self) -> T {
66 self.0
67 }
68}
69
70impl<T> fmt::Debug for SendError<T> {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 "SendError(..)".fmt(f)
73 }
74}
75
76impl<T> fmt::Display for SendError<T> {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 "sending on a closed channel".fmt(f)
79 }
80}
81
82impl<T> std::error::Error for SendError<T> {}
83
84#[derive(Copy, Clone, PartialEq, Eq)]
87pub enum TrySendError<T> {
88 Full(T),
90 Disconnected(T),
92}
93
94impl<T> TrySendError<T> {
95 pub fn into_inner(self) -> T {
97 match self {
98 Self::Full(msg) | Self::Disconnected(msg) => msg,
99 }
100 }
101}
102
103impl<T> fmt::Debug for TrySendError<T> {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 match *self {
106 TrySendError::Full(..) => "Full(..)".fmt(f),
107 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
108 }
109 }
110}
111
112impl<T> fmt::Display for TrySendError<T> {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 match self {
115 TrySendError::Full(..) => "sending on a full channel".fmt(f),
116 TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
117 }
118 }
119}
120
121impl<T> std::error::Error for TrySendError<T> {}
122
123impl<T> From<SendError<T>> for TrySendError<T> {
124 fn from(err: SendError<T>) -> Self {
125 match err {
126 SendError(item) => Self::Disconnected(item),
127 }
128 }
129}
130
131#[derive(Copy, Clone, PartialEq, Eq)]
134pub enum SendTimeoutError<T> {
135 Timeout(T),
137 Disconnected(T),
139}
140
141impl<T> SendTimeoutError<T> {
142 pub fn into_inner(self) -> T {
144 match self {
145 Self::Timeout(msg) | Self::Disconnected(msg) => msg,
146 }
147 }
148}
149
150impl<T> fmt::Debug for SendTimeoutError<T> {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152 "SendTimeoutError(..)".fmt(f)
153 }
154}
155
156impl<T> fmt::Display for SendTimeoutError<T> {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 match self {
159 SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
160 SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
161 }
162 }
163}
164
165impl<T> std::error::Error for SendTimeoutError<T> {}
166
167impl<T> From<SendError<T>> for SendTimeoutError<T> {
168 fn from(err: SendError<T>) -> Self {
169 match err {
170 SendError(item) => Self::Disconnected(item),
171 }
172 }
173}
174
175enum TrySendTimeoutError<T> {
176 Full(T),
177 Disconnected(T),
178 Timeout(T),
179}
180
181#[derive(Copy, Clone, Debug, PartialEq, Eq)]
184pub enum RecvError {
185 Disconnected,
187}
188
189impl fmt::Display for RecvError {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 match self {
192 RecvError::Disconnected => "receiving on a closed channel".fmt(f),
193 }
194 }
195}
196
197impl std::error::Error for RecvError {}
198
199#[derive(Copy, Clone, Debug, PartialEq, Eq)]
203pub enum TryRecvError {
204 Empty,
206 Disconnected,
208}
209
210impl fmt::Display for TryRecvError {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 match self {
213 TryRecvError::Empty => "receiving on an empty channel".fmt(f),
214 TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
215 }
216 }
217}
218
219impl std::error::Error for TryRecvError {}
220
221impl From<RecvError> for TryRecvError {
222 fn from(err: RecvError) -> Self {
223 match err {
224 RecvError::Disconnected => Self::Disconnected,
225 }
226 }
227}
228
229#[derive(Copy, Clone, Debug, PartialEq, Eq)]
233pub enum RecvTimeoutError {
234 Timeout,
236 Disconnected,
238}
239
240impl fmt::Display for RecvTimeoutError {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 match self {
243 RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
244 RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
245 }
246 }
247}
248
249impl std::error::Error for RecvTimeoutError {}
250
251impl From<RecvError> for RecvTimeoutError {
252 fn from(err: RecvError) -> Self {
253 match err {
254 RecvError::Disconnected => Self::Disconnected,
255 }
256 }
257}
258
259enum TryRecvTimeoutError {
260 Empty,
261 Timeout,
262 Disconnected,
263}
264
265#[cfg(feature = "spin")]
267struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
268
269#[cfg(not(feature = "spin"))]
270struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
271
272#[cfg(feature = "spin")]
273impl<T, S: ?Sized + Signal> Hook<T, S> {
274 pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
275 where
276 S: Sized,
277 {
278 Arc::new(Self(Some(Spinlock::new(msg)), signal))
279 }
280
281 fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
282 self.0.as_ref().map(|s| s.lock())
283 }
284}
285
286#[cfg(not(feature = "spin"))]
287impl<T, S: ?Sized + Signal> Hook<T, S> {
288 pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
289 where
290 S: Sized,
291 {
292 Arc::new(Self(Some(Mutex::new(msg)), signal))
293 }
294
295 fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
296 self.0.as_ref().map(|s| s.lock().unwrap())
297 }
298}
299
300impl<T, S: ?Sized + Signal> Hook<T, S> {
301 pub fn fire_recv(&self) -> (T, &S) {
302 let msg = self.lock().unwrap().take().unwrap();
303 (msg, self.signal())
304 }
305
306 pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
307 let ret = match self.lock() {
308 Some(mut lock) => {
309 *lock = Some(msg);
310 None
311 }
312 None => Some(msg),
313 };
314 (ret, self.signal())
315 }
316
317 pub fn is_empty(&self) -> bool {
318 self.lock().map(|s| s.is_none()).unwrap_or(true)
319 }
320
321 pub fn try_take(&self) -> Option<T> {
322 self.lock().unwrap().take()
323 }
324
325 pub fn trigger(signal: S) -> Arc<Self>
326 where
327 S: Sized,
328 {
329 Arc::new(Self(None, signal))
330 }
331
332 pub fn signal(&self) -> &S {
333 &self.1
334 }
335
336 pub fn fire_nothing(&self) -> bool {
337 self.signal().fire()
338 }
339}
340
341impl<T> Hook<T, SyncSignal> {
342 pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
343 loop {
344 let disconnected = abort.load(Ordering::SeqCst); let msg = self.lock().unwrap().take();
346 if let Some(msg) = msg {
347 break Some(msg);
348 } else if disconnected {
349 break None;
350 } else {
351 self.signal().wait()
352 }
353 }
354 }
355
356 pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
358 loop {
359 let disconnected = abort.load(Ordering::SeqCst); let msg = self.lock().unwrap().take();
361 if let Some(msg) = msg {
362 break Ok(msg);
363 } else if disconnected {
364 break Err(false);
365 } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
366 self.signal().wait_timeout(dur);
367 } else {
368 break Err(true);
369 }
370 }
371 }
372
373 pub fn wait_send(&self, abort: &AtomicBool) {
374 loop {
375 let disconnected = abort.load(Ordering::SeqCst); if disconnected || self.lock().unwrap().is_none() {
377 break;
378 }
379
380 self.signal().wait();
381 }
382 }
383
384 pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
386 loop {
387 let disconnected = abort.load(Ordering::SeqCst); if self.lock().unwrap().is_none() {
389 break Ok(());
390 } else if disconnected {
391 break Err(false);
392 } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
393 self.signal().wait_timeout(dur);
394 } else {
395 break Err(true);
396 }
397 }
398 }
399}
400
401#[cfg(feature = "spin")]
402#[inline]
403fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<'_, T> {
404 #[cfg(any(target_family = "unix", target_family = "windows"))]
408 {
409 let mut i = 4;
410 loop {
411 for _ in 0..10 {
412 if let Some(guard) = lock.try_lock() {
413 return guard;
414 }
415 thread::yield_now();
416 }
417 thread::sleep(Duration::from_nanos(1 << i.min(20)));
419 i += 1;
420 }
421 }
422 #[cfg(not(any(target_family = "unix", target_family = "windows")))]
423 lock.lock()
424}
425
426#[cfg(not(feature = "spin"))]
427#[inline]
428fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
429 lock.lock().unwrap()
430}
431
432#[cfg(not(feature = "spin"))]
433use std::sync::{Mutex, MutexGuard};
434
435#[cfg(feature = "spin")]
436type ChanLock<T> = Spinlock<T>;
437#[cfg(not(feature = "spin"))]
438type ChanLock<T> = Mutex<T>;
439
440type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
441struct Chan<T> {
442 sending: Option<(usize, SignalVec<T>)>,
443 queue: VecDeque<T>,
444 waiting: SignalVec<T>,
445}
446
447impl<T> Chan<T> {
448 fn pull_pending(&mut self, pull_extra: bool) {
449 if let Some((cap, sending)) = &mut self.sending {
450 let effective_cap = *cap + pull_extra as usize;
451
452 while self.queue.len() < effective_cap {
453 if let Some(s) = sending.pop_front() {
454 let (msg, signal) = s.fire_recv();
455 signal.fire();
456 self.queue.push_back(msg);
457 } else {
458 break;
459 }
460 }
461 }
462 }
463
464 fn try_wake_receiver_if_pending(&mut self) {
465 if !self.queue.is_empty() {
466 while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
467 }
468 }
469}
470
471struct Shared<T> {
472 chan: ChanLock<Chan<T>>,
473 disconnected: AtomicBool,
474 sender_count: AtomicUsize,
475 receiver_count: AtomicUsize,
476}
477
478impl<T> Shared<T> {
479 fn new(cap: Option<usize>) -> Self {
480 Self {
481 chan: ChanLock::new(Chan {
482 sending: cap.map(|cap| (cap, VecDeque::new())),
483 queue: VecDeque::new(),
484 waiting: VecDeque::new(),
485 }),
486 disconnected: AtomicBool::new(false),
487 sender_count: AtomicUsize::new(1),
488 receiver_count: AtomicUsize::new(1),
489 }
490 }
491
492 fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
493 &self,
494 msg: T,
495 should_block: bool,
496 make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
497 do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
498 ) -> R {
499 let mut chan = wait_lock(&self.chan);
500
501 if self.is_disconnected() {
502 Err(TrySendTimeoutError::Disconnected(msg)).into()
503 } else if !chan.waiting.is_empty() {
504 let mut msg = Some(msg);
505
506 loop {
507 let slot = chan.waiting.pop_front();
508 match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
509 None if msg.is_none() => break,
511 None => {
513 chan.queue.push_back(msg.unwrap());
514 break;
515 }
516 Some((Some(m), signal)) => {
517 if signal.fire() {
518 msg.replace(m);
521 continue;
522 } else {
523 chan.queue.push_back(m);
526 drop(chan);
527 break;
528 }
529 }
530 Some((None, signal)) => {
531 drop(chan);
532 signal.fire();
533 break; }
535 }
536 }
537
538 Ok(()).into()
539 } else if chan
540 .sending
541 .as_ref()
542 .map(|(cap, _)| chan.queue.len() < *cap)
543 .unwrap_or(true)
544 {
545 chan.queue.push_back(msg);
546 Ok(()).into()
547 } else if should_block {
548 let hook = make_signal(msg);
550 chan.sending.as_mut().unwrap().1.push_back(hook.clone());
551 drop(chan);
552
553 do_block(hook)
554 } else {
555 Err(TrySendTimeoutError::Full(msg)).into()
556 }
557 }
558
559 fn send_sync(
560 &self,
561 msg: T,
562 block: Option<Option<Instant>>,
563 ) -> Result<(), TrySendTimeoutError<T>> {
564 self.send(
565 msg,
567 block.is_some(),
569 |msg| Hook::slot(Some(msg), SyncSignal::default()),
571 |hook| {
573 if let Some(deadline) = block.unwrap() {
574 hook.wait_deadline_send(&self.disconnected, deadline)
575 .or_else(|timed_out| {
576 if timed_out {
577 let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
579 wait_lock(&self.chan)
580 .sending
581 .as_mut()
582 .unwrap()
583 .1
584 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
585 }
586 hook.try_take()
587 .map(|msg| {
588 if self.is_disconnected() {
589 Err(TrySendTimeoutError::Disconnected(msg))
590 } else {
591 Err(TrySendTimeoutError::Timeout(msg))
592 }
593 })
594 .unwrap_or(Ok(()))
595 })
596 } else {
597 hook.wait_send(&self.disconnected);
598
599 match hook.try_take() {
600 Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
601 None => Ok(()),
602 }
603 }
604 },
605 )
606 }
607
608 fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
609 &self,
610 should_block: bool,
611 make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
612 do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
613 ) -> R {
614 let mut chan = wait_lock(&self.chan);
615 chan.pull_pending(true);
616
617 if let Some(msg) = chan.queue.pop_front() {
618 drop(chan);
619 Ok(msg).into()
620 } else if self.is_disconnected() {
621 drop(chan);
622 Err(TryRecvTimeoutError::Disconnected).into()
623 } else if should_block {
624 let hook = make_signal();
625 chan.waiting.push_back(hook.clone());
626 drop(chan);
627
628 do_block(hook)
629 } else {
630 drop(chan);
631 Err(TryRecvTimeoutError::Empty).into()
632 }
633 }
634
635 fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
636 self.recv(
637 block.is_some(),
639 || Hook::slot(None, SyncSignal::default()),
641 |hook| {
643 if let Some(deadline) = block.unwrap() {
644 hook.wait_deadline_recv(&self.disconnected, deadline)
645 .or_else(|timed_out| {
646 if timed_out {
647 let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
649 wait_lock(&self.chan)
650 .waiting
651 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
652 }
653 match hook.try_take() {
654 Some(msg) => Ok(msg),
655 None => {
656 let disconnected = self.is_disconnected(); if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
658 Ok(msg)
659 } else if disconnected {
660 Err(TryRecvTimeoutError::Disconnected)
661 } else {
662 Err(TryRecvTimeoutError::Timeout)
663 }
664 }
665 }
666 })
667 } else {
668 hook.wait_recv(&self.disconnected)
669 .or_else(|| wait_lock(&self.chan).queue.pop_front())
670 .ok_or(TryRecvTimeoutError::Disconnected)
671 }
672 },
673 )
674 }
675
676 fn disconnect_all(&self) {
679 self.disconnected.store(true, Ordering::Relaxed);
680
681 let mut chan = wait_lock(&self.chan);
682 chan.pull_pending(false);
683 if let Some((_, sending)) = chan.sending.as_ref() {
684 sending.iter().for_each(|hook| {
685 hook.signal().fire();
686 })
687 }
688 chan.waiting.iter().for_each(|hook| {
689 hook.signal().fire();
690 });
691 }
692
693 fn is_disconnected(&self) -> bool {
694 self.disconnected.load(Ordering::SeqCst)
695 }
696
697 fn is_empty(&self) -> bool {
698 self.len() == 0
699 }
700
701 fn is_full(&self) -> bool {
702 self.capacity()
703 .map(|cap| cap == self.len())
704 .unwrap_or(false)
705 }
706
707 fn len(&self) -> usize {
708 let mut chan = wait_lock(&self.chan);
709 chan.pull_pending(false);
710 chan.queue.len()
711 }
712
713 fn capacity(&self) -> Option<usize> {
714 wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
715 }
716
717 fn sender_count(&self) -> usize {
718 self.sender_count.load(Ordering::Relaxed)
719 }
720
721 fn receiver_count(&self) -> usize {
722 self.receiver_count.load(Ordering::Relaxed)
723 }
724}
725
726pub struct Sender<T> {
728 shared: Arc<Shared<T>>,
729}
730
731impl<T> Sender<T> {
732 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
736 self.shared.send_sync(msg, None).map_err(|err| match err {
737 TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
738 TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
739 _ => unreachable!(),
740 })
741 }
742
743 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
748 self.shared
749 .send_sync(msg, Some(None))
750 .map_err(|err| match err {
751 TrySendTimeoutError::Disconnected(msg) => SendError(msg),
752 _ => unreachable!(),
753 })
754 }
755
756 fn send_deadline_inner(
757 &self,
758 msg: T,
759 deadline: Option<Instant>,
760 ) -> Result<(), SendTimeoutError<T>> {
761 self.shared
762 .send_sync(msg, Some(deadline))
763 .map_err(|err| match err {
764 TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
765 TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
766 _ => unreachable!(),
767 })
768 }
769
770 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
775 self.send_deadline_inner(msg, Some(deadline))
776 }
777
778 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
783 self.send_deadline_inner(msg, Instant::now().checked_add(dur))
784 }
785
786 pub fn is_disconnected(&self) -> bool {
788 self.shared.is_disconnected()
789 }
790
791 pub fn is_empty(&self) -> bool {
794 self.shared.is_empty()
795 }
796
797 pub fn is_full(&self) -> bool {
800 self.shared.is_full()
801 }
802
803 pub fn len(&self) -> usize {
805 self.shared.len()
806 }
807
808 pub fn capacity(&self) -> Option<usize> {
810 self.shared.capacity()
811 }
812
813 pub fn sender_count(&self) -> usize {
815 self.shared.sender_count()
816 }
817
818 pub fn receiver_count(&self) -> usize {
824 self.shared.receiver_count()
825 }
826
827 pub fn downgrade(&self) -> WeakSender<T> {
832 WeakSender {
833 shared: Arc::downgrade(&self.shared),
834 }
835 }
836
837 pub fn same_channel(&self, other: &Sender<T>) -> bool {
839 Arc::ptr_eq(&self.shared, &other.shared)
840 }
841}
842
843impl<T> Clone for Sender<T> {
844 fn clone(&self) -> Self {
847 self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
848 Self {
849 shared: self.shared.clone(),
850 }
851 }
852}
853
854impl<T> fmt::Debug for Sender<T> {
855 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
856 f.debug_struct("Sender").finish()
857 }
858}
859
860impl<T> Drop for Sender<T> {
861 fn drop(&mut self) {
862 if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
864 self.shared.disconnect_all();
865 }
866 }
867}
868
869pub struct WeakSender<T> {
878 shared: Weak<Shared<T>>,
879}
880
881impl<T> WeakSender<T> {
882 pub fn upgrade(&self) -> Option<Sender<T>> {
887 self.shared
888 .upgrade()
889 .filter(|shared| {
891 shared
892 .sender_count
893 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
894 if count == 0 {
895 None
897 } else {
898 Some(count + 1)
900 }
901 })
902 .is_ok()
903 })
904 .map(|shared| Sender { shared })
905 }
906}
907
908impl<T> fmt::Debug for WeakSender<T> {
909 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
910 f.debug_struct("WeakSender").finish()
911 }
912}
913
914impl<T> Clone for WeakSender<T> {
915 fn clone(&self) -> Self {
917 Self {
918 shared: self.shared.clone(),
919 }
920 }
921}
922
923pub struct Receiver<T> {
929 shared: Arc<Shared<T>>,
930}
931
932impl<T> Receiver<T> {
933 pub fn try_recv(&self) -> Result<T, TryRecvError> {
936 self.shared.recv_sync(None).map_err(|err| match err {
937 TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
938 TryRecvTimeoutError::Empty => TryRecvError::Empty,
939 _ => unreachable!(),
940 })
941 }
942
943 pub fn recv(&self) -> Result<T, RecvError> {
946 self.shared.recv_sync(Some(None)).map_err(|err| match err {
947 TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
948 _ => unreachable!(),
949 })
950 }
951
952 fn recv_deadline_inner(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
953 self.shared
954 .recv_sync(Some(deadline))
955 .map_err(|err| match err {
956 TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
957 TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
958 _ => unreachable!(),
959 })
960 }
961
962 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
965 self.recv_deadline_inner(Some(deadline))
966 }
967
968 pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
971 self.recv_deadline_inner(Instant::now().checked_add(dur))
972 }
973
974 pub fn iter(&self) -> Iter<'_, T> {
979 Iter { receiver: self }
980 }
981
982 pub fn try_iter(&self) -> TryIter<'_, T> {
985 TryIter { receiver: self }
986 }
987
988 pub fn drain(&self) -> Drain<'_, T> {
992 let mut chan = wait_lock(&self.shared.chan);
993 chan.pull_pending(false);
994 let queue = std::mem::take(&mut chan.queue);
995
996 Drain {
997 queue,
998 _phantom: PhantomData,
999 }
1000 }
1001
1002 pub fn is_disconnected(&self) -> bool {
1004 self.shared.is_disconnected()
1005 }
1006
1007 pub fn is_empty(&self) -> bool {
1010 self.shared.is_empty()
1011 }
1012
1013 pub fn is_full(&self) -> bool {
1016 self.shared.is_full()
1017 }
1018
1019 pub fn len(&self) -> usize {
1021 self.shared.len()
1022 }
1023
1024 pub fn capacity(&self) -> Option<usize> {
1026 self.shared.capacity()
1027 }
1028
1029 pub fn sender_count(&self) -> usize {
1031 self.shared.sender_count()
1032 }
1033
1034 pub fn receiver_count(&self) -> usize {
1036 self.shared.receiver_count()
1037 }
1038
1039 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1041 Arc::ptr_eq(&self.shared, &other.shared)
1042 }
1043}
1044
1045impl<T> Clone for Receiver<T> {
1046 fn clone(&self) -> Self {
1054 self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
1055 Self {
1056 shared: self.shared.clone(),
1057 }
1058 }
1059}
1060
1061impl<T> fmt::Debug for Receiver<T> {
1062 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1063 f.debug_struct("Receiver").finish()
1064 }
1065}
1066
1067impl<T> Drop for Receiver<T> {
1068 fn drop(&mut self) {
1069 if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
1072 self.shared.disconnect_all();
1073 }
1074 }
1075}
1076
1077impl<'a, T> IntoIterator for &'a Receiver<T> {
1079 type Item = T;
1080 type IntoIter = Iter<'a, T>;
1081
1082 fn into_iter(self) -> Self::IntoIter {
1083 Iter { receiver: self }
1084 }
1085}
1086
1087impl<T> IntoIterator for Receiver<T> {
1088 type Item = T;
1089 type IntoIter = IntoIter<T>;
1090
1091 fn into_iter(self) -> Self::IntoIter {
1093 IntoIter { receiver: self }
1094 }
1095}
1096
1097pub struct Iter<'a, T> {
1099 receiver: &'a Receiver<T>,
1100}
1101
1102impl<'a, T> fmt::Debug for Iter<'a, T> {
1103 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1104 f.debug_struct("Iter")
1105 .field("receiver", &self.receiver)
1106 .finish()
1107 }
1108}
1109
1110impl<'a, T> Iterator for Iter<'a, T> {
1111 type Item = T;
1112
1113 fn next(&mut self) -> Option<Self::Item> {
1114 self.receiver.recv().ok()
1115 }
1116}
1117
1118pub struct TryIter<'a, T> {
1120 receiver: &'a Receiver<T>,
1121}
1122
1123impl<'a, T> fmt::Debug for TryIter<'a, T> {
1124 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1125 f.debug_struct("TryIter")
1126 .field("receiver", &self.receiver)
1127 .finish()
1128 }
1129}
1130
1131impl<'a, T> Iterator for TryIter<'a, T> {
1132 type Item = T;
1133
1134 fn next(&mut self) -> Option<Self::Item> {
1135 self.receiver.try_recv().ok()
1136 }
1137}
1138
1139#[derive(Debug)]
1141pub struct Drain<'a, T> {
1142 queue: VecDeque<T>,
1143 _phantom: PhantomData<&'a ()>,
1147}
1148
1149impl<'a, T> Iterator for Drain<'a, T> {
1150 type Item = T;
1151
1152 fn next(&mut self) -> Option<Self::Item> {
1153 self.queue.pop_front()
1154 }
1155}
1156
1157impl<'a, T> ExactSizeIterator for Drain<'a, T> {
1158 fn len(&self) -> usize {
1159 self.queue.len()
1160 }
1161}
1162
1163pub struct IntoIter<T> {
1165 receiver: Receiver<T>,
1166}
1167
1168impl<T> fmt::Debug for IntoIter<T> {
1169 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1170 f.debug_struct("IntoIter")
1171 .field("receiver", &self.receiver)
1172 .finish()
1173 }
1174}
1175
1176impl<T> Iterator for IntoIter<T> {
1177 type Item = T;
1178
1179 fn next(&mut self) -> Option<Self::Item> {
1180 self.receiver.recv().ok()
1181 }
1182}
1183
1184pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1199 let shared = Arc::new(Shared::new(None));
1200 (
1201 Sender {
1202 shared: shared.clone(),
1203 },
1204 Receiver { shared },
1205 )
1206}
1207
1208pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1236 let shared = Arc::new(Shared::new(Some(cap)));
1237 (
1238 Sender {
1239 shared: shared.clone(),
1240 },
1241 Receiver { shared },
1242 )
1243}