crossbeam_channel/select.rs
1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7use std::vec::Vec;
8
9use crossbeam_utils::Backoff;
10
11use crate::channel::{self, Receiver, Sender};
12use crate::context::Context;
13use crate::err::{ReadyTimeoutError, TryReadyError};
14use crate::err::{RecvError, SendError};
15use crate::err::{SelectTimeoutError, TrySelectError};
16use crate::flavors;
17use crate::utils;
18
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
    /// Token data for the `at` flavor.
    pub(crate) at: flavors::at::AtToken,
    /// Token data for the `array` flavor.
    pub(crate) array: flavors::array::ArrayToken,
    /// Token data for the `list` flavor.
    pub(crate) list: flavors::list::ListToken,
    /// Token data for the `never` flavor.
    // NOTE(review): presumably nothing ever reads this field, hence the `dead_code` allowance --
    // confirm against `flavors::never`.
    #[allow(dead_code)]
    pub(crate) never: flavors::never::NeverToken,
    /// Token data for the `tick` flavor.
    pub(crate) tick: flavors::tick::TickToken,
    /// Token data for the `zero` flavor.
    pub(crate) zero: flavors::zero::ZeroToken,
}
34
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The identifier is the numeric address of a variable that belongs to the operation (see
/// [`Operation::hook`]), so two identifiers are equal exactly when they were hooked to the same
/// address.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Creates an operation identifier from a mutable reference.
    ///
    /// This function essentially just turns the address of the reference into a number. The
    /// reference should point to a variable that is specific to the thread and the operation,
    /// and is alive for the entire duration of select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address of `r` is `0`, `1`, or `2`, because those values are reserved for
    /// the numerical representation of `Selected::{Waiting, Aborted, Disconnected}`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let val = r as *mut T as usize;
        // Make sure that the pointer address doesn't equal the numerical representation of
        // `Selected::{Waiting, Aborted, Disconnected}`.
        assert!(val > 2);
        Operation(val)
    }
}

/// Current state of a select or a blocking operation.
///
/// The state converts losslessly to and from `usize`: `0`, `1` and `2` encode `Waiting`,
/// `Aborted` and `Disconnected`, and every other value is the identifier of an [`Operation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
    /// Still waiting for an operation.
    Waiting,

    /// The attempt to block the current thread has been aborted.
    Aborted,

    /// An operation became ready because a channel is disconnected.
    Disconnected,

    /// An operation became ready because a message can be sent or received.
    Operation(Operation),
}

