parking_lot/raw_rwlock.rs

// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular it is not vulnerable to writer starvation
// since readers will block if there is a pending writer.

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;
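//
// For illustration (derived from the constants above): a state of
// `3 * ONE_READER` means three threads hold shared locks, `ONE_READER |
// UPGRADABLE_BIT` means a single upgradable reader holds the lock, and
// `2 * ONE_READER | WRITER_BIT | WRITER_PARKED_BIT` means a writer has claimed
// WRITER_BIT and is parked on the secondary queue until the two remaining
// readers exit the lock.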

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
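//
// Each token value is also the increment that `wake_parked_threads` adds to the
// accumulated lock state when the corresponding thread is chosen for unparking.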

/// Raw reader-writer lock type backed by the parking lot.
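///
/// This type is normally used through `lock_api::RwLock<RawRwLock, T>`, which is
/// how the crate's public `RwLock<T>` alias is defined and which layers the safe
/// guard-based API on top of these raw methods.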
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & WRITER_BIT != 0 {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn upgrade(&self) {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            #[allow(clippy::collapsible_if)]
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                #[allow(clippy::collapsible_if)]
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        //   * `addr` is an address we control.
        //   * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.deadlock_release();
        let result = self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT);
        self.deadlock_acquire();
        result
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing `WRITER_BIT` or
    /// `UPGRADABLE_BIT`.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * `callback` safety responsibility is on caller
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread: bool| {
                // Clear the parked bit while holding the queue lock. There can
                // only be one thread parked (this one).
                debug_assert!(was_last_thread);
                self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            };
            // SAFETY:
            //   * `addr` is an address we control.
            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            //   * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self
                        .state
                        .fetch_add(prev_value.wrapping_sub(WRITER_BIT), Ordering::Relaxed);
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(prev_value, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
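
// NOTE: illustrative smoke-test sketch, not part of the upstream file. It only
// exercises the raw lock through the `lock_api` traits used above and makes no
// assumptions beyond the methods defined in this module.
#[cfg(test)]
mod tests {
    use super::RawRwLock;
    use lock_api::RawRwLock as _;

    #[test]
    fn shared_and_exclusive_are_mutually_exclusive() {
        let lock = RawRwLock::INIT;

        // Two readers can hold the lock at once, but a writer cannot join them.
        assert!(lock.try_lock_shared());
        assert!(lock.try_lock_shared());
        assert!(!lock.try_lock_exclusive());
        unsafe {
            lock.unlock_shared();
            lock.unlock_shared();
        }

        // Once the readers are gone a writer can enter, and it excludes readers.
        assert!(lock.try_lock_exclusive());
        assert!(!lock.try_lock_shared());
        unsafe { lock.unlock_exclusive() };
        assert!(!lock.is_locked());
    }
}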