1use std::{
29 cell::RefCell,
30 collections::BinaryHeap,
31 rc::Rc,
32 task::Waker,
33 time::{Duration, Instant},
34};
35
36use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory};
37
38#[derive(Debug)]
39struct Registration {
40 token: Token,
41 wheel: Rc<RefCell<TimerWheel>>,
42 counter: u32,
43}
44
45#[derive(Debug)]
51pub struct Timer {
52 registration: Option<Registration>,
53 deadline: Option<Instant>,
54}
55
56impl Timer {
57 pub fn immediate() -> Timer {
59 Self::from_deadline(Instant::now())
60 }
61
62 pub fn from_duration(duration: Duration) -> Timer {
64 Self::from_deadline_inner(Instant::now().checked_add(duration))
65 }
66
67 pub fn from_deadline(deadline: Instant) -> Timer {
69 Self::from_deadline_inner(Some(deadline))
70 }
71
72 fn from_deadline_inner(deadline: Option<Instant>) -> Timer {
73 Timer {
74 registration: None,
75 deadline,
76 }
77 }
78
79 pub fn set_deadline(&mut self, deadline: Instant) {
84 self.deadline = Some(deadline);
85 }
86
87 pub fn set_duration(&mut self, duration: Duration) {
92 self.deadline = Instant::now().checked_add(duration);
93 }
94
95 pub fn current_deadline(&self) -> Option<Instant> {
99 self.deadline
100 }
101}
102
103impl EventSource for Timer {
104 type Event = Instant;
105 type Metadata = ();
106 type Ret = TimeoutAction;
107 type Error = std::io::Error;
108
109 fn process_events<F>(
110 &mut self,
111 _: Readiness,
112 token: Token,
113 mut callback: F,
114 ) -> Result<PostAction, Self::Error>
115 where
116 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
117 {
118 if let (Some(ref registration), Some(ref deadline)) = (&self.registration, &self.deadline) {
119 if registration.token != token {
120 return Ok(PostAction::Continue);
121 }
122 let new_deadline = match callback(*deadline, &mut ()) {
123 TimeoutAction::Drop => return Ok(PostAction::Remove),
124 TimeoutAction::ToInstant(instant) => instant,
125 TimeoutAction::ToDuration(duration) => match Instant::now().checked_add(duration) {
126 Some(new_deadline) => new_deadline,
127 None => {
128 self.deadline = None;
130 return Ok(PostAction::Remove);
131 }
132 },
133 };
134 registration.wheel.borrow_mut().insert_reuse(
136 registration.counter,
137 new_deadline,
138 registration.token,
139 );
140 self.deadline = Some(new_deadline);
141 }
142 Ok(PostAction::Continue)
143 }
144
145 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
146 if let Some(deadline) = self.deadline {
148 let wheel = poll.timers.clone();
149 let token = token_factory.token();
150 let counter = wheel.borrow_mut().insert(deadline, token);
151 self.registration = Some(Registration {
152 token,
153 wheel,
154 counter,
155 });
156 }
157
158 Ok(())
159 }
160
161 fn reregister(
162 &mut self,
163 poll: &mut Poll,
164 token_factory: &mut TokenFactory,
165 ) -> crate::Result<()> {
166 self.unregister(poll)?;
167 self.register(poll, token_factory)
168 }
169
170 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
171 if let Some(registration) = self.registration.take() {
172 poll.timers.borrow_mut().cancel(registration.counter);
173 }
174 Ok(())
175 }
176}
177
/// What a [`Timer`] should do after its callback has run.
///
/// The type is a small value made of `Copy` data, so it implements the common
/// value traits (`Clone`, `Copy`, `PartialEq`, `Eq`) in addition to `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutAction {
    /// Remove the timer from the event loop.
    Drop,
    /// Re-arm the timer so that it fires at the given instant.
    ToInstant(Instant),
    /// Re-arm the timer so that it fires after the given duration, measured from the
    /// moment the callback returned. If the resulting instant is not representable the
    /// timer is removed instead.
    ToDuration(Duration),
}
188
189#[derive(Debug)]
191struct TimeoutData {
192 deadline: Instant,
193 token: RefCell<Option<Token>>,
194 counter: u32,
195}
196
197#[derive(Debug)]
199pub(crate) struct TimerWheel {
200 heap: BinaryHeap<TimeoutData>,
201 counter: u32,
202}
203
204impl TimerWheel {
205 pub(crate) fn new() -> TimerWheel {
206 TimerWheel {
207 heap: BinaryHeap::new(),
208 counter: 0,
209 }
210 }
211
212 pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 {
213 self.heap.push(TimeoutData {
214 deadline,
215 token: RefCell::new(Some(token)),
216 counter: self.counter,
217 });
218 let ret = self.counter;
219 self.counter += 1;
220 ret
221 }
222
223 pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) {
224 self.heap.push(TimeoutData {
225 deadline,
226 token: RefCell::new(Some(token)),
227 counter,
228 });
229 }
230
231 pub(crate) fn cancel(&mut self, counter: u32) {
232 self.heap
233 .iter()
234 .find(|data| data.counter == counter)
235 .map(|data| data.token.take());
236 }
237
238 pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> {
239 loop {
240 if let Some(data) = self.heap.peek() {
242 if data.deadline > now {
243 return None;
244 }
245 } else {
248 return None;
249 }
250
251 let data = self.heap.pop().unwrap();
253 if let Some(token) = data.token.into_inner() {
254 return Some((data.counter, token));
255 }
256 }
258 }
259
260 pub(crate) fn next_deadline(&self) -> Option<std::time::Instant> {
261 self.heap.peek().map(|data| data.deadline)
262 }
263}
264
265impl std::cmp::Ord for TimeoutData {
268 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
269 self.deadline.cmp(&other.deadline).reverse()
271 }
272}
273
274impl std::cmp::PartialOrd for TimeoutData {
275 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
276 Some(self.cmp(other))
277 }
278}
279
280impl std::cmp::PartialEq for TimeoutData {
283 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
284 fn eq(&self, other: &Self) -> bool {
285 self.deadline == other.deadline
286 }
287}
288
289impl std::cmp::Eq for TimeoutData {}
290
/// A future that becomes ready once its deadline has passed.
///
/// Creating one inserts a [`Timer`] into the event loop; when that timer fires, the
/// waker stored by the most recent `poll` is woken.
pub struct TimeoutFuture {
    /// Instant after which `poll` returns `Ready`. `None` means the deadline was not
    /// representable, in which case the future stays pending forever.
    deadline: Option<Instant>,
    /// Slot shared with the timer callback: `poll` stores the task's waker here and the
    /// callback wakes whatever it finds.
    waker: Rc<RefCell<Option<Waker>>>,
}
298
299impl std::fmt::Debug for TimeoutFuture {
300 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 f.debug_struct("TimeoutFuture")
303 .field("deadline", &self.deadline)
304 .finish_non_exhaustive()
305 }
306}
307
308impl TimeoutFuture {
309 pub fn from_duration<Data>(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture {
311 Self::from_deadline_inner(handle, Instant::now().checked_add(duration))
312 }
313
314 pub fn from_deadline<Data>(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture {
316 Self::from_deadline_inner(handle, Some(deadline))
317 }
318
319 fn from_deadline_inner<Data>(
321 handle: &LoopHandle<'_, Data>,
322 deadline: Option<Instant>,
323 ) -> TimeoutFuture {
324 let timer = Timer::from_deadline_inner(deadline);
325 let waker = Rc::new(RefCell::new(None::<Waker>));
326 handle
327 .insert_source(timer, {
328 let waker = waker.clone();
329 move |_, &mut (), _| {
330 if let Some(waker) = waker.borrow_mut().clone() {
331 waker.wake()
332 }
333 TimeoutAction::Drop
334 }
335 })
336 .unwrap();
337
338 TimeoutFuture { deadline, waker }
339 }
340}
341
342impl std::future::Future for TimeoutFuture {
343 type Output = ();
344
345 fn poll(
346 self: std::pin::Pin<&mut Self>,
347 cx: &mut std::task::Context<'_>,
348 ) -> std::task::Poll<Self::Output> {
349 match self.deadline {
350 None => return std::task::Poll::Pending,
351
352 Some(deadline) => {
353 if Instant::now() >= deadline {
354 return std::task::Poll::Ready(());
355 }
356 }
357 }
358
359 *self.waker.borrow_mut() = Some(cx.waker().clone());
360 std::task::Poll::Pending
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::*;
    use std::time::{Duration, Instant};

    /// A 100ms timer does not fire on an immediate dispatch, but does within 150ms.
    #[test]
    fn simple_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        // Non-blocking dispatch right after insertion: too early for the timer.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        // Waiting up to 150ms lets the 100ms deadline pass.
        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        assert!(dispatched);
    }

    /// Same scenario as `simple_timer`, but the timer is built from an absolute
    /// `Instant` so that `Timer::from_deadline` is exercised with a future deadline.
    #[test]
    fn simple_timer_instant() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_deadline(Instant::now() + Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        // Waiting up to 150ms lets the deadline pass.
        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        assert!(dispatched);
    }

    /// An immediate timer fires on the very first non-blocking dispatch.
    #[test]
    fn immediate_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(Timer::immediate(), |_, &mut (), dispatched| {
                *dispatched = true;
                TimeoutAction::Drop
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(dispatched);
    }

    /// Dispatch timeouts with sub-millisecond components must not make a 100ms timer
    /// fire early.
    #[test]
    fn high_precision_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        // Non-blocking dispatch: too early.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        // 10.2ms later is still well before the 100ms deadline.
        event_loop
            .dispatch(Some(Duration::from_micros(10200)), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        // Waiting up to another 100ms crosses the deadline.
        event_loop
            .dispatch(Some(Duration::from_millis(100)), &mut dispatched)
            .unwrap();
        assert!(dispatched);
    }

    /// A timer removed from the loop before its deadline never fires.
    #[test]
    fn cancel_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        let token = event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        // Non-blocking dispatch: too early.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert!(!dispatched);

        // Remove the source before the deadline.
        event_loop.handle().remove(token);

        // Even after the original deadline has passed, the callback must not have run.
        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        assert!(!dispatched);
    }

    /// A callback returning `ToDuration` re-arms the timer each time it fires.
    #[test]
    fn repeating_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::ToDuration(Duration::from_millis(500))
                },
            )
            .unwrap();

        // 250ms in: the first 500ms period has not elapsed yet.
        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        // Each following dispatch waits long enough for exactly one more period.
        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 3);
    }

    /// Timeout futures scheduled on the executor complete in deadline order; a future
    /// built from `Duration::MAX` never completes.
    #[cfg(feature = "executor")]
    #[test]
    fn timeout_future() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        let timeout_1 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500));
        let timeout_2 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500));
        // `Instant::now() + Duration::MAX` is not representable: this one stays pending.
        let timeout_3 = TimeoutFuture::from_duration(&event_loop.handle(), Duration::MAX);

        // The executor callback counts completed futures.
        let (exec, sched) = crate::sources::futures::executor().unwrap();
        event_loop
            .handle()
            .insert_source(exec, move |(), &mut (), got| {
                *got += 1;
            })
            .unwrap();

        sched.schedule(timeout_1).unwrap();
        sched.schedule(timeout_2).unwrap();
        sched.schedule(timeout_3).unwrap();

        // Two non-blocking dispatches: the futures get polled once, none is ready yet.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        // After up to 1000ms the 500ms timeout has fired; the extra non-blocking
        // dispatch gives the executor a turn to complete the woken future.
        event_loop
            .dispatch(Some(Duration::from_millis(1000)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        // After up to another 1100ms the 1500ms timeout has fired as well.
        event_loop
            .dispatch(Some(Duration::from_millis(1100)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);
    }

    /// A timer built from `Duration::MAX` neither panics on overflow nor fires, and it
    /// does not disturb other timers in the same loop.
    #[test]
    fn no_overflow() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .handle()
            .insert_source(Timer::from_duration(Duration::MAX), |_, &mut (), _| {
                panic!("This timer should never go off")
            })
            .unwrap();

        // 250ms in: the 500ms timer has not fired yet.
        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        // The 500ms timer fires once...
        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        // ...and nothing else fires afterwards.
        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);
    }
}