impl From<usize> for Selected {
    /// Decodes a state from its numerical representation.
    #[inline]
    fn from(val: usize) -> Selected {
        match val {
            0 => Selected::Waiting,
            1 => Selected::Aborted,
            2 => Selected::Disconnected,
            oper => Selected::Operation(Operation(oper)),
        }
    }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<usize> for Selected`, so existing `.into()` call sites keep working, and
// `usize::from(selected)` becomes available as well.
impl From<Selected> for usize {
    /// Encodes a state into its numerical representation.
    #[inline]
    fn from(sel: Selected) -> usize {
        match sel {
            Selected::Waiting => 0,
            Selected::Aborted => 1,
            Selected::Disconnected => 2,
            Selected::Operation(Operation(val)) => val,
        }
    }
}
94
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods fall into two groups: `try_select`/`register`/`unregister`/`accept` are used when
/// an operation is going to be selected and completed, while `is_ready`/`watch`/`unwatch` are
/// used when the caller only wants to learn that an operation became ready.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    ///
    /// `token` is the scratch space that a successful selection initializes.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    ///
    /// `oper` identifies the operation and `cx` is the context of the registering thread.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    fn unwatch(&self, oper: Operation);
}
125
126impl<T: SelectHandle> SelectHandle for &T {
127 fn try_select(&self, token: &mut Token) -> bool {
128 (**self).try_select(token)
129 }
130
131 fn deadline(&self) -> Option<Instant> {
132 (**self).deadline()
133 }
134
135 fn register(&self, oper: Operation, cx: &Context) -> bool {
136 (**self).register(oper, cx)
137 }
138
139 fn unregister(&self, oper: Operation) {
140 (**self).unregister(oper);
141 }
142
143 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
144 (**self).accept(token, cx)
145 }
146
147 fn is_ready(&self) -> bool {
148 (**self).is_ready()
149 }
150
151 fn watch(&self, oper: Operation, cx: &Context) -> bool {
152 (**self).watch(oper, cx)
153 }
154
155 fn unwatch(&self, oper: Operation) {
156 (**self).unwatch(oper)
157 }
158}

/// Determines when a select operation should time out.
///
/// `Debug` is derived so the value can be logged and used in `assert_eq!`-style checks.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    Now,

    /// Block forever.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
172
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
///
/// Each entry of `handles` is `(handle, index, ptr)`. On success the returned tuple carries the
/// initialized `Token` together with the `index` and `ptr` of the selected entry. `None` is
/// returned when no operation was selected before the timeout (immediately for `Timeout::Now`).
/// Unless `is_biased` is set, `handles` is shuffled in place before anything else is tried.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
    is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    if !is_biased {
        // Shuffle the operations for fairness.
        utils::shuffle(handles);
    }

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of handles on which `register` has been called; exactly that many are
            // unregistered again below.
            let mut registered_count = 0;
            // Index of the operation that reported readiness during registration, if any.
            let mut index_ready = None;

            // In non-blocking mode, mark the context as aborted up front so that the
            // registration loop below stops after the first handle and nothing blocks.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Nothing was selected in this round: give up once the timeout has passed, otherwise
        // go around the loop again.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
326
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// Each entry of `handles` is `(handle, index, ptr)`. On success the `index` of a ready entry is
/// returned. `None` is returned when nothing became ready before the timeout. Unless `is_biased`
/// is set, `handles` is shuffled in place before anything else is tried.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
    is_biased: bool,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    if !is_biased {
        // Shuffle the operations for fairness.
        utils::shuffle(handles);
    }

    loop {
        // Poll for readiness with backoff before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of handles on which `watch` has been called; exactly that many are
            // unwatched again below.
            let mut registered_count = 0;

            // Begin watching all operations.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If `watch` returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` already returned from the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Map the selected operation identifier back to the caller-visible index.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
453
454/// Attempts to select one of the operations without blocking.
455// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
456#[inline]
457pub fn try_select<'a>(
458 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459 is_biased: bool,
460) -> Result<SelectedOperation<'a>, TrySelectError> {
461 match run_select(handles, Timeout::Now, is_biased) {
462 None => Err(TrySelectError),
463 Some((token, index, ptr)) => Ok(SelectedOperation {
464 token,
465 index,
466 ptr,
467 _marker: PhantomData,
468 }),
469 }
470}
471
472/// Blocks until one of the operations becomes ready and selects it.
473// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
474#[inline]
475pub fn select<'a>(
476 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477 is_biased: bool,
478) -> SelectedOperation<'a> {
479 if handles.is_empty() {
480 panic!("no operations have been added to `Select`");
481 }
482
483 let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
484 SelectedOperation {
485 token,
486 index,
487 ptr,
488 _marker: PhantomData,
489 }
490}
491
492/// Blocks for a limited time until one of the operations becomes ready and selects it.
493// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
494#[inline]
495pub fn select_timeout<'a>(
496 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497 timeout: Duration,
498 is_biased: bool,
499) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
500 match Instant::now().checked_add(timeout) {
501 Some(deadline) => select_deadline(handles, deadline, is_biased),
502 None => Ok(select(handles, is_biased)),
503 }
504}
505
506/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
507#[inline]
508pub(crate) fn select_deadline<'a>(
509 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
510 deadline: Instant,
511 is_biased: bool,
512) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
513 match run_select(handles, Timeout::At(deadline), is_biased) {
514 None => Err(SelectTimeoutError),
515 Some((token, index, ptr)) => Ok(SelectedOperation {
516 token,
517 index,
518 ptr,
519 _marker: PhantomData,
520 }),
521 }
522}
523
524/// Selects from a set of channel operations.
525///
526/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
527/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
528/// among them is selected.
529///
530/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
531/// when it will simply return an error because the channel is disconnected.
532///
533/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
534/// dynamically created list of channel operations.
535///
536/// [`select!`]: crate::select!
537///
538/// Once a list of operations has been built with `Select`, there are two different ways of
539/// proceeding:
540///
541/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
542/// the returned selected operation has already begun and **must** be completed. If we don't
543/// complete it, a panic will occur.
544///
545/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
546/// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
547/// possible for another thread to make the operation not ready just before we try executing it,
548/// so it's wise to use a retry loop. However, note that these methods might return with success
549/// spuriously, so it's a good idea to always double check if the operation is really ready.
550///
551/// # Examples
552///
553/// Use [`select`] to receive a message from a list of receivers:
554///
555/// ```
556/// use crossbeam_channel::{Receiver, RecvError, Select};
557///
558/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
559/// // Build a list of operations.
560/// let mut sel = Select::new();
561/// for r in rs {
562/// sel.recv(r);
563/// }
564///
565/// // Complete the selected operation.
566/// let oper = sel.select();
567/// let index = oper.index();
568/// oper.recv(&rs[index])
569/// }
570/// ```
571///
572/// Use [`ready`] to receive a message from a list of receivers:
573///
574/// ```
575/// use crossbeam_channel::{Receiver, RecvError, Select};
576///
577/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
578/// // Build a list of operations.
579/// let mut sel = Select::new();
580/// for r in rs {
581/// sel.recv(r);
582/// }
583///
584/// loop {
585/// // Wait until a receive operation becomes ready and try executing it.
586/// let index = sel.ready();
587/// let res = rs[index].try_recv();
588///
589/// // If the operation turns out not to be ready, retry.
590/// if let Err(e) = res {
591/// if e.is_empty() {
592/// continue;
593/// }
594/// }
595///
596/// // Success!
597/// return res.map_err(|_| RecvError);
598/// }
599/// }
600/// ```
601///
602/// [`try_select`]: Select::try_select
603/// [`select`]: Select::select
604/// [`select_timeout`]: Select::select_timeout
605/// [`try_ready`]: Select::try_ready
606/// [`ready`]: Select::ready
607/// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the operation's handle, the index that was returned
    /// to the caller when the operation was added, and the address of the `Sender`/`Receiver`
    /// the operation was created from, type-erased to `*const u8`.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    next_index: usize,

    /// Whether to use the index of handles as bias for selecting ready operations.
    biased: bool,
}

