1use crate::elision::{have_elision, AtomicElisionExt};
9use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::util;
11use core::{
12 cell::Cell,
13 sync::atomic::{AtomicUsize, Ordering},
14};
15use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
16use parking_lot_core::{
17 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
18};
19use std::time::{Duration, Instant};
20
21const PARKED_BIT: usize = 0b0001;
36const WRITER_PARKED_BIT: usize = 0b0010;
38const UPGRADABLE_BIT: usize = 0b0100;
41const WRITER_BIT: usize = 0b1000;
44const READERS_MASK: usize = !0b1111;
46const ONE_READER: usize = 0b10000;
48
49const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
51const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
52const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
53
54pub struct RawRwLock {
56 state: AtomicUsize,
57}
58
59unsafe impl lock_api::RawRwLock for RawRwLock {
60 const INIT: RawRwLock = RawRwLock {
61 state: AtomicUsize::new(0),
62 };
63
64 type GuardMarker = crate::GuardMarker;
65
66 #[inline]
67 fn lock_exclusive(&self) {
68 if self
69 .state
70 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
71 .is_err()
72 {
73 let result = self.lock_exclusive_slow(None);
74 debug_assert!(result);
75 }
76 self.deadlock_acquire();
77 }
78
79 #[inline]
80 fn try_lock_exclusive(&self) -> bool {
81 if self
82 .state
83 .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
84 .is_ok()
85 {
86 self.deadlock_acquire();
87 true
88 } else {
89 false
90 }
91 }
92
93 #[inline]
94 unsafe fn unlock_exclusive(&self) {
95 self.deadlock_release();
96 if self
97 .state
98 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
99 .is_ok()
100 {
101 return;
102 }
103 self.unlock_exclusive_slow(false);
104 }
105
106 #[inline]
107 fn lock_shared(&self) {
108 if !self.try_lock_shared_fast(false) {
109 let result = self.lock_shared_slow(false, None);
110 debug_assert!(result);
111 }
112 self.deadlock_acquire();
113 }
114
115 #[inline]
116 fn try_lock_shared(&self) -> bool {
117 let result = if self.try_lock_shared_fast(false) {
118 true
119 } else {
120 self.try_lock_shared_slow(false)
121 };
122 if result {
123 self.deadlock_acquire();
124 }
125 result
126 }
127
128 #[inline]
129 unsafe fn unlock_shared(&self) {
130 self.deadlock_release();
131 let state = if have_elision() {
132 self.state.elision_fetch_sub_release(ONE_READER)
133 } else {
134 self.state.fetch_sub(ONE_READER, Ordering::Release)
135 };
136 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
137 self.unlock_shared_slow();
138 }
139 }
140
141 #[inline]
142 fn is_locked(&self) -> bool {
143 let state = self.state.load(Ordering::Relaxed);
144 state & (WRITER_BIT | READERS_MASK) != 0
145 }
146
147 #[inline]
148 fn is_locked_exclusive(&self) -> bool {
149 let state = self.state.load(Ordering::Relaxed);
150 state & (WRITER_BIT) != 0
151 }
152}
153
154unsafe impl lock_api::RawRwLockFair for RawRwLock {
155 #[inline]
156 unsafe fn unlock_shared_fair(&self) {
157 self.unlock_shared();
159 }
160
161 #[inline]
162 unsafe fn unlock_exclusive_fair(&self) {
163 self.deadlock_release();
164 if self
165 .state
166 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
167 .is_ok()
168 {
169 return;
170 }
171 self.unlock_exclusive_slow(true);
172 }
173
174 #[inline]
175 unsafe fn bump_shared(&self) {
176 if self.state.load(Ordering::Relaxed) & WRITER_BIT != 0 {
177 self.bump_shared_slow();
178 }
179 }
180
181 #[inline]
182 unsafe fn bump_exclusive(&self) {
183 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
184 self.bump_exclusive_slow();
185 }
186 }
187}
188
189unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
190 #[inline]
191 unsafe fn downgrade(&self) {
192 let state = self
193 .state
194 .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);
195
196 if state & PARKED_BIT != 0 {
198 self.downgrade_slow();
199 }
200 }
201}
202
203unsafe impl lock_api::RawRwLockTimed for RawRwLock {
204 type Duration = Duration;
205 type Instant = Instant;
206
207 #[inline]
208 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
209 let result = if self.try_lock_shared_fast(false) {
210 true
211 } else {
212 self.lock_shared_slow(false, util::to_deadline(timeout))
213 };
214 if result {
215 self.deadlock_acquire();
216 }
217 result
218 }
219
220 #[inline]
221 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
222 let result = if self.try_lock_shared_fast(false) {
223 true
224 } else {
225 self.lock_shared_slow(false, Some(timeout))
226 };
227 if result {
228 self.deadlock_acquire();
229 }
230 result
231 }
232
233 #[inline]
234 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
235 let result = if self
236 .state
237 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
238 .is_ok()
239 {
240 true
241 } else {
242 self.lock_exclusive_slow(util::to_deadline(timeout))
243 };
244 if result {
245 self.deadlock_acquire();
246 }
247 result
248 }
249
250 #[inline]
251 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
252 let result = if self
253 .state
254 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
255 .is_ok()
256 {
257 true
258 } else {
259 self.lock_exclusive_slow(Some(timeout))
260 };
261 if result {
262 self.deadlock_acquire();
263 }
264 result
265 }
266}
267
268unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
269 #[inline]
270 fn lock_shared_recursive(&self) {
271 if !self.try_lock_shared_fast(true) {
272 let result = self.lock_shared_slow(true, None);
273 debug_assert!(result);
274 }
275 self.deadlock_acquire();
276 }
277
278 #[inline]
279 fn try_lock_shared_recursive(&self) -> bool {
280 let result = if self.try_lock_shared_fast(true) {
281 true
282 } else {
283 self.try_lock_shared_slow(true)
284 };
285 if result {
286 self.deadlock_acquire();
287 }
288 result
289 }
290}
291
292unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
293 #[inline]
294 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
295 let result = if self.try_lock_shared_fast(true) {
296 true
297 } else {
298 self.lock_shared_slow(true, util::to_deadline(timeout))
299 };
300 if result {
301 self.deadlock_acquire();
302 }
303 result
304 }
305
306 #[inline]
307 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
308 let result = if self.try_lock_shared_fast(true) {
309 true
310 } else {
311 self.lock_shared_slow(true, Some(timeout))
312 };
313 if result {
314 self.deadlock_acquire();
315 }
316 result
317 }
318}
319
320unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
321 #[inline]
322 fn lock_upgradable(&self) {
323 if !self.try_lock_upgradable_fast() {
324 let result = self.lock_upgradable_slow(None);
325 debug_assert!(result);
326 }
327 self.deadlock_acquire();
328 }
329
330 #[inline]
331 fn try_lock_upgradable(&self) -> bool {
332 let result = if self.try_lock_upgradable_fast() {
333 true
334 } else {
335 self.try_lock_upgradable_slow()
336 };
337 if result {
338 self.deadlock_acquire();
339 }
340 result
341 }
342
343 #[inline]
344 unsafe fn unlock_upgradable(&self) {
345 self.deadlock_release();
346 let state = self.state.load(Ordering::Relaxed);
347 #[allow(clippy::collapsible_if)]
348 if state & PARKED_BIT == 0 {
349 if self
350 .state
351 .compare_exchange_weak(
352 state,
353 state - (ONE_READER | UPGRADABLE_BIT),
354 Ordering::Release,
355 Ordering::Relaxed,
356 )
357 .is_ok()
358 {
359 return;
360 }
361 }
362 self.unlock_upgradable_slow(false);
363 }
364
365 #[inline]
366 unsafe fn upgrade(&self) {
367 let state = self.state.fetch_sub(
368 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
369 Ordering::Acquire,
370 );
371 if state & READERS_MASK != ONE_READER {
372 let result = self.upgrade_slow(None);
373 debug_assert!(result);
374 }
375 }
376
377 #[inline]
378 unsafe fn try_upgrade(&self) -> bool {
379 if self
380 .state
381 .compare_exchange_weak(
382 ONE_READER | UPGRADABLE_BIT,
383 WRITER_BIT,
384 Ordering::Acquire,
385 Ordering::Relaxed,
386 )
387 .is_ok()
388 {
389 true
390 } else {
391 self.try_upgrade_slow()
392 }
393 }
394}
395
396unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
397 #[inline]
398 unsafe fn unlock_upgradable_fair(&self) {
399 self.deadlock_release();
400 let state = self.state.load(Ordering::Relaxed);
401 #[allow(clippy::collapsible_if)]
402 if state & PARKED_BIT == 0 {
403 if self
404 .state
405 .compare_exchange_weak(
406 state,
407 state - (ONE_READER | UPGRADABLE_BIT),
408 Ordering::Release,
409 Ordering::Relaxed,
410 )
411 .is_ok()
412 {
413 return;
414 }
415 }
416 self.unlock_upgradable_slow(false);
417 }
418
419 #[inline]
420 unsafe fn bump_upgradable(&self) {
421 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
422 self.bump_upgradable_slow();
423 }
424 }
425}
426
427unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
428 #[inline]
429 unsafe fn downgrade_upgradable(&self) {
430 let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);
431
432 if state & PARKED_BIT != 0 {
434 self.downgrade_slow();
435 }
436 }
437
438 #[inline]
439 unsafe fn downgrade_to_upgradable(&self) {
440 let state = self.state.fetch_add(
441 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
442 Ordering::Release,
443 );
444
445 if state & PARKED_BIT != 0 {
447 self.downgrade_to_upgradable_slow();
448 }
449 }
450}
451
452unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
453 #[inline]
454 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
455 let result = if self.try_lock_upgradable_fast() {
456 true
457 } else {
458 self.lock_upgradable_slow(Some(timeout))
459 };
460 if result {
461 self.deadlock_acquire();
462 }
463 result
464 }
465
466 #[inline]
467 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
468 let result = if self.try_lock_upgradable_fast() {
469 true
470 } else {
471 self.lock_upgradable_slow(util::to_deadline(timeout))
472 };
473 if result {
474 self.deadlock_acquire();
475 }
476 result
477 }
478
479 #[inline]
480 unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
481 let state = self.state.fetch_sub(
482 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
483 Ordering::Relaxed,
484 );
485 if state & READERS_MASK == ONE_READER {
486 true
487 } else {
488 self.upgrade_slow(Some(timeout))
489 }
490 }
491
492 #[inline]
493 unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
494 let state = self.state.fetch_sub(
495 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
496 Ordering::Relaxed,
497 );
498 if state & READERS_MASK == ONE_READER {
499 true
500 } else {
501 self.upgrade_slow(util::to_deadline(timeout))
502 }
503 }
504}
505
506impl RawRwLock {
507 #[inline(always)]
508 fn try_lock_shared_fast(&self, recursive: bool) -> bool {
509 let state = self.state.load(Ordering::Relaxed);
510
511 if state & WRITER_BIT != 0 {
514 if !recursive || state & READERS_MASK == 0 {
518 return false;
519 }
520 }
521
522 if have_elision() && state == 0 {
526 self.state
527 .elision_compare_exchange_acquire(0, ONE_READER)
528 .is_ok()
529 } else if let Some(new_state) = state.checked_add(ONE_READER) {
530 self.state
531 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
532 .is_ok()
533 } else {
534 false
535 }
536 }
537
538 #[cold]
539 fn try_lock_shared_slow(&self, recursive: bool) -> bool {
540 let mut state = self.state.load(Ordering::Relaxed);
541 loop {
542 #[allow(clippy::collapsible_if)]
544 if state & WRITER_BIT != 0 {
545 if !recursive || state & READERS_MASK == 0 {
546 return false;
547 }
548 }
549 if have_elision() && state == 0 {
550 match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
551 Ok(_) => return true,
552 Err(x) => state = x,
553 }
554 } else {
555 match self.state.compare_exchange_weak(
556 state,
557 state
558 .checked_add(ONE_READER)
559 .expect("RwLock reader count overflow"),
560 Ordering::Acquire,
561 Ordering::Relaxed,
562 ) {
563 Ok(_) => return true,
564 Err(x) => state = x,
565 }
566 }
567 }
568 }
569
570 #[inline(always)]
571 fn try_lock_upgradable_fast(&self) -> bool {
572 let state = self.state.load(Ordering::Relaxed);
573
574 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
577 return false;
578 }
579
580 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
581 self.state
582 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
583 .is_ok()
584 } else {
585 false
586 }
587 }
588
589 #[cold]
590 fn try_lock_upgradable_slow(&self) -> bool {
591 let mut state = self.state.load(Ordering::Relaxed);
592 loop {
593 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
595 return false;
596 }
597
598 match self.state.compare_exchange_weak(
599 state,
600 state
601 .checked_add(ONE_READER | UPGRADABLE_BIT)
602 .expect("RwLock reader count overflow"),
603 Ordering::Acquire,
604 Ordering::Relaxed,
605 ) {
606 Ok(_) => return true,
607 Err(x) => state = x,
608 }
609 }
610 }
611
612 #[cold]
613 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
614 let try_lock = |state: &mut usize| {
615 loop {
616 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
617 return false;
618 }
619
620 match self.state.compare_exchange_weak(
622 *state,
623 *state | WRITER_BIT,
624 Ordering::Acquire,
625 Ordering::Relaxed,
626 ) {
627 Ok(_) => return true,
628 Err(x) => *state = x,
629 }
630 }
631 };
632
633 let timed_out = !self.lock_common(
635 timeout,
636 TOKEN_EXCLUSIVE,
637 try_lock,
638 WRITER_BIT | UPGRADABLE_BIT,
639 );
640 if timed_out {
641 return false;
642 }
643
644 self.wait_for_readers(timeout, 0)
646 }
647
648 #[cold]
649 fn unlock_exclusive_slow(&self, force_fair: bool) {
650 let callback = |mut new_state, result: UnparkResult| {
652 if result.unparked_threads != 0 && (force_fair || result.be_fair) {
655 if result.have_more_threads {
656 new_state |= PARKED_BIT;
657 }
658 self.state.store(new_state, Ordering::Release);
659 TOKEN_HANDOFF
660 } else {
661 if result.have_more_threads {
663 self.state.store(PARKED_BIT, Ordering::Release);
664 } else {
665 self.state.store(0, Ordering::Release);
666 }
667 TOKEN_NORMAL
668 }
669 };
670 unsafe {
672 self.wake_parked_threads(0, callback);
673 }
674 }
675
676 #[cold]
677 fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
678 let try_lock = |state: &mut usize| {
679 let mut spinwait_shared = SpinWait::new();
680 loop {
681 if have_elision() && *state == 0 {
685 match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
686 Ok(_) => return true,
687 Err(x) => *state = x,
688 }
689 }
690
691 #[allow(clippy::collapsible_if)]
693 if *state & WRITER_BIT != 0 {
694 if !recursive || *state & READERS_MASK == 0 {
695 return false;
696 }
697 }
698
699 if self
700 .state
701 .compare_exchange_weak(
702 *state,
703 state
704 .checked_add(ONE_READER)
705 .expect("RwLock reader count overflow"),
706 Ordering::Acquire,
707 Ordering::Relaxed,
708 )
709 .is_ok()
710 {
711 return true;
712 }
713
714 spinwait_shared.spin_no_yield();
718 *state = self.state.load(Ordering::Relaxed);
719 }
720 };
721 self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
722 }
723
724 #[cold]
725 fn unlock_shared_slow(&self) {
726 let addr = self as *const _ as usize + 1;
730 let callback = |_result: UnparkResult| {
731 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
734 TOKEN_NORMAL
735 };
736 unsafe {
740 parking_lot_core::unpark_one(addr, callback);
741 }
742 }
743
744 #[cold]
745 fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
746 let try_lock = |state: &mut usize| {
747 let mut spinwait_shared = SpinWait::new();
748 loop {
749 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
750 return false;
751 }
752
753 if self
754 .state
755 .compare_exchange_weak(
756 *state,
757 state
758 .checked_add(ONE_READER | UPGRADABLE_BIT)
759 .expect("RwLock reader count overflow"),
760 Ordering::Acquire,
761 Ordering::Relaxed,
762 )
763 .is_ok()
764 {
765 return true;
766 }
767
768 spinwait_shared.spin_no_yield();
772 *state = self.state.load(Ordering::Relaxed);
773 }
774 };
775 self.lock_common(
776 timeout,
777 TOKEN_UPGRADABLE,
778 try_lock,
779 WRITER_BIT | UPGRADABLE_BIT,
780 )
781 }
782
783 #[cold]
784 fn unlock_upgradable_slow(&self, force_fair: bool) {
785 let mut state = self.state.load(Ordering::Relaxed);
787 while state & PARKED_BIT == 0 {
788 match self.state.compare_exchange_weak(
789 state,
790 state - (ONE_READER | UPGRADABLE_BIT),
791 Ordering::Release,
792 Ordering::Relaxed,
793 ) {
794 Ok(_) => return,
795 Err(x) => state = x,
796 }
797 }
798
799 let callback = |new_state, result: UnparkResult| {
801 let mut state = self.state.load(Ordering::Relaxed);
804 if force_fair || result.be_fair {
805 while let Some(mut new_state) =
808 (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
809 {
810 if result.have_more_threads {
811 new_state |= PARKED_BIT;
812 } else {
813 new_state &= !PARKED_BIT;
814 }
815 match self.state.compare_exchange_weak(
816 state,
817 new_state,
818 Ordering::Relaxed,
819 Ordering::Relaxed,
820 ) {
821 Ok(_) => return TOKEN_HANDOFF,
822 Err(x) => state = x,
823 }
824 }
825 }
826
827 loop {
829 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
830 if result.have_more_threads {
831 new_state |= PARKED_BIT;
832 } else {
833 new_state &= !PARKED_BIT;
834 }
835 match self.state.compare_exchange_weak(
836 state,
837 new_state,
838 Ordering::Relaxed,
839 Ordering::Relaxed,
840 ) {
841 Ok(_) => return TOKEN_NORMAL,
842 Err(x) => state = x,
843 }
844 }
845 };
846 unsafe {
848 self.wake_parked_threads(0, callback);
849 }
850 }
851
852 #[cold]
853 fn try_upgrade_slow(&self) -> bool {
854 let mut state = self.state.load(Ordering::Relaxed);
855 loop {
856 if state & READERS_MASK != ONE_READER {
857 return false;
858 }
859 match self.state.compare_exchange_weak(
860 state,
861 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
862 Ordering::Relaxed,
863 Ordering::Relaxed,
864 ) {
865 Ok(_) => return true,
866 Err(x) => state = x,
867 }
868 }
869 }
870
871 #[cold]
872 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
873 self.deadlock_release();
874 let result = self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT);
875 self.deadlock_acquire();
876 result
877 }
878
879 #[cold]
880 fn downgrade_slow(&self) {
881 let callback = |_, result: UnparkResult| {
883 if !result.have_more_threads {
885 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
886 }
887 TOKEN_NORMAL
888 };
889 unsafe {
891 self.wake_parked_threads(ONE_READER, callback);
892 }
893 }
894
895 #[cold]
896 fn downgrade_to_upgradable_slow(&self) {
897 let callback = |_, result: UnparkResult| {
899 if !result.have_more_threads {
901 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
902 }
903 TOKEN_NORMAL
904 };
905 unsafe {
907 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
908 }
909 }
910
911 #[cold]
912 unsafe fn bump_shared_slow(&self) {
913 self.unlock_shared();
914 self.lock_shared();
915 }
916
917 #[cold]
918 fn bump_exclusive_slow(&self) {
919 self.deadlock_release();
920 self.unlock_exclusive_slow(true);
921 self.lock_exclusive();
922 }
923
924 #[cold]
925 fn bump_upgradable_slow(&self) {
926 self.deadlock_release();
927 self.unlock_upgradable_slow(true);
928 self.lock_upgradable();
929 }
930
931 #[inline]
940 unsafe fn wake_parked_threads(
941 &self,
942 new_state: usize,
943 callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
944 ) {
945 let new_state = Cell::new(new_state);
949 let addr = self as *const _ as usize;
950 let filter = |ParkToken(token)| {
951 let s = new_state.get();
952
953 if s & WRITER_BIT != 0 {
955 return FilterOp::Stop;
956 }
957
958 if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
960 FilterOp::Skip
963 } else {
964 new_state.set(s + token);
965 FilterOp::Unpark
966 }
967 };
968 let callback = |result| callback(new_state.get(), result);
969 parking_lot_core::unpark_filter(addr, filter, callback);
974 }
975
976 #[inline]
979 fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
980 let mut spinwait = SpinWait::new();
983 let mut state = self.state.load(Ordering::Acquire);
984 while state & READERS_MASK != 0 {
985 if spinwait.spin() {
987 state = self.state.load(Ordering::Acquire);
988 continue;
989 }
990
991 if state & WRITER_PARKED_BIT == 0 {
993 if let Err(x) = self.state.compare_exchange_weak(
994 state,
995 state | WRITER_PARKED_BIT,
996 Ordering::Acquire,
997 Ordering::Acquire,
998 ) {
999 state = x;
1000 continue;
1001 }
1002 }
1003
1004 let addr = self as *const _ as usize + 1;
1007 let validate = || {
1008 let state = self.state.load(Ordering::Relaxed);
1009 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
1010 };
1011 let before_sleep = || {};
1012 let timed_out = |_, was_last_thread: bool| {
1013 debug_assert!(was_last_thread);
1016 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
1017 };
1018 let park_result = unsafe {
1023 parking_lot_core::park(
1024 addr,
1025 validate,
1026 before_sleep,
1027 timed_out,
1028 TOKEN_EXCLUSIVE,
1029 timeout,
1030 )
1031 };
1032 match park_result {
1033 ParkResult::Unparked(_) | ParkResult::Invalid => {
1037 state = self.state.load(Ordering::Acquire);
1038 continue;
1039 }
1040
1041 ParkResult::TimedOut => {
1043 let state = self
1047 .state
1048 .fetch_add(prev_value.wrapping_sub(WRITER_BIT), Ordering::Relaxed);
1049 if state & PARKED_BIT != 0 {
1050 let callback = |_, result: UnparkResult| {
1051 if !result.have_more_threads {
1053 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1054 }
1055 TOKEN_NORMAL
1056 };
1057 unsafe {
1059 self.wake_parked_threads(prev_value, callback);
1060 }
1061 }
1062 return false;
1063 }
1064 }
1065 }
1066 true
1067 }
1068
1069 #[inline]
1071 fn lock_common(
1072 &self,
1073 timeout: Option<Instant>,
1074 token: ParkToken,
1075 mut try_lock: impl FnMut(&mut usize) -> bool,
1076 validate_flags: usize,
1077 ) -> bool {
1078 let mut spinwait = SpinWait::new();
1079 let mut state = self.state.load(Ordering::Relaxed);
1080 loop {
1081 if try_lock(&mut state) {
1083 return true;
1084 }
1085
1086 if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
1088 state = self.state.load(Ordering::Relaxed);
1089 continue;
1090 }
1091
1092 if state & PARKED_BIT == 0 {
1094 if let Err(x) = self.state.compare_exchange_weak(
1095 state,
1096 state | PARKED_BIT,
1097 Ordering::Relaxed,
1098 Ordering::Relaxed,
1099 ) {
1100 state = x;
1101 continue;
1102 }
1103 }
1104
1105 let addr = self as *const _ as usize;
1107 let validate = || {
1108 let state = self.state.load(Ordering::Relaxed);
1109 state & PARKED_BIT != 0 && (state & validate_flags != 0)
1110 };
1111 let before_sleep = || {};
1112 let timed_out = |_, was_last_thread| {
1113 if was_last_thread {
1115 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1116 }
1117 };
1118
1119 let park_result = unsafe {
1124 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
1125 };
1126 match park_result {
1127 ParkResult::Unparked(TOKEN_HANDOFF) => return true,
1130
1131 ParkResult::Unparked(_) => (),
1133
1134 ParkResult::Invalid => (),
1136
1137 ParkResult::TimedOut => return false,
1139 }
1140
1141 spinwait.reset();
1143 state = self.state.load(Ordering::Relaxed);
1144 }
1145 }
1146
1147 #[inline]
1148 fn deadlock_acquire(&self) {
1149 unsafe { deadlock::acquire_resource(self as *const _ as usize) };
1150 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
1151 }
1152
1153 #[inline]
1154 fn deadlock_release(&self) {
1155 unsafe { deadlock::release_resource(self as *const _ as usize) };
1156 unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
1157 }
1158}