Skip to main content

std/sys/sync/once/
futex.rs

1use crate::cell::Cell;
2use crate::sync as public;
3use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
4use crate::sync::once::OnceExclusiveState;
5use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all};
6
// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 4 states:
//
// The state occupies the two low bits of the value (see `STATE_MASK`); the
// `QUEUED` flag below is a separate bit outside of that mask.

/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: Primitive = 3;
/// Some thread has previously attempted to initialize the Once, but it panicked,
/// so the Once is now poisoned. There are no other threads currently accessing
/// this Once.
const POISONED: Primitive = 2;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: Primitive = 1;
/// Initialization has completed and all future calls should finish immediately.
/// By choosing this state as the all-zero state the `is_completed` check can be
/// a bit faster on some platforms.
const COMPLETE: Primitive = 0;

// An additional bit indicates whether there are waiting threads:

/// May only be set if the state is not COMPLETE.
const QUEUED: Primitive = 4;

// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.

/// Mask selecting the two low bits that hold one of the four states above.
/// `value & STATE_MASK` strips the `QUEUED` bit and leaves only the state.
const STATE_MASK: Primitive = 0b11;
34
/// State passed (wrapped in `public::OnceState`) to the closure that
/// `Once::call` runs.
pub struct OnceState {
    /// Whether the `Once` was in the `POISONED` state when the current
    /// initialization attempt claimed it.
    poisoned: bool,
    /// State the `Once` is moved to once the closure has returned. `Once::call`
    /// initializes it to `COMPLETE`; `poison` overwrites it with `POISONED`.
    set_state_to: Cell<Primitive>,
}
39
40impl OnceState {
41    #[inline]
42    pub fn is_poisoned(&self) -> bool {
43        self.poisoned
44    }
45
46    #[inline]
47    pub fn poison(&self) {
48        self.set_state_to.set(POISONED);
49    }
50}
51
/// Guard held by the thread running the initialization closure. Its `Drop`
/// implementation stores the final state into the futex word and wakes any
/// queued waiters.
struct CompletionGuard<'a> {
    /// The futex word of the `Once` being initialized.
    state_and_queued: &'a Futex,
    /// Value swapped into the futex word when the guard is dropped.
    /// `Once::call` creates the guard with `POISONED` and only replaces it
    /// after the closure has returned.
    set_state_on_drop_to: Primitive,
}
56
57impl<'a> Drop for CompletionGuard<'a> {
58    fn drop(&mut self) {
59        // Use release ordering to propagate changes to all threads checking
60        // up on the Once. `futex_wake_all` does its own synchronization, hence
61        // we do not need `AcqRel`.
62        if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
63            futex_wake_all(self.state_and_queued);
64        }
65    }
66}
67
/// `Once` implementation backed by a single futex word.
pub struct Once {
    /// Holds one of the four states in the bits selected by `STATE_MASK`,
    /// plus the `QUEUED` bit while threads are waiting.
    state_and_queued: Futex,
}
71
72impl Once {
73    #[inline]
74    pub const fn new() -> Once {
75        Once { state_and_queued: Futex::new(INCOMPLETE) }
76    }
77
78    #[inline]
79    pub const fn new_complete() -> Once {
80        Once { state_and_queued: Futex::new(COMPLETE) }
81    }
82
83    #[inline]
84    pub fn is_completed(&self) -> bool {
85        // Use acquire ordering to make all initialization changes visible to the
86        // current thread.
87        self.state_and_queued.load(Acquire) == COMPLETE
88    }
89
90    #[inline]
91    pub(crate) fn state(&mut self) -> OnceExclusiveState {
92        match *self.state_and_queued.get_mut() {
93            INCOMPLETE => OnceExclusiveState::Incomplete,
94            POISONED => OnceExclusiveState::Poisoned,
95            COMPLETE => OnceExclusiveState::Complete,
96            _ => {
    ::core::panicking::panic_fmt(format_args!("internal error: entered unreachable code: {0}",
            format_args!("invalid Once state")));
}unreachable!("invalid Once state"),
97        }
98    }
99
100    #[inline]
101    pub(crate) fn set_state(&mut self, new_state: OnceExclusiveState) {
102        *self.state_and_queued.get_mut() = match new_state {
103            OnceExclusiveState::Incomplete => INCOMPLETE,
104            OnceExclusiveState::Poisoned => POISONED,
105            OnceExclusiveState::Complete => COMPLETE,
106        };
107    }
108
109    #[cold]
110    #[track_caller]
111    pub fn wait(&self, ignore_poisoning: bool) {
112        let mut state_and_queued = self.state_and_queued.load(Acquire);
113        loop {
114            let state = state_and_queued & STATE_MASK;
115            let queued = state_and_queued & QUEUED != 0;
116            match state {
117                COMPLETE => return,
118                POISONED if !ignore_poisoning => {
119                    // Panic to propagate the poison.
120                    {
    ::core::panicking::panic_fmt(format_args!("Once instance has previously been poisoned"));
};panic!("Once instance has previously been poisoned");
121                }
122                _ => {
123                    // Set the QUEUED bit if it has not already been set.
124                    if !queued {
125                        state_and_queued += QUEUED;
126                        if let Err(new) = self.state_and_queued.compare_exchange_weak(
127                            state,
128                            state_and_queued,
129                            Relaxed,
130                            Acquire,
131                        ) {
132                            state_and_queued = new;
133                            continue;
134                        }
135                    }
136
137                    futex_wait(&self.state_and_queued, state_and_queued, None);
138                    state_and_queued = self.state_and_queued.load(Acquire);
139                }
140            }
141        }
142    }
143
144    #[cold]
145    #[track_caller]
146    pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
147        let mut state_and_queued = self.state_and_queued.load(Acquire);
148        loop {
149            let state = state_and_queued & STATE_MASK;
150            let queued = state_and_queued & QUEUED != 0;
151            match state {
152                COMPLETE => return,
153                POISONED if !ignore_poisoning => {
154                    // Panic to propagate the poison.
155                    {
    ::core::panicking::panic_fmt(format_args!("Once instance has previously been poisoned"));
};panic!("Once instance has previously been poisoned");
156                }
157                INCOMPLETE | POISONED => {
158                    // Try to register the current thread as the one running.
159                    let next = RUNNING + if queued { QUEUED } else { 0 };
160                    if let Err(new) = self.state_and_queued.compare_exchange_weak(
161                        state_and_queued,
162                        next,
163                        Acquire,
164                        Acquire,
165                    ) {
166                        state_and_queued = new;
167                        continue;
168                    }
169
170                    // `waiter_queue` will manage other waiting threads, and
171                    // wake them up on drop.
172                    let mut waiter_queue = CompletionGuard {
173                        state_and_queued: &self.state_and_queued,
174                        set_state_on_drop_to: POISONED,
175                    };
176                    // Run the function, letting it know if we're poisoned or not.
177                    let f_state = public::OnceState {
178                        inner: OnceState {
179                            poisoned: state == POISONED,
180                            set_state_to: Cell::new(COMPLETE),
181                        },
182                    };
183                    f(&f_state);
184                    waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
185                    return;
186                }
187                _ => {
188                    // All other values must be RUNNING.
189                    if !(state == RUNNING) {
    ::core::panicking::panic("assertion failed: state == RUNNING")
};assert!(state == RUNNING);
190
191                    // Set the QUEUED bit if it is not already set.
192                    if !queued {
193                        state_and_queued += QUEUED;
194                        if let Err(new) = self.state_and_queued.compare_exchange_weak(
195                            state,
196                            state_and_queued,
197                            Relaxed,
198                            Acquire,
199                        ) {
200                            state_and_queued = new;
201                            continue;
202                        }
203                    }
204
205                    futex_wait(&self.state_and_queued, state_and_queued, None);
206                    state_and_queued = self.state_and_queued.load(Acquire);
207                }
208            }
209        }
210    }
211}