1use crate::*;
4use futures_core::{
5 future::FusedFuture,
6 stream::{FusedStream, Stream},
7};
8use futures_sink::Sink;
9use spin1::Mutex as Spinlock;
10use std::fmt::{Debug, Formatter};
11use std::{
12 any::Any,
13 future::Future,
14 ops::Deref,
15 pin::Pin,
16 task::{Context, Poll, Waker},
17};
18
19struct AsyncSignal {
20 waker: Spinlock<Waker>,
21 woken: AtomicBool,
22 stream: bool,
23}
24
25impl AsyncSignal {
26 fn new(cx: &Context, stream: bool) -> Self {
27 AsyncSignal {
28 waker: Spinlock::new(cx.waker().clone()),
29 woken: AtomicBool::new(false),
30 stream,
31 }
32 }
33}
34
35impl Signal for AsyncSignal {
36 fn fire(&self) -> bool {
37 self.woken.store(true, Ordering::SeqCst);
38 self.waker.lock().wake_by_ref();
39 self.stream
40 }
41
42 fn as_any(&self) -> &(dyn Any + 'static) {
43 self
44 }
45 fn as_ptr(&self) -> *const () {
46 self as *const _ as *const ()
47 }
48}
49
50impl<T> Hook<T, AsyncSignal> {
51 fn update_waker(&self, cx_waker: &Waker) -> bool {
54 let mut waker = self.1.waker.lock();
55 let woken = self.1.woken.load(Ordering::SeqCst);
56 if !waker.will_wake(cx_waker) {
57 *waker = cx_waker.clone();
58
59 if woken {
62 cx_waker.wake_by_ref();
63 }
64 }
65 woken
66 }
67}
68
69#[derive(Clone)]
70enum OwnedOrRef<'a, T> {
71 Owned(T),
72 Ref(&'a T),
73}
74
75impl<'a, T> Deref for OwnedOrRef<'a, T> {
76 type Target = T;
77
78 fn deref(&self) -> &T {
79 match self {
80 OwnedOrRef::Owned(arc) => arc,
81 OwnedOrRef::Ref(r) => r,
82 }
83 }
84}
85
86impl<T> Sender<T> {
87 pub fn send_async(&self, item: T) -> SendFut<'_, T> {
94 SendFut {
95 sender: OwnedOrRef::Ref(self),
96 hook: Some(SendState::NotYetSent(item)),
97 }
98 }
99
100 pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> {
107 SendFut {
108 sender: OwnedOrRef::Owned(self),
109 hook: Some(SendState::NotYetSent(item)),
110 }
111 }
112
113 pub fn sink(&self) -> SendSink<'_, T> {
119 SendSink(SendFut {
120 sender: OwnedOrRef::Ref(self),
121 hook: None,
122 })
123 }
124
125 pub fn into_sink<'a>(self) -> SendSink<'a, T> {
130 SendSink(SendFut {
131 sender: OwnedOrRef::Owned(self),
132 hook: None,
133 })
134 }
135}
136
137enum SendState<T> {
138 NotYetSent(T),
139 QueuedItem(Arc<Hook<T, AsyncSignal>>),
140}
141
142#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
146pub struct SendFut<'a, T> {
147 sender: OwnedOrRef<'a, Sender<T>>,
148 hook: Option<SendState<T>>,
150}
151
152impl<'a, T> Debug for SendFut<'a, T> {
153 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
154 f.debug_struct("SendFut").finish()
155 }
156}
157
158impl<T> std::marker::Unpin for SendFut<'_, T> {}
159
160impl<'a, T> SendFut<'a, T> {
161 fn reset_hook(&mut self) {
164 if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
165 let hook: Arc<Hook<T, dyn Signal>> = hook;
166 wait_lock(&self.sender.shared.chan)
167 .sending
168 .as_mut()
169 .unwrap()
170 .1
171 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
172 }
173 }
174
175 pub fn is_disconnected(&self) -> bool {
177 self.sender.is_disconnected()
178 }
179
180 pub fn is_empty(&self) -> bool {
182 self.sender.is_empty()
183 }
184
185 pub fn is_full(&self) -> bool {
187 self.sender.is_full()
188 }
189
190 pub fn len(&self) -> usize {
192 self.sender.len()
193 }
194
195 pub fn capacity(&self) -> Option<usize> {
197 self.sender.capacity()
198 }
199}
200
201impl<'a, T> Drop for SendFut<'a, T> {
202 fn drop(&mut self) {
203 self.reset_hook()
204 }
205}
206
207impl<'a, T> Future for SendFut<'a, T> {
208 type Output = Result<(), SendError<T>>;
209
210 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() {
212 if hook.is_empty() {
213 Poll::Ready(Ok(()))
214 } else if self.sender.shared.is_disconnected() {
215 let item = hook.try_take();
216 self.hook = None;
217 match item {
218 Some(item) => Poll::Ready(Err(SendError(item))),
219 None => Poll::Ready(Ok(())),
220 }
221 } else {
222 hook.update_waker(cx.waker());
223 Poll::Pending
224 }
225 } else if let Some(SendState::NotYetSent(item)) = self.hook.take() {
226 let this = self.get_mut();
227 let (shared, this_hook) = (&this.sender.shared, &mut this.hook);
228
229 shared
230 .send(
231 item,
233 true,
235 |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
237 |hook| {
239 *this_hook = Some(SendState::QueuedItem(hook));
240 Poll::Pending
241 },
242 )
243 .map(|r| {
244 r.map_err(|err| match err {
245 TrySendTimeoutError::Disconnected(msg) => SendError(msg),
246 _ => unreachable!(),
247 })
248 })
249 } else {
250 Poll::Ready(Ok(()))
252 }
253 }
254}
255
256impl<'a, T> FusedFuture for SendFut<'a, T> {
257 fn is_terminated(&self) -> bool {
258 self.sender.shared.is_disconnected()
259 }
260}
261
262pub struct SendSink<'a, T>(SendFut<'a, T>);
266
267impl<'a, T> SendSink<'a, T> {
268 pub fn sender(&self) -> &Sender<T> {
270 &self.0.sender
271 }
272
273 pub fn is_disconnected(&self) -> bool {
275 self.0.is_disconnected()
276 }
277
278 pub fn is_empty(&self) -> bool {
280 self.0.is_empty()
281 }
282
283 pub fn is_full(&self) -> bool {
285 self.0.is_full()
286 }
287
288 pub fn len(&self) -> usize {
290 self.0.len()
291 }
292
293 pub fn capacity(&self) -> Option<usize> {
295 self.0.capacity()
296 }
297
298 pub fn same_channel(&self, other: &Self) -> bool {
300 self.sender().same_channel(other.sender())
301 }
302}
303
304impl<'a, T> Debug for SendSink<'a, T> {
305 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
306 f.debug_struct("SendSink").finish()
307 }
308}
309
310impl<'a, T> Sink<T> for SendSink<'a, T> {
311 type Error = SendError<T>;
312
313 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
314 Pin::new(&mut self.0).poll(cx)
315 }
316
317 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
318 self.0.reset_hook();
319 self.0.hook = Some(SendState::NotYetSent(item));
320
321 Ok(())
322 }
323
324 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
325 Pin::new(&mut self.0).poll(cx) }
327
328 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
329 Pin::new(&mut self.0).poll(cx) }
331}
332
333impl<'a, T> Clone for SendSink<'a, T> {
334 fn clone(&self) -> SendSink<'a, T> {
335 SendSink(SendFut {
336 sender: self.0.sender.clone(),
337 hook: None,
338 })
339 }
340}
341
342impl<T> Receiver<T> {
343 pub fn recv_async(&self) -> RecvFut<'_, T> {
346 RecvFut::new(OwnedOrRef::Ref(self))
347 }
348
349 pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
353 RecvFut::new(OwnedOrRef::Owned(self))
354 }
355
356 pub fn stream(&self) -> RecvStream<'_, T> {
359 RecvStream(RecvFut::new(OwnedOrRef::Ref(self)))
360 }
361
362 pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
364 RecvStream(RecvFut::new(OwnedOrRef::Owned(self)))
365 }
366}
367
368#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
372pub struct RecvFut<'a, T> {
373 receiver: OwnedOrRef<'a, Receiver<T>>,
374 hook: Option<Arc<Hook<T, AsyncSignal>>>,
375}
376
377impl<'a, T> RecvFut<'a, T> {
378 fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self {
379 Self {
380 receiver,
381 hook: None,
382 }
383 }
384
385 fn reset_hook(&mut self) {
389 if let Some(hook) = self.hook.take() {
390 let hook: Arc<Hook<T, dyn Signal>> = hook;
391 let mut chan = wait_lock(&self.receiver.shared.chan);
392 chan.waiting
394 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
395 if hook
396 .signal()
397 .as_any()
398 .downcast_ref::<AsyncSignal>()
399 .unwrap()
400 .woken
401 .load(Ordering::SeqCst)
402 {
403 chan.try_wake_receiver_if_pending();
406 }
407 }
408 }
409
410 fn poll_inner(
411 self: Pin<&mut Self>,
412 cx: &mut Context,
413 stream: bool,
414 ) -> Poll<Result<T, RecvError>> {
415 if self.hook.is_some() {
416 match self.receiver.shared.recv_sync(None) {
417 Ok(msg) => return Poll::Ready(Ok(msg)),
418 Err(TryRecvTimeoutError::Disconnected) => {
419 return Poll::Ready(Err(RecvError::Disconnected))
420 }
421 _ => (),
422 }
423
424 let hook = self.hook.as_ref().map(Arc::clone).unwrap();
425 if hook.update_waker(cx.waker()) {
426 wait_lock(&self.receiver.shared.chan)
429 .waiting
430 .push_back(hook);
431 }
432 if self.receiver.shared.is_disconnected() {
435 Poll::Ready(
438 self.receiver
439 .shared
440 .recv_sync(None)
441 .map(Ok)
442 .unwrap_or(Err(RecvError::Disconnected)),
443 )
444 } else {
445 Poll::Pending
446 }
447 } else {
448 let mut_self = self.get_mut();
449 let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);
450
451 shared
452 .recv(
453 true,
455 || Hook::trigger(AsyncSignal::new(cx, stream)),
457 |hook| {
459 *this_hook = Some(hook);
460 Poll::Pending
461 },
462 )
463 .map(|r| {
464 r.map_err(|err| match err {
465 TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
466 _ => unreachable!(),
467 })
468 })
469 }
470 }
471
472 pub fn is_disconnected(&self) -> bool {
474 self.receiver.is_disconnected()
475 }
476
477 pub fn is_empty(&self) -> bool {
479 self.receiver.is_empty()
480 }
481
482 pub fn is_full(&self) -> bool {
484 self.receiver.is_full()
485 }
486
487 pub fn len(&self) -> usize {
489 self.receiver.len()
490 }
491
492 pub fn capacity(&self) -> Option<usize> {
494 self.receiver.capacity()
495 }
496}
497
498impl<'a, T> Debug for RecvFut<'a, T> {
499 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
500 f.debug_struct("RecvFut").finish()
501 }
502}
503
504impl<'a, T> Drop for RecvFut<'a, T> {
505 fn drop(&mut self) {
506 self.reset_hook();
507 }
508}
509
510impl<'a, T> Future for RecvFut<'a, T> {
511 type Output = Result<T, RecvError>;
512
513 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
514 self.poll_inner(cx, false) }
516}
517
518impl<'a, T> FusedFuture for RecvFut<'a, T> {
519 fn is_terminated(&self) -> bool {
520 self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty()
521 }
522}
523
524pub struct RecvStream<'a, T>(RecvFut<'a, T>);
528
529impl<'a, T> RecvStream<'a, T> {
530 pub fn is_disconnected(&self) -> bool {
532 self.0.is_disconnected()
533 }
534
535 pub fn is_empty(&self) -> bool {
537 self.0.is_empty()
538 }
539
540 pub fn is_full(&self) -> bool {
542 self.0.is_full()
543 }
544
545 pub fn len(&self) -> usize {
547 self.0.len()
548 }
549
550 pub fn capacity(&self) -> Option<usize> {
552 self.0.capacity()
553 }
554
555 pub fn same_channel(&self, other: &Self) -> bool {
557 self.0.receiver.same_channel(&*other.0.receiver)
558 }
559}
560
561impl<'a, T> Debug for RecvStream<'a, T> {
562 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
563 f.debug_struct("RecvStream").finish()
564 }
565}
566
567impl<'a, T> Clone for RecvStream<'a, T> {
568 fn clone(&self) -> RecvStream<'a, T> {
569 RecvStream(RecvFut::new(self.0.receiver.clone()))
570 }
571}
572
573impl<'a, T> Stream for RecvStream<'a, T> {
574 type Item = T;
575
576 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
577 match Pin::new(&mut self.0).poll_inner(cx, true) {
578 Poll::Pending => Poll::Pending,
580 Poll::Ready(item) => {
581 self.0.reset_hook();
582 Poll::Ready(item.ok())
583 }
584 }
585 }
586}
587
588impl<'a, T> FusedStream for RecvStream<'a, T> {
589 fn is_terminated(&self) -> bool {
590 self.0.is_terminated()
591 }
592}