// The `*const u8` stored in `handles` suppresses the automatic `Send`/`Sync` impls, so they are
// asserted manually here.
// NOTE(review): in the code visible here the pointer is only stored and copied, never
// dereferenced; presumably it serves purely as an address for identity checks -- confirm
// against the users of `SelectedOperation` before relying on these impls.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
621
622impl<'a> Select<'a> {
623 /// Creates an empty list of channel operations for selection.
624 ///
625 /// # Examples
626 ///
627 /// ```
628 /// use crossbeam_channel::Select;
629 ///
630 /// let mut sel = Select::new();
631 ///
632 /// // The list of operations is empty, which means no operation can be selected.
633 /// assert!(sel.try_select().is_err());
634 /// ```
635 pub fn new() -> Select<'a> {
636 Select {
637 handles: Vec::with_capacity(4),
638 next_index: 0,
639 biased: false,
640 }
641 }
642
643 /// Creates an empty list of channel operations with biased selection.
644 ///
645 /// When multiple handles are ready, this will select the operation with the lowest index.
646 ///
647 /// # Examples
648 ///
649 /// ```
650 /// use crossbeam_channel::Select;
651 ///
652 /// let mut sel = Select::new_biased();
653 ///
654 /// // The list of operations is empty, which means no operation can be selected.
655 /// assert!(sel.try_select().is_err());
656 /// ```
657 pub fn new_biased() -> Self {
658 Self {
659 biased: true,
660 ..Default::default()
661 }
662 }
663
664 /// Adds a send operation.
665 ///
666 /// Returns the index of the added operation.
667 ///
668 /// # Examples
669 ///
670 /// ```
671 /// use crossbeam_channel::{unbounded, Select};
672 ///
673 /// let (s, r) = unbounded::<i32>();
674 ///
675 /// let mut sel = Select::new();
676 /// let index = sel.send(&s);
677 /// ```
678 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
679 let i = self.next_index;
680 let ptr = s as *const Sender<_> as *const u8;
681 self.handles.push((s, i, ptr));
682 self.next_index += 1;
683 i
684 }
685
686 /// Adds a receive operation.
687 ///
688 /// Returns the index of the added operation.
689 ///
690 /// # Examples
691 ///
692 /// ```
693 /// use crossbeam_channel::{unbounded, Select};
694 ///
695 /// let (s, r) = unbounded::<i32>();
696 ///
697 /// let mut sel = Select::new();
698 /// let index = sel.recv(&r);
699 /// ```
700 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
701 let i = self.next_index;
702 let ptr = r as *const Receiver<_> as *const u8;
703 self.handles.push((r, i, ptr));
704 self.next_index += 1;
705 i
706 }
707
708 /// Removes a previously added operation.
709 ///
710 /// This is useful when an operation is selected because the channel got disconnected and we
711 /// want to try again to select a different operation instead.
712 ///
713 /// If new operations are added after removing some, the indices of removed operations will not
714 /// be reused.
715 ///
716 /// # Panics
717 ///
718 /// An attempt to remove a non-existing or already removed operation will panic.
719 ///
720 /// # Examples
721 ///
722 /// ```
723 /// use crossbeam_channel::{unbounded, Select};
724 ///
725 /// let (s1, r1) = unbounded::<i32>();
726 /// let (_, r2) = unbounded::<i32>();
727 ///
728 /// let mut sel = Select::new();
729 /// let oper1 = sel.recv(&r1);
730 /// let oper2 = sel.recv(&r2);
731 ///
    /// // Only the second operation is ready at this point (its channel is disconnected), so it is selected.
