crossbeam_channel/
select.rs

1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7use std::vec::Vec;
8
9use crossbeam_utils::Backoff;
10
11use crate::channel::{self, Receiver, Sender};
12use crate::context::Context;
13use crate::err::{ReadyTimeoutError, TryReadyError};
14use crate::err::{RecvError, SendError};
15use crate::err::{SelectTimeoutError, TrySelectError};
16use crate::flavors;
17use crate::utils;
18
19/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
20/// `read` or `write`.
21///
22/// Each field contains data associated with a specific channel flavor.
23// This is a private API that is used by the select macro.
24#[derive(Debug, Default)]
25pub struct Token {
26    pub(crate) at: flavors::at::AtToken,
27    pub(crate) array: flavors::array::ArrayToken,
28    pub(crate) list: flavors::list::ListToken,
29    #[allow(dead_code)]
30    pub(crate) never: flavors::never::NeverToken,
31    pub(crate) tick: flavors::tick::TickToken,
32    pub(crate) zero: flavors::zero::ZeroToken,
33}
34
35/// Identifier associated with an operation by a specific thread on a specific channel.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct Operation(usize);
38
39impl Operation {
40    /// Creates an operation identifier from a mutable reference.
41    ///
42    /// This function essentially just turns the address of the reference into a number. The
43    /// reference should point to a variable that is specific to the thread and the operation,
44    /// and is alive for the entire duration of select or blocking operation.
45    #[inline]
46    pub fn hook<T>(r: &mut T) -> Operation {
47        let val = r as *mut T as usize;
48        // Make sure that the pointer address doesn't equal the numerical representation of
49        // `Selected::{Waiting, Aborted, Disconnected}`.
50        assert!(val > 2);
51        Operation(val)
52    }
53}
54
55/// Current state of a select or a blocking operation.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum Selected {
58    /// Still waiting for an operation.
59    Waiting,
60
61    /// The attempt to block the current thread has been aborted.
62    Aborted,
63
64    /// An operation became ready because a channel is disconnected.
65    Disconnected,
66
67    /// An operation became ready because a message can be sent or received.
68    Operation(Operation),
69}
70
71impl From<usize> for Selected {
72    #[inline]
73    fn from(val: usize) -> Selected {
74        match val {
75            0 => Selected::Waiting,
76            1 => Selected::Aborted,
77            2 => Selected::Disconnected,
78            oper => Selected::Operation(Operation(oper)),
79        }
80    }
81}
82
83impl Into<usize> for Selected {
84    #[inline]
85    fn into(self) -> usize {
86        match self {
87            Selected::Waiting => 0,
88            Selected::Aborted => 1,
89            Selected::Disconnected => 2,
90            Selected::Operation(Operation(val)) => val,
91        }
92    }
93}
94
95/// A receiver or a sender that can participate in select.
96///
97/// This is a handle that assists select in executing an operation, registration, deciding on the
98/// appropriate deadline for blocking, etc.
99// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
100pub trait SelectHandle {
101    /// Attempts to select an operation and returns `true` on success.
102    fn try_select(&self, token: &mut Token) -> bool;
103
104    /// Returns a deadline for an operation, if there is one.
105    fn deadline(&self) -> Option<Instant>;
106
107    /// Registers an operation for execution and returns `true` if it is now ready.
108    fn register(&self, oper: Operation, cx: &Context) -> bool;
109
110    /// Unregisters an operation for execution.
111    fn unregister(&self, oper: Operation);
112
113    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
114    fn accept(&self, token: &mut Token, cx: &Context) -> bool;
115
116    /// Returns `true` if an operation can be executed without blocking.
117    fn is_ready(&self) -> bool;
118
119    /// Registers an operation for readiness notification and returns `true` if it is now ready.
120    fn watch(&self, oper: Operation, cx: &Context) -> bool;
121
122    /// Unregisters an operation for readiness notification.
123    fn unwatch(&self, oper: Operation);
124}
125
126impl<T: SelectHandle> SelectHandle for &T {
127    fn try_select(&self, token: &mut Token) -> bool {
128        (**self).try_select(token)
129    }
130
131    fn deadline(&self) -> Option<Instant> {
132        (**self).deadline()
133    }
134
135    fn register(&self, oper: Operation, cx: &Context) -> bool {
136        (**self).register(oper, cx)
137    }
138
139    fn unregister(&self, oper: Operation) {
140        (**self).unregister(oper);
141    }
142
143    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
144        (**self).accept(token, cx)
145    }
146
147    fn is_ready(&self) -> bool {
148        (**self).is_ready()
149    }
150
151    fn watch(&self, oper: Operation, cx: &Context) -> bool {
152        (**self).watch(oper, cx)
153    }
154
155    fn unwatch(&self, oper: Operation) {
156        (**self).unwatch(oper)
157    }
158}
159
160/// Determines when a select operation should time out.
161#[derive(Clone, Copy, Eq, PartialEq)]
162enum Timeout {
163    /// No blocking.
164    Now,
165
166    /// Block forever.
167    Never,
168
169    /// Time out after the time instant.
170    At(Instant),
171}
172
173/// Runs until one of the operations is selected, potentially blocking the current thread.
174///
175/// Successful receive operations will have to be followed up by `channel::read()` and successful
176/// send operations by `channel::write()`.
177fn run_select(
178    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
179    timeout: Timeout,
180    is_biased: bool,
181) -> Option<(Token, usize, *const u8)> {
182    if handles.is_empty() {
183        // Wait until the timeout and return.
184        match timeout {
185            Timeout::Now => return None,
186            Timeout::Never => {
187                utils::sleep_until(None);
188                unreachable!();
189            }
190            Timeout::At(when) => {
191                utils::sleep_until(Some(when));
192                return None;
193            }
194        }
195    }
196
197    if !is_biased {
198        // Shuffle the operations for fairness.
199        utils::shuffle(handles);
200    }
201
202    // Create a token, which serves as a temporary variable that gets initialized in this function
203    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
204    // selected operation.
205    let mut token = Token::default();
206
207    // Try selecting one of the operations without blocking.
208    for &(handle, i, ptr) in handles.iter() {
209        if handle.try_select(&mut token) {
210            return Some((token, i, ptr));
211        }
212    }
213
214    loop {
215        // Prepare for blocking.
216        let res = Context::with(|cx| {
217            let mut sel = Selected::Waiting;
218            let mut registered_count = 0;
219            let mut index_ready = None;
220
221            if let Timeout::Now = timeout {
222                cx.try_select(Selected::Aborted).unwrap();
223            }
224
225            // Register all operations.
226            for (handle, i, _) in handles.iter_mut() {
227                registered_count += 1;
228
229                // If registration returns `false`, that means the operation has just become ready.
230                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
231                    // Try aborting select.
232                    sel = match cx.try_select(Selected::Aborted) {
233                        Ok(()) => {
234                            index_ready = Some(*i);
235                            Selected::Aborted
236                        }
237                        Err(s) => s,
238                    };
239                    break;
240                }
241
242                // If another thread has already selected one of the operations, stop registration.
243                sel = cx.selected();
244                if sel != Selected::Waiting {
245                    break;
246                }
247            }
248
249            if sel == Selected::Waiting {
250                // Check with each operation for how long we're allowed to block, and compute the
251                // earliest deadline.
252                let mut deadline: Option<Instant> = match timeout {
253                    Timeout::Now => return None,
254                    Timeout::Never => None,
255                    Timeout::At(when) => Some(when),
256                };
257                for &(handle, _, _) in handles.iter() {
258                    if let Some(x) = handle.deadline() {
259                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
260                    }
261                }
262
263                // Block the current thread.
264                sel = cx.wait_until(deadline);
265            }
266
267            // Unregister all registered operations.
268            for (handle, _, _) in handles.iter_mut().take(registered_count) {
269                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
270            }
271
272            match sel {
273                Selected::Waiting => unreachable!(),
274                Selected::Aborted => {
275                    // If an operation became ready during registration, try selecting it.
276                    if let Some(index_ready) = index_ready {
277                        for &(handle, i, ptr) in handles.iter() {
278                            if i == index_ready && handle.try_select(&mut token) {
279                                return Some((i, ptr));
280                            }
281                        }
282                    }
283                }
284                Selected::Disconnected => {}
285                Selected::Operation(_) => {
286                    // Find the selected operation.
287                    for (handle, i, ptr) in handles.iter_mut() {
288                        // Is this the selected operation?
289                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
290                        {
291                            // Try selecting this operation.
292                            if handle.accept(&mut token, cx) {
293                                return Some((*i, *ptr));
294                            }
295                        }
296                    }
297                }
298            }
299
300            None
301        });
302
303        // Return if an operation was selected.
304        if let Some((i, ptr)) = res {
305            return Some((token, i, ptr));
306        }
307
308        // Try selecting one of the operations without blocking.
309        for &(handle, i, ptr) in handles.iter() {
310            if handle.try_select(&mut token) {
311                return Some((token, i, ptr));
312            }
313        }
314
315        match timeout {
316            Timeout::Now => return None,
317            Timeout::Never => {}
318            Timeout::At(when) => {
319                if Instant::now() >= when {
320                    return None;
321                }
322            }
323        }
324    }
325}
326
327/// Runs until one of the operations becomes ready, potentially blocking the current thread.
328fn run_ready(
329    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
330    timeout: Timeout,
331    is_biased: bool,
332) -> Option<usize> {
333    if handles.is_empty() {
334        // Wait until the timeout and return.
335        match timeout {
336            Timeout::Now => return None,
337            Timeout::Never => {
338                utils::sleep_until(None);
339                unreachable!();
340            }
341            Timeout::At(when) => {
342                utils::sleep_until(Some(when));
343                return None;
344            }
345        }
346    }
347
348    if !is_biased {
349        // Shuffle the operations for fairness.
350        utils::shuffle(handles);
351    }
352
353    loop {
354        let backoff = Backoff::new();
355        loop {
356            // Check operations for readiness.
357            for &(handle, i, _) in handles.iter() {
358                if handle.is_ready() {
359                    return Some(i);
360                }
361            }
362
363            if backoff.is_completed() {
364                break;
365            } else {
366                backoff.snooze();
367            }
368        }
369
370        // Check for timeout.
371        match timeout {
372            Timeout::Now => return None,
373            Timeout::Never => {}
374            Timeout::At(when) => {
375                if Instant::now() >= when {
376                    return None;
377                }
378            }
379        }
380
381        // Prepare for blocking.
382        let res = Context::with(|cx| {
383            let mut sel = Selected::Waiting;
384            let mut registered_count = 0;
385
386            // Begin watching all operations.
387            for (handle, _, _) in handles.iter_mut() {
388                registered_count += 1;
389                let oper = Operation::hook::<&dyn SelectHandle>(handle);
390
391                // If registration returns `false`, that means the operation has just become ready.
392                if handle.watch(oper, cx) {
393                    sel = match cx.try_select(Selected::Operation(oper)) {
394                        Ok(()) => Selected::Operation(oper),
395                        Err(s) => s,
396                    };
397                    break;
398                }
399
400                // If another thread has already chosen one of the operations, stop registration.
401                sel = cx.selected();
402                if sel != Selected::Waiting {
403                    break;
404                }
405            }
406
407            if sel == Selected::Waiting {
408                // Check with each operation for how long we're allowed to block, and compute the
409                // earliest deadline.
410                let mut deadline: Option<Instant> = match timeout {
411                    Timeout::Now => unreachable!(),
412                    Timeout::Never => None,
413                    Timeout::At(when) => Some(when),
414                };
415                for &(handle, _, _) in handles.iter() {
416                    if let Some(x) = handle.deadline() {
417                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
418                    }
419                }
420
421                // Block the current thread.
422                sel = cx.wait_until(deadline);
423            }
424
425            // Unwatch all operations.
426            for (handle, _, _) in handles.iter_mut().take(registered_count) {
427                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
428            }
429
430            match sel {
431                Selected::Waiting => unreachable!(),
432                Selected::Aborted => {}
433                Selected::Disconnected => {}
434                Selected::Operation(_) => {
435                    for (handle, i, _) in handles.iter_mut() {
436                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
437                        if sel == Selected::Operation(oper) {
438                            return Some(*i);
439                        }
440                    }
441                }
442            }
443
444            None
445        });
446
447        // Return if an operation became ready.
448        if res.is_some() {
449            return res;
450        }
451    }
452}
453
454/// Attempts to select one of the operations without blocking.
455// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
456#[inline]
457pub fn try_select<'a>(
458    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459    is_biased: bool,
460) -> Result<SelectedOperation<'a>, TrySelectError> {
461    match run_select(handles, Timeout::Now, is_biased) {
462        None => Err(TrySelectError),
463        Some((token, index, ptr)) => Ok(SelectedOperation {
464            token,
465            index,
466            ptr,
467            _marker: PhantomData,
468        }),
469    }
470}
471
472/// Blocks until one of the operations becomes ready and selects it.
473// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
474#[inline]
475pub fn select<'a>(
476    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477    is_biased: bool,
478) -> SelectedOperation<'a> {
479    if handles.is_empty() {
480        panic!("no operations have been added to `Select`");
481    }
482
483    let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
484    SelectedOperation {
485        token,
486        index,
487        ptr,
488        _marker: PhantomData,
489    }
490}
491
492/// Blocks for a limited time until one of the operations becomes ready and selects it.
493// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
494#[inline]
495pub fn select_timeout<'a>(
496    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497    timeout: Duration,
498    is_biased: bool,
499) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
500    match Instant::now().checked_add(timeout) {
501        Some(deadline) => select_deadline(handles, deadline, is_biased),
502        None => Ok(select(handles, is_biased)),
503    }
504}
505
506/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
507#[inline]
508pub(crate) fn select_deadline<'a>(
509    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
510    deadline: Instant,
511    is_biased: bool,
512) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
513    match run_select(handles, Timeout::At(deadline), is_biased) {
514        None => Err(SelectTimeoutError),
515        Some((token, index, ptr)) => Ok(SelectedOperation {
516            token,
517            index,
518            ptr,
519            _marker: PhantomData,
520        }),
521    }
522}
523
524/// Selects from a set of channel operations.
525///
526/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
527/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
528/// among them is selected.
529///
530/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
531/// when it will simply return an error because the channel is disconnected.
532///
533/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
534/// dynamically created list of channel operations.
535///
536/// [`select!`]: crate::select!
537///
538/// Once a list of operations has been built with `Select`, there are two different ways of
539/// proceeding:
540///
541/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
542///   the returned selected operation has already begun and **must** be completed. If we don't
543///   complete it, a panic will occur.
544///
545/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
546///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
547///   possible for another thread to make the operation not ready just before we try executing it,
548///   so it's wise to use a retry loop. However, note that these methods might return with success
549///   spuriously, so it's a good idea to always double check if the operation is really ready.
550///
551/// # Examples
552///
553/// Use [`select`] to receive a message from a list of receivers:
554///
555/// ```
556/// use crossbeam_channel::{Receiver, RecvError, Select};
557///
558/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
559///     // Build a list of operations.
560///     let mut sel = Select::new();
561///     for r in rs {
562///         sel.recv(r);
563///     }
564///
565///     // Complete the selected operation.
566///     let oper = sel.select();
567///     let index = oper.index();
568///     oper.recv(&rs[index])
569/// }
570/// ```
571///
572/// Use [`ready`] to receive a message from a list of receivers:
573///
574/// ```
575/// use crossbeam_channel::{Receiver, RecvError, Select};
576///
577/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
578///     // Build a list of operations.
579///     let mut sel = Select::new();
580///     for r in rs {
581///         sel.recv(r);
582///     }
583///
584///     loop {
585///         // Wait until a receive operation becomes ready and try executing it.
586///         let index = sel.ready();
587///         let res = rs[index].try_recv();
588///
589///         // If the operation turns out not to be ready, retry.
590///         if let Err(e) = res {
591///             if e.is_empty() {
592///                 continue;
593///             }
594///         }
595///
596///         // Success!
597///         return res.map_err(|_| RecvError);
598///     }
599/// }
600/// ```
601///
602/// [`try_select`]: Select::try_select
603/// [`select`]: Select::select
604/// [`select_timeout`]: Select::select_timeout
605/// [`try_ready`]: Select::try_ready
606/// [`ready`]: Select::ready
607/// [`ready_timeout`]: Select::ready_timeout
608pub struct Select<'a> {
609    /// A list of senders and receivers participating in selection.
610    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
611
612    /// The next index to assign to an operation.
613    next_index: usize,
614
615    /// Whether to use the index of handles as bias for selecting ready operations.
616    biased: bool,
617}
618
619unsafe impl Send for Select<'_> {}
620unsafe impl Sync for Select<'_> {}
621
622impl<'a> Select<'a> {
623    /// Creates an empty list of channel operations for selection.
624    ///
625    /// # Examples
626    ///
627    /// ```
628    /// use crossbeam_channel::Select;
629    ///
630    /// let mut sel = Select::new();
631    ///
632    /// // The list of operations is empty, which means no operation can be selected.
633    /// assert!(sel.try_select().is_err());
634    /// ```
635    pub fn new() -> Select<'a> {
636        Select {
637            handles: Vec::with_capacity(4),
638            next_index: 0,
639            biased: false,
640        }
641    }
642
643    /// Creates an empty list of channel operations with biased selection.
644    ///
645    /// When multiple handles are ready, this will select the operation with the lowest index.
646    ///
647    /// # Examples
648    ///
649    /// ```
650    /// use crossbeam_channel::Select;
651    ///
652    /// let mut sel = Select::new_biased();
653    ///
654    /// // The list of operations is empty, which means no operation can be selected.
655    /// assert!(sel.try_select().is_err());
656    /// ```
657    pub fn new_biased() -> Self {
658        Self {
659            biased: true,
660            ..Default::default()
661        }
662    }
663
664    /// Adds a send operation.
665    ///
666    /// Returns the index of the added operation.
667    ///
668    /// # Examples
669    ///
670    /// ```
671    /// use crossbeam_channel::{unbounded, Select};
672    ///
673    /// let (s, r) = unbounded::<i32>();
674    ///
675    /// let mut sel = Select::new();
676    /// let index = sel.send(&s);
677    /// ```
678    pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
679        let i = self.next_index;
680        let ptr = s as *const Sender<_> as *const u8;
681        self.handles.push((s, i, ptr));
682        self.next_index += 1;
683        i
684    }
685
686    /// Adds a receive operation.
687    ///
688    /// Returns the index of the added operation.
689    ///
690    /// # Examples
691    ///
692    /// ```
693    /// use crossbeam_channel::{unbounded, Select};
694    ///
695    /// let (s, r) = unbounded::<i32>();
696    ///
697    /// let mut sel = Select::new();
698    /// let index = sel.recv(&r);
699    /// ```
700    pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
701        let i = self.next_index;
702        let ptr = r as *const Receiver<_> as *const u8;
703        self.handles.push((r, i, ptr));
704        self.next_index += 1;
705        i
706    }
707
708    /// Removes a previously added operation.
709    ///
710    /// This is useful when an operation is selected because the channel got disconnected and we
711    /// want to try again to select a different operation instead.
712    ///
713    /// If new operations are added after removing some, the indices of removed operations will not
714    /// be reused.
715    ///
716    /// # Panics
717    ///
718    /// An attempt to remove a non-existing or already removed operation will panic.
719    ///
720    /// # Examples
721    ///
722    /// ```
723    /// use crossbeam_channel::{unbounded, Select};
724    ///
725    /// let (s1, r1) = unbounded::<i32>();
726    /// let (_, r2) = unbounded::<i32>();
727    ///
728    /// let mut sel = Select::new();
729    /// let oper1 = sel.recv(&r1);
730    /// let oper2 = sel.recv(&r2);
731    ///
732    /// // Both operations are initially ready, so a random one will be executed.
733    /// let oper = sel.select();
734    /// assert_eq!(oper.index(), oper2);
735    /// assert!(oper.recv(&r2).is_err());
736    /// sel.remove(oper2);
737    ///
738    /// s1.send(10).unwrap();
739    ///
740    /// let oper = sel.select();
741    /// assert_eq!(oper.index(), oper1);
742    /// assert_eq!(oper.recv(&r1), Ok(10));
743    /// ```
744    pub fn remove(&mut self, index: usize) {
745        assert!(
746            index < self.next_index,
747            "index out of bounds; {} >= {}",
748            index,
749            self.next_index,
750        );
751
752        let i = self
753            .handles
754            .iter()
755            .enumerate()
756            .find(|(_, (_, i, _))| *i == index)
757            .expect("no operation with this index")
758            .0;
759
760        self.handles.swap_remove(i);
761    }
762
763    /// Attempts to select one of the operations without blocking.
764    ///
765    /// If an operation is ready, it is selected and returned. If multiple operations are ready at
766    /// the same time, a random one among them is selected. If none of the operations are ready, an
767    /// error is returned.
768    ///
769    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
770    /// even when it will simply return an error because the channel is disconnected.
771    ///
772    /// The selected operation must be completed with [`SelectedOperation::send`]
773    /// or [`SelectedOperation::recv`].
774    ///
775    /// # Examples
776    ///
777    /// ```
778    /// use crossbeam_channel::{unbounded, Select};
779    ///
780    /// let (s1, r1) = unbounded();
781    /// let (s2, r2) = unbounded();
782    ///
783    /// s1.send(10).unwrap();
784    /// s2.send(20).unwrap();
785    ///
786    /// let mut sel = Select::new();
787    /// let oper1 = sel.recv(&r1);
788    /// let oper2 = sel.recv(&r2);
789    ///
790    /// // Both operations are initially ready, so a random one will be executed.
791    /// let oper = sel.try_select();
792    /// match oper {
793    ///     Err(_) => panic!("both operations should be ready"),
794    ///     Ok(oper) => match oper.index() {
795    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
796    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
797    ///         _ => unreachable!(),
798    ///     }
799    /// }
800    /// ```
801    pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
802        try_select(&mut self.handles, self.biased)
803    }
804
805    /// Blocks until one of the operations becomes ready and selects it.
806    ///
807    /// Once an operation becomes ready, it is selected and returned. If multiple operations are
808    /// ready at the same time, a random one among them is selected.
809    ///
810    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
811    /// even when it will simply return an error because the channel is disconnected.
812    ///
813    /// The selected operation must be completed with [`SelectedOperation::send`]
814    /// or [`SelectedOperation::recv`].
815    ///
816    /// # Panics
817    ///
818    /// Panics if no operations have been added to `Select`.
819    ///
820    /// # Examples
821    ///
822    /// ```
823    /// use std::thread;
824    /// use std::time::Duration;
825    /// use crossbeam_channel::{unbounded, Select};
826    ///
827    /// let (s1, r1) = unbounded();
828    /// let (s2, r2) = unbounded();
829    ///
830    /// thread::spawn(move || {
831    ///     thread::sleep(Duration::from_secs(1));
832    ///     s1.send(10).unwrap();
833    /// });
834    /// thread::spawn(move || s2.send(20).unwrap());
835    ///
836    /// let mut sel = Select::new();
837    /// let oper1 = sel.recv(&r1);
838    /// let oper2 = sel.recv(&r2);
839    ///
840    /// // The second operation will be selected because it becomes ready first.
841    /// let oper = sel.select();
842    /// match oper.index() {
843    ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
844    ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
845    ///     _ => unreachable!(),
846    /// }
847    /// ```
848    pub fn select(&mut self) -> SelectedOperation<'a> {
849        select(&mut self.handles, self.biased)
850    }
851
852    /// Blocks for a limited time until one of the operations becomes ready and selects it.
853    ///
854    /// If an operation becomes ready, it is selected and returned. If multiple operations are
855    /// ready at the same time, a random one among them is selected. If none of the operations
856    /// become ready for the specified duration, an error is returned.
857    ///
858    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
859    /// even when it will simply return an error because the channel is disconnected.
860    ///
861    /// The selected operation must be completed with [`SelectedOperation::send`]
862    /// or [`SelectedOperation::recv`].
863    ///
864    /// # Examples
865    ///
866    /// ```
867    /// use std::thread;
868    /// use std::time::Duration;
869    /// use crossbeam_channel::{unbounded, Select};
870    ///
871    /// let (s1, r1) = unbounded();
872    /// let (s2, r2) = unbounded();
873    ///
874    /// thread::spawn(move || {
875    ///     thread::sleep(Duration::from_secs(1));
876    ///     s1.send(10).unwrap();
877    /// });
878    /// thread::spawn(move || s2.send(20).unwrap());
879    ///
880    /// let mut sel = Select::new();
881    /// let oper1 = sel.recv(&r1);
882    /// let oper2 = sel.recv(&r2);
883    ///
884    /// // The second operation will be selected because it becomes ready first.
885    /// let oper = sel.select_timeout(Duration::from_millis(500));
886    /// match oper {
887    ///     Err(_) => panic!("should not have timed out"),
888    ///     Ok(oper) => match oper.index() {
889    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
890    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
891    ///         _ => unreachable!(),
892    ///     }
893    /// }
894    /// ```
895    pub fn select_timeout(
896        &mut self,
897        timeout: Duration,
898    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
899        select_timeout(&mut self.handles, timeout, self.biased)
900    }
901
902    /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
903    ///
904    /// If an operation becomes ready, it is selected and returned. If multiple operations are
905    /// ready at the same time, a random one among them is selected. If none of the operations
906    /// become ready before the given deadline, an error is returned.
907    ///
908    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
909    /// even when it will simply return an error because the channel is disconnected.
910    ///
911    /// The selected operation must be completed with [`SelectedOperation::send`]
912    /// or [`SelectedOperation::recv`].
913    ///
914    /// # Examples
915    ///
916    /// ```
917    /// use std::thread;
918    /// use std::time::{Instant, Duration};
919    /// use crossbeam_channel::{unbounded, Select};
920    ///
921    /// let (s1, r1) = unbounded();
922    /// let (s2, r2) = unbounded();
923    ///
924    /// thread::spawn(move || {
925    ///     thread::sleep(Duration::from_secs(1));
926    ///     s1.send(10).unwrap();
927    /// });
928    /// thread::spawn(move || s2.send(20).unwrap());
929    ///
930    /// let mut sel = Select::new();
931    /// let oper1 = sel.recv(&r1);
932    /// let oper2 = sel.recv(&r2);
933    ///
934    /// let deadline = Instant::now() + Duration::from_millis(500);
935    ///
936    /// // The second operation will be selected because it becomes ready first.
937    /// let oper = sel.select_deadline(deadline);
938    /// match oper {
939    ///     Err(_) => panic!("should not have timed out"),
940    ///     Ok(oper) => match oper.index() {
941    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
942    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
943    ///         _ => unreachable!(),
944    ///     }
945    /// }
946    /// ```
947    pub fn select_deadline(
948        &mut self,
949        deadline: Instant,
950    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
951        select_deadline(&mut self.handles, deadline, self.biased)
952    }
953
954    /// Attempts to find a ready operation without blocking.
955    ///
956    /// If an operation is ready, its index is returned. If multiple operations are ready at the
957    /// same time, a random one among them is chosen. If none of the operations are ready, an error
958    /// is returned.
959    ///
960    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
961    /// even when it will simply return an error because the channel is disconnected.
962    ///
963    /// Note that this method might return with success spuriously, so it's a good idea to always
964    /// double check if the operation is really ready.
965    ///
966    /// # Examples
967    ///
968    /// ```
969    /// use crossbeam_channel::{unbounded, Select};
970    ///
971    /// let (s1, r1) = unbounded();
972    /// let (s2, r2) = unbounded();
973    ///
974    /// s1.send(10).unwrap();
975    /// s2.send(20).unwrap();
976    ///
977    /// let mut sel = Select::new();
978    /// let oper1 = sel.recv(&r1);
979    /// let oper2 = sel.recv(&r2);
980    ///
981    /// // Both operations are initially ready, so a random one will be chosen.
982    /// match sel.try_ready() {
983    ///     Err(_) => panic!("both operations should be ready"),
984    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
985    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
986    ///     Ok(_) => unreachable!(),
987    /// }
988    /// ```
989    pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
990        match run_ready(&mut self.handles, Timeout::Now, self.biased) {
991            None => Err(TryReadyError),
992            Some(index) => Ok(index),
993        }
994    }
995
996    /// Blocks until one of the operations becomes ready.
997    ///
998    /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
999    /// the same time, a random one among them is chosen.
1000    ///
1001    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1002    /// even when it will simply return an error because the channel is disconnected.
1003    ///
1004    /// Note that this method might return with success spuriously, so it's a good idea to always
1005    /// double check if the operation is really ready.
1006    ///
1007    /// # Panics
1008    ///
1009    /// Panics if no operations have been added to `Select`.
1010    ///
1011    /// # Examples
1012    ///
1013    /// ```
1014    /// use std::thread;
1015    /// use std::time::Duration;
1016    /// use crossbeam_channel::{unbounded, Select};
1017    ///
1018    /// let (s1, r1) = unbounded();
1019    /// let (s2, r2) = unbounded();
1020    ///
1021    /// thread::spawn(move || {
1022    ///     thread::sleep(Duration::from_secs(1));
1023    ///     s1.send(10).unwrap();
1024    /// });
1025    /// thread::spawn(move || s2.send(20).unwrap());
1026    ///
1027    /// let mut sel = Select::new();
1028    /// let oper1 = sel.recv(&r1);
1029    /// let oper2 = sel.recv(&r2);
1030    ///
1031    /// // The second operation will be selected because it becomes ready first.
1032    /// match sel.ready() {
1033    ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1034    ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1035    ///     _ => unreachable!(),
1036    /// }
1037    /// ```
1038    pub fn ready(&mut self) -> usize {
1039        if self.handles.is_empty() {
1040            panic!("no operations have been added to `Select`");
1041        }
1042
1043        run_ready(&mut self.handles, Timeout::Never, self.biased).unwrap()
1044    }
1045
1046    /// Blocks for a limited time until one of the operations becomes ready.
1047    ///
1048    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1049    /// the same time, a random one among them is chosen. If none of the operations become ready
1050    /// for the specified duration, an error is returned.
1051    ///
1052    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1053    /// even when it will simply return an error because the channel is disconnected.
1054    ///
1055    /// Note that this method might return with success spuriously, so it's a good idea to double
1056    /// check if the operation is really ready.
1057    ///
1058    /// # Examples
1059    ///
1060    /// ```
1061    /// use std::thread;
1062    /// use std::time::Duration;
1063    /// use crossbeam_channel::{unbounded, Select};
1064    ///
1065    /// let (s1, r1) = unbounded();
1066    /// let (s2, r2) = unbounded();
1067    ///
1068    /// thread::spawn(move || {
1069    ///     thread::sleep(Duration::from_secs(1));
1070    ///     s1.send(10).unwrap();
1071    /// });
1072    /// thread::spawn(move || s2.send(20).unwrap());
1073    ///
1074    /// let mut sel = Select::new();
1075    /// let oper1 = sel.recv(&r1);
1076    /// let oper2 = sel.recv(&r2);
1077    ///
1078    /// // The second operation will be selected because it becomes ready first.
1079    /// match sel.ready_timeout(Duration::from_millis(500)) {
1080    ///     Err(_) => panic!("should not have timed out"),
1081    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1082    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1083    ///     Ok(_) => unreachable!(),
1084    /// }
1085    /// ```
1086    pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1087        match Instant::now().checked_add(timeout) {
1088            Some(deadline) => self.ready_deadline(deadline),
1089            None => Ok(self.ready()),
1090        }
1091    }
1092
1093    /// Blocks until a given deadline, or until one of the operations becomes ready.
1094    ///
1095    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1096    /// the same time, a random one among them is chosen. If none of the operations become ready
1097    /// before the deadline, an error is returned.
1098    ///
1099    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1100    /// even when it will simply return an error because the channel is disconnected.
1101    ///
1102    /// Note that this method might return with success spuriously, so it's a good idea to double
1103    /// check if the operation is really ready.
1104    ///
1105    /// # Examples
1106    ///
1107    /// ```
1108    /// use std::thread;
1109    /// use std::time::{Duration, Instant};
1110    /// use crossbeam_channel::{unbounded, Select};
1111    ///
1112    /// let deadline = Instant::now() + Duration::from_millis(500);
1113    ///
1114    /// let (s1, r1) = unbounded();
1115    /// let (s2, r2) = unbounded();
1116    ///
1117    /// thread::spawn(move || {
1118    ///     thread::sleep(Duration::from_secs(1));
1119    ///     s1.send(10).unwrap();
1120    /// });
1121    /// thread::spawn(move || s2.send(20).unwrap());
1122    ///
1123    /// let mut sel = Select::new();
1124    /// let oper1 = sel.recv(&r1);
1125    /// let oper2 = sel.recv(&r2);
1126    ///
1127    /// // The second operation will be selected because it becomes ready first.
1128    /// match sel.ready_deadline(deadline) {
1129    ///     Err(_) => panic!("should not have timed out"),
1130    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1131    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1132    ///     Ok(_) => unreachable!(),
1133    /// }
1134    /// ```
1135    pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1136        match run_ready(&mut self.handles, Timeout::At(deadline), self.biased) {
1137            None => Err(ReadyTimeoutError),
1138            Some(index) => Ok(index),
1139        }
1140    }
1141}
1142
1143impl<'a> Clone for Select<'a> {
1144    fn clone(&self) -> Select<'a> {
1145        Select {
1146            handles: self.handles.clone(),
1147            next_index: self.next_index,
1148            biased: self.biased,
1149        }
1150    }
1151}
1152
1153impl<'a> Default for Select<'a> {
1154    fn default() -> Select<'a> {
1155        Select::new()
1156    }
1157}
1158
1159impl fmt::Debug for Select<'_> {
1160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1161        f.pad("Select { .. }")
1162    }
1163}
1164
1165/// A selected operation that needs to be completed.
1166///
1167/// To complete the operation, call [`send`] or [`recv`].
1168///
1169/// # Panics
1170///
1171/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1172/// `SelectedOperation` is dropped without completion, a panic occurs.
1173///
1174/// [`send`]: SelectedOperation::send
1175/// [`recv`]: SelectedOperation::recv
1176#[must_use]
1177pub struct SelectedOperation<'a> {
1178    /// Token needed to complete the operation.
1179    token: Token,
1180
1181    /// The index of the selected operation.
1182    index: usize,
1183
1184    /// The address of the selected `Sender` or `Receiver`.
1185    ptr: *const u8,
1186
1187    /// Indicates that `Sender`s and `Receiver`s are borrowed.
1188    _marker: PhantomData<&'a ()>,
1189}
1190
1191impl SelectedOperation<'_> {
1192    /// Returns the index of the selected operation.
1193    ///
1194    /// # Examples
1195    ///
1196    /// ```
1197    /// use crossbeam_channel::{bounded, Select};
1198    ///
1199    /// let (s1, r1) = bounded::<()>(0);
1200    /// let (s2, r2) = bounded::<()>(0);
1201    /// let (s3, r3) = bounded::<()>(1);
1202    ///
1203    /// let mut sel = Select::new();
1204    /// let oper1 = sel.send(&s1);
1205    /// let oper2 = sel.recv(&r2);
1206    /// let oper3 = sel.send(&s3);
1207    ///
1208    /// // Only the last operation is ready.
1209    /// let oper = sel.select();
1210    /// assert_eq!(oper.index(), 2);
1211    /// assert_eq!(oper.index(), oper3);
1212    ///
1213    /// // Complete the operation.
1214    /// oper.send(&s3, ()).unwrap();
1215    /// ```
1216    pub fn index(&self) -> usize {
1217        self.index
1218    }
1219
1220    /// Completes the send operation.
1221    ///
1222    /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1223    /// when the operation was added.
1224    ///
1225    /// # Panics
1226    ///
1227    /// Panics if an incorrect [`Sender`] reference is passed.
1228    ///
1229    /// # Examples
1230    ///
1231    /// ```
1232    /// use crossbeam_channel::{bounded, Select, SendError};
1233    ///
1234    /// let (s, r) = bounded::<i32>(0);
1235    /// drop(r);
1236    ///
1237    /// let mut sel = Select::new();
1238    /// let oper1 = sel.send(&s);
1239    ///
1240    /// let oper = sel.select();
1241    /// assert_eq!(oper.index(), oper1);
1242    /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1243    /// ```
1244    pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1245        assert!(
1246            s as *const Sender<T> as *const u8 == self.ptr,
1247            "passed a sender that wasn't selected",
1248        );
1249        let res = unsafe { channel::write(s, &mut self.token, msg) };
1250        mem::forget(self);
1251        res.map_err(SendError)
1252    }
1253
1254    /// Completes the receive operation.
1255    ///
1256    /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1257    /// when the operation was added.
1258    ///
1259    /// # Panics
1260    ///
1261    /// Panics if an incorrect [`Receiver`] reference is passed.
1262    ///
1263    /// # Examples
1264    ///
1265    /// ```
1266    /// use crossbeam_channel::{bounded, Select, RecvError};
1267    ///
1268    /// let (s, r) = bounded::<i32>(0);
1269    /// drop(s);
1270    ///
1271    /// let mut sel = Select::new();
1272    /// let oper1 = sel.recv(&r);
1273    ///
1274    /// let oper = sel.select();
1275    /// assert_eq!(oper.index(), oper1);
1276    /// assert_eq!(oper.recv(&r), Err(RecvError));
1277    /// ```
1278    pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1279        assert!(
1280            r as *const Receiver<T> as *const u8 == self.ptr,
1281            "passed a receiver that wasn't selected",
1282        );
1283        let res = unsafe { channel::read(r, &mut self.token) };
1284        mem::forget(self);
1285        res.map_err(|_| RecvError)
1286    }
1287}
1288
1289impl fmt::Debug for SelectedOperation<'_> {
1290    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1291        f.pad("SelectedOperation { .. }")
1292    }
1293}
1294
1295impl Drop for SelectedOperation<'_> {
1296    fn drop(&mut self) {
1297        panic!("dropped `SelectedOperation` without completing the operation");
1298    }
1299}