733 /// let oper = sel.select();
734 /// assert_eq!(oper.index(), oper2);
735 /// assert!(oper.recv(&r2).is_err());
736 /// sel.remove(oper2);
737 ///
738 /// s1.send(10).unwrap();
739 ///
740 /// let oper = sel.select();
741 /// assert_eq!(oper.index(), oper1);
742 /// assert_eq!(oper.recv(&r1), Ok(10));
743 /// ```
744 pub fn remove(&mut self, index: usize) {
745 assert!(
746 index < self.next_index,
747 "index out of bounds; {} >= {}",
748 index,
749 self.next_index,
750 );
751
752 let i = self
753 .handles
754 .iter()
755 .enumerate()
756 .find(|(_, (_, i, _))| *i == index)
757 .expect("no operation with this index")
758 .0;
759
760 self.handles.swap_remove(i);
761 }
762
763 /// Attempts to select one of the operations without blocking.
764 ///
765 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
766 /// the same time, a random one among them is selected. If none of the operations are ready, an
767 /// error is returned.
768 ///
769 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
770 /// even when it will simply return an error because the channel is disconnected.
771 ///
772 /// The selected operation must be completed with [`SelectedOperation::send`]
773 /// or [`SelectedOperation::recv`].
774 ///
775 /// # Examples
776 ///
777 /// ```
778 /// use crossbeam_channel::{unbounded, Select};
779 ///
780 /// let (s1, r1) = unbounded();
781 /// let (s2, r2) = unbounded();
782 ///
783 /// s1.send(10).unwrap();
784 /// s2.send(20).unwrap();
785 ///
786 /// let mut sel = Select::new();
787 /// let oper1 = sel.recv(&r1);
788 /// let oper2 = sel.recv(&r2);
789 ///
790 /// // Both operations are initially ready, so a random one will be executed.
791 /// let oper = sel.try_select();
792 /// match oper {
793 /// Err(_) => panic!("both operations should be ready"),
794 /// Ok(oper) => match oper.index() {
795 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
796 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
797 /// _ => unreachable!(),
798 /// }
799 /// }
800 /// ```
801 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
802 try_select(&mut self.handles, self.biased)
803 }
804
805 /// Blocks until one of the operations becomes ready and selects it.
806 ///
807 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
808 /// ready at the same time, a random one among them is selected.
809 ///
810 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
811 /// even when it will simply return an error because the channel is disconnected.
812 ///
813 /// The selected operation must be completed with [`SelectedOperation::send`]
814 /// or [`SelectedOperation::recv`].
815 ///
816 /// # Panics
817 ///
818 /// Panics if no operations have been added to `Select`.
819 ///
820 /// # Examples
821 ///
822 /// ```
823 /// use std::thread;
824 /// use std::time::Duration;
825 /// use crossbeam_channel::{unbounded, Select};
826 ///
827 /// let (s1, r1) = unbounded();
828 /// let (s2, r2) = unbounded();
829 ///
830 /// thread::spawn(move || {
831 /// thread::sleep(Duration::from_secs(1));
832 /// s1.send(10).unwrap();
833 /// });
834 /// thread::spawn(move || s2.send(20).unwrap());
835 ///
836 /// let mut sel = Select::new();
837 /// let oper1 = sel.recv(&r1);
838 /// let oper2 = sel.recv(&r2);
839 ///
840 /// // The second operation will be selected because it becomes ready first.
841 /// let oper = sel.select();
842 /// match oper.index() {
843 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
844 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
845 /// _ => unreachable!(),
846 /// }
847 /// ```
848 pub fn select(&mut self) -> SelectedOperation<'a> {
849 select(&mut self.handles, self.biased)
850 }
851
852 /// Blocks for a limited time until one of the operations becomes ready and selects it.
853 ///
854 /// If an operation becomes ready, it is selected and returned. If multiple operations are
855 /// ready at the same time, a random one among them is selected. If none of the operations
856 /// become ready for the specified duration, an error is returned.
857 ///
858 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
859 /// even when it will simply return an error because the channel is disconnected.
860 ///
861 /// The selected operation must be completed with [`SelectedOperation::send`]
862 /// or [`SelectedOperation::recv`].
863 ///
864 /// # Examples
865 ///
866 /// ```
867 /// use std::thread;
868 /// use std::time::Duration;
869 /// use crossbeam_channel::{unbounded, Select};
870 ///
871 /// let (s1, r1) = unbounded();
872 /// let (s2, r2) = unbounded();
873 ///
874 /// thread::spawn(move || {
875 /// thread::sleep(Duration::from_secs(1));
876 /// s1.send(10).unwrap();
877 /// });
878 /// thread::spawn(move || s2.send(20).unwrap());
879 ///
880 /// let mut sel = Select::new();
881 /// let oper1 = sel.recv(&r1);
882 /// let oper2 = sel.recv(&r2);
883 ///
884 /// // The second operation will be selected because it becomes ready first.
885 /// let oper = sel.select_timeout(Duration::from_millis(500));
886 /// match oper {
887 /// Err(_) => panic!("should not have timed out"),
888 /// Ok(oper) => match oper.index() {
889 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
890 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
891 /// _ => unreachable!(),
892 /// }
893 /// }
894 /// ```
895 pub fn select_timeout(
896 &mut self,
897 timeout: Duration,
898 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
899 select_timeout(&mut self.handles, timeout, self.biased)
900 }
901
902 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
903 ///
904 /// If an operation becomes ready, it is selected and returned. If multiple operations are
905 /// ready at the same time, a random one among them is selected. If none of the operations
906 /// become ready before the given deadline, an error is returned.
907 ///
908 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
909 /// even when it will simply return an error because the channel is disconnected.
910 ///
911 /// The selected operation must be completed with [`SelectedOperation::send`]
912 /// or [`SelectedOperation::recv`].
913 ///
914 /// # Examples
915 ///
916 /// ```
917 /// use std::thread;
918 /// use std::time::{Instant, Duration};
919 /// use crossbeam_channel::{unbounded, Select};
920 ///
921 /// let (s1, r1) = unbounded();
922 /// let (s2, r2) = unbounded();
923 ///
924 /// thread::spawn(move || {
925 /// thread::sleep(Duration::from_secs(1));
926 /// s1.send(10).unwrap();
927 /// });
928 /// thread::spawn(move || s2.send(20).unwrap());
929 ///
930 /// let mut sel = Select::new();
931 /// let oper1 = sel.recv(&r1);
932 /// let oper2 = sel.recv(&r2);
933 ///
934 /// let deadline = Instant::now() + Duration::from_millis(500);
935 ///
936 /// // The second operation will be selected because it becomes ready first.
937 /// let oper = sel.select_deadline(deadline);
938 /// match oper {
939 /// Err(_) => panic!("should not have timed out"),
940 /// Ok(oper) => match oper.index() {
941 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
942 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
943 /// _ => unreachable!(),
944 /// }
945 /// }
946 /// ```
947 pub fn select_deadline(
948 &mut self,
949 deadline: Instant,
950 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
951 select_deadline(&mut self.handles, deadline, self.biased)
952 }
953
954 /// Attempts to find a ready operation without blocking.
955 ///
956 /// If an operation is ready, its index is returned. If multiple operations are ready at the
957 /// same time, a random one among them is chosen. If none of the operations are ready, an error
958 /// is returned.
959 ///
960 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
961 /// even when it will simply return an error because the channel is disconnected.
962 ///
963 /// Note that this method might return with success spuriously, so it's a good idea to always
964 /// double check if the operation is really ready.
965 ///
966 /// # Examples
967 ///
968 /// ```
969 /// use crossbeam_channel::{unbounded, Select};
970 ///
971 /// let (s1, r1) = unbounded();
972 /// let (s2, r2) = unbounded();
973 ///
974 /// s1.send(10).unwrap();
975 /// s2.send(20).unwrap();
976 ///
977 /// let mut sel = Select::new();
978 /// let oper1 = sel.recv(&r1);
979 /// let oper2 = sel.recv(&r2);
980 ///
981 /// // Both operations are initially ready, so a random one will be chosen.
982 /// match sel.try_ready() {
983 /// Err(_) => panic!("both operations should be ready"),
984 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
985 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
986 /// Ok(_) => unreachable!(),
987 /// }
988 /// ```
989 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
990 match run_ready(&mut self.handles, Timeout::Now, self.biased) {
991 None => Err(TryReadyError),
992 Some(index) => Ok(index),
993 }
994 }
995
996 /// Blocks until one of the operations becomes ready.
997 ///
998 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
999 /// the same time, a random one among them is chosen.
1000 ///
1001 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1002 /// even when it will simply return an error because the channel is disconnected.
1003 ///
1004 /// Note that this method might return with success spuriously, so it's a good idea to always
1005 /// double check if the operation is really ready.
1006 ///
1007 /// # Panics
1008 ///
1009 /// Panics if no operations have been added to `Select`.
1010 ///
1011 /// # Examples
1012 ///
1013 /// ```
1014 /// use std::thread;
1015 /// use std::time::Duration;
1016 /// use crossbeam_channel::{unbounded, Select};
1017 ///
1018 /// let (s1, r1) = unbounded();
1019 /// let (s2, r2) = unbounded();
1020 ///
1021 /// thread::spawn(move || {
1022 /// thread::sleep(Duration::from_secs(1));
1023 /// s1.send(10).unwrap();
1024 /// });
1025 /// thread::spawn(move || s2.send(20).unwrap());
1026 ///
1027 /// let mut sel = Select::new();
1028 /// let oper1 = sel.recv(&r1);
1029 /// let oper2 = sel.recv(&r2);
1030 ///
1031 /// // The second operation will be selected because it becomes ready first.
1032 /// match sel.ready() {
1033 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1034 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1035 /// _ => unreachable!(),
1036 /// }
1037 /// ```
1038 pub fn ready(&mut self) -> usize {
1039 if self.handles.is_empty() {
1040 panic!("no operations have been added to `Select`");
1041 }
1042
1043 run_ready(&mut self.handles, Timeout::Never, self.biased).unwrap()
1044 }
1045
1046 /// Blocks for a limited time until one of the operations becomes ready.
1047 ///
1048 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1049 /// the same time, a random one among them is chosen. If none of the operations become ready
1050 /// for the specified duration, an error is returned.
1051 ///
1052 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1053 /// even when it will simply return an error because the channel is disconnected.
1054 ///
1055 /// Note that this method might return with success spuriously, so it's a good idea to double
1056 /// check if the operation is really ready.
1057 ///
1058 /// # Examples
1059 ///
1060 /// ```
1061 /// use std::thread;
1062 /// use std::time::Duration;
1063 /// use crossbeam_channel::{unbounded, Select};
1064 ///
1065 /// let (s1, r1) = unbounded();
1066 /// let (s2, r2) = unbounded();
1067 ///
1068 /// thread::spawn(move || {
1069 /// thread::sleep(Duration::from_secs(1));
1070 /// s1.send(10).unwrap();
1071 /// });
1072 /// thread::spawn(move || s2.send(20).unwrap());
1073 ///
1074 /// let mut sel = Select::new();
1075 /// let oper1 = sel.recv(&r1);
1076 /// let oper2 = sel.recv(&r2);
1077 ///
1078 /// // The second operation will be selected because it becomes ready first.
1079 /// match sel.ready_timeout(Duration::from_millis(500)) {
1080 /// Err(_) => panic!("should not have timed out"),
1081 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1082 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1083 /// Ok(_) => unreachable!(),
1084 /// }
1085 /// ```
1086 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1087 match Instant::now().checked_add(timeout) {
1088 Some(deadline) => self.ready_deadline(deadline),
1089 None => Ok(self.ready()),
1090 }
1091 }
1092
1093 /// Blocks until a given deadline, or until one of the operations becomes ready.
1094 ///
1095 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1096 /// the same time, a random one among them is chosen. If none of the operations become ready
1097 /// before the deadline, an error is returned.
1098 ///
1099 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1100 /// even when it will simply return an error because the channel is disconnected.
1101 ///
1102 /// Note that this method might return with success spuriously, so it's a good idea to double
1103 /// check if the operation is really ready.
1104 ///
1105 /// # Examples
1106 ///
1107 /// ```
1108 /// use std::thread;
1109 /// use std::time::{Duration, Instant};
1110 /// use crossbeam_channel::{unbounded, Select};
1111 ///
1112 /// let deadline = Instant::now() + Duration::from_millis(500);
1113 ///
1114 /// let (s1, r1) = unbounded();
1115 /// let (s2, r2) = unbounded();
1116 ///
1117 /// thread::spawn(move || {
1118 /// thread::sleep(Duration::from_secs(1));
1119 /// s1.send(10).unwrap();
1120 /// });
1121 /// thread::spawn(move || s2.send(20).unwrap());
1122 ///
1123 /// let mut sel = Select::new();
1124 /// let oper1 = sel.recv(&r1);
1125 /// let oper2 = sel.recv(&r2);
1126 ///
1127 /// // The second operation will be selected because it becomes ready first.
1128 /// match sel.ready_deadline(deadline) {
1129 /// Err(_) => panic!("should not have timed out"),
1130 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1131 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1132 /// Ok(_) => unreachable!(),
1133 /// }
1134 /// ```
1135 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1136 match run_ready(&mut self.handles, Timeout::At(deadline), self.biased) {
1137 None => Err(ReadyTimeoutError),
1138 Some(index) => Ok(index),
1139 }
1140 }
1141}
1142
1143impl<'a> Clone for Select<'a> {
1144 fn clone(&self) -> Select<'a> {
1145 Select {
1146 handles: self.handles.clone(),
1147 next_index: self.next_index,
1148 biased: self.biased,
1149 }
1150 }
1151}
1152
1153impl<'a> Default for Select<'a> {
1154 fn default() -> Select<'a> {
1155 Select::new()
1156 }
1157}
1158
1159impl fmt::Debug for Select<'_> {
1160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1161 f.pad("Select { .. }")
1162 }
1163}
1164
1165/// A selected operation that needs to be completed.
1166///
1167/// To complete the operation, call [`send`] or [`recv`].
1168///
1169/// # Panics
1170///
1171/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1172/// `SelectedOperation` is dropped without completion, a panic occurs.
1173///
1174/// [`send`]: SelectedOperation::send
1175/// [`recv`]: SelectedOperation::recv
1176#[must_use]
1177pub struct SelectedOperation<'a> {
1178 /// Token needed to complete the operation.
1179 token: Token,
1180
1181 /// The index of the selected operation.
1182 index: usize,
1183
1184 /// The address of the selected `Sender` or `Receiver`.
1185 ptr: *const u8,
1186
1187 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1188 _marker: PhantomData<&'a ()>,
1189}
1190
1191impl SelectedOperation<'_> {
1192 /// Returns the index of the selected operation.
1193 ///
1194 /// # Examples
1195 ///
1196 /// ```
1197 /// use crossbeam_channel::{bounded, Select};
1198 ///
1199 /// let (s1, r1) = bounded::<()>(0);
1200 /// let (s2, r2) = bounded::<()>(0);
1201 /// let (s3, r3) = bounded::<()>(1);
1202 ///
1203 /// let mut sel = Select::new();
1204 /// let oper1 = sel.send(&s1);
1205 /// let oper2 = sel.recv(&r2);
1206 /// let oper3 = sel.send(&s3);
1207 ///
1208 /// // Only the last operation is ready.
1209 /// let oper = sel.select();
1210 /// assert_eq!(oper.index(), 2);
1211 /// assert_eq!(oper.index(), oper3);
1212 ///
1213 /// // Complete the operation.
1214 /// oper.send(&s3, ()).unwrap();
1215 /// ```
1216 pub fn index(&self) -> usize {
1217 self.index
1218 }
1219
1220 /// Completes the send operation.
1221 ///
1222 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1223 /// when the operation was added.
1224 ///
1225 /// # Panics
1226 ///
1227 /// Panics if an incorrect [`Sender`] reference is passed.
1228 ///
1229 /// # Examples
1230 ///
1231 /// ```
1232 /// use crossbeam_channel::{bounded, Select, SendError};
1233 ///
1234 /// let (s, r) = bounded::<i32>(0);
1235 /// drop(r);
1236 ///
1237 /// let mut sel = Select::new();
1238 /// let oper1 = sel.send(&s);
1239 ///
1240 /// let oper = sel.select();
1241 /// assert_eq!(oper.index(), oper1);
1242 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1243 /// ```
1244 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1245 assert!(
1246 s as *const Sender<T> as *const u8 == self.ptr,
1247 "passed a sender that wasn't selected",
1248 );
1249 let res = unsafe { channel::write(s, &mut self.token, msg) };
1250 mem::forget(self);
1251 res.map_err(SendError)
1252 }
1253
1254 /// Completes the receive operation.
1255 ///
1256 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1257 /// when the operation was added.
1258 ///
1259 /// # Panics
1260 ///
1261 /// Panics if an incorrect [`Receiver`] reference is passed.
1262 ///
1263 /// # Examples
1264 ///
1265 /// ```
1266 /// use crossbeam_channel::{bounded, Select, RecvError};
1267 ///
1268 /// let (s, r) = bounded::<i32>(0);
1269 /// drop(s);
1270 ///
1271 /// let mut sel = Select::new();
1272 /// let oper1 = sel.recv(&r);
1273 ///
1274 /// let oper = sel.select();
1275 /// assert_eq!(oper.index(), oper1);
1276 /// assert_eq!(oper.recv(&r), Err(RecvError));
1277 /// ```
1278 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1279 assert!(
1280 r as *const Receiver<T> as *const u8 == self.ptr,
1281 "passed a receiver that wasn't selected",
1282 );
1283 let res = unsafe { channel::read(r, &mut self.token) };
1284 mem::forget(self);
1285 res.map_err(|_| RecvError)
1286 }
1287}
1288
1289impl fmt::Debug for SelectedOperation<'_> {
1290 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1291 f.pad("SelectedOperation { .. }")
1292 }
1293}
1294
1295impl Drop for SelectedOperation<'_> {
1296 fn drop(&mut self) {
1297 panic!("dropped `SelectedOperation` without completing the operation");
1298 }
1299}