Skip to main content

std/sync/mpmc/
context.rs

1//! Thread-local channel context.
2
3use super::select::Selected;
4use super::waker::current_thread_id;
5use crate::cell::Cell;
6use crate::ptr;
7use crate::sync::Arc;
8use crate::sync::atomic::{Atomic, AtomicPtr, AtomicUsize, Ordering};
9use crate::thread::{self, Thread};
10use crate::time::Instant;
11
12/// Thread-local context.
13#[derive(#[automatically_derived]
impl ::core::fmt::Debug for Context {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field1_finish(f, "Context",
            "inner", &&self.inner)
    }
}Debug, #[automatically_derived]
impl ::core::clone::Clone for Context {
    #[inline]
    fn clone(&self) -> Context {
        Context { inner: ::core::clone::Clone::clone(&self.inner) }
    }
}Clone)]
14pub struct Context {
15    inner: Arc<Inner>,
16}
17
18/// Inner representation of `Context`.
19#[derive(#[automatically_derived]
impl ::core::fmt::Debug for Inner {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field4_finish(f, "Inner",
            "select", &self.select, "packet", &self.packet, "thread",
            &self.thread, "thread_id", &&self.thread_id)
    }
}Debug)]
20struct Inner {
21    /// Selected operation.
22    select: Atomic<usize>,
23
24    /// A slot into which another thread may store a pointer to its `Packet`.
25    packet: Atomic<*mut ()>,
26
27    /// Thread handle.
28    thread: Thread,
29
30    /// Thread id.
31    thread_id: usize,
32}
33
34impl Context {
35    /// Creates a new context for the duration of the closure.
36    #[inline]
37    pub fn with<F, R>(f: F) -> R
38    where
39        F: FnOnce(&Context) -> R,
40    {
41        #[doc = r" Cached thread-local context."]
const CONTEXT: crate::thread::LocalKey<Cell<Option<Context>>> =
    {
        #[inline]
        fn __rust_std_internal_init_fn() -> Cell<Option<Context>> {
            Cell::new(Some(Context::new()))
        }
        unsafe {
            crate::thread::LocalKey::new(const {
                        if crate::mem::needs_drop::<Cell<Option<Context>>>() {
                            |__rust_std_internal_init|
                                {
                                    #[thread_local]
                                    static __RUST_STD_INTERNAL_VAL:
                                        crate::thread::local_impl::LazyStorage<Cell<Option<Context>>,
                                        ()> =
                                        crate::thread::local_impl::LazyStorage::new();
                                    __RUST_STD_INTERNAL_VAL.get_or_init(__rust_std_internal_init,
                                        __rust_std_internal_init_fn)
                                }
                        } else {
                            |__rust_std_internal_init|
                                {
                                    #[thread_local]
                                    static __RUST_STD_INTERNAL_VAL:
                                        crate::thread::local_impl::LazyStorage<Cell<Option<Context>>,
                                        !> =
                                        crate::thread::local_impl::LazyStorage::new();
                                    __RUST_STD_INTERNAL_VAL.get_or_init(__rust_std_internal_init,
                                        __rust_std_internal_init_fn)
                                }
                        }
                    })
        }
    };thread_local! {
42            /// Cached thread-local context.
43            static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
44        }
45
46        let mut f = Some(f);
47        let mut f = |cx: &Context| -> R {
48            let f = f.take().unwrap();
49            f(cx)
50        };
51
52        CONTEXT
53            .try_with(|cell| match cell.take() {
54                None => f(&Context::new()),
55                Some(cx) => {
56                    cx.reset();
57                    let res = f(&cx);
58                    cell.set(Some(cx));
59                    res
60                }
61            })
62            .unwrap_or_else(|_| f(&Context::new()))
63    }
64
65    /// Creates a new `Context`.
66    #[cold]
67    fn new() -> Context {
68        Context {
69            inner: Arc::new(Inner {
70                select: AtomicUsize::new(Selected::Waiting.into()),
71                packet: AtomicPtr::new(ptr::null_mut()),
72                thread: thread::current_or_unnamed(),
73                thread_id: current_thread_id(),
74            }),
75        }
76    }
77
78    /// Resets `select` and `packet`.
79    #[inline]
80    fn reset(&self) {
81        self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
82        self.inner.packet.store(ptr::null_mut(), Ordering::Release);
83    }
84
85    /// Attempts to select an operation.
86    ///
87    /// On failure, the previously selected operation is returned.
88    #[inline]
89    pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
90        self.inner
91            .select
92            .compare_exchange(
93                Selected::Waiting.into(),
94                select.into(),
95                Ordering::AcqRel,
96                Ordering::Acquire,
97            )
98            .map(|_| ())
99            .map_err(|e| e.into())
100    }
101
102    /// Stores a packet.
103    ///
104    /// This method must be called after `try_select` succeeds and there is a packet to provide.
105    #[inline]
106    pub fn store_packet(&self, packet: *mut ()) {
107        if !packet.is_null() {
108            self.inner.packet.store(packet, Ordering::Release);
109        }
110    }
111
112    /// Waits until an operation is selected and returns it.
113    ///
114    /// If the deadline is reached, `Selected::Aborted` will be selected.
115    ///
116    /// # Safety
117    /// This may only be called from the thread this `Context` belongs to.
118    #[inline]
119    pub unsafe fn wait_until(&self, deadline: Option<Instant>) -> Selected {
120        loop {
121            // Check whether an operation has been selected.
122            let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
123            if sel != Selected::Waiting {
124                return sel;
125            }
126
127            // If there's a deadline, park the current thread until the deadline is reached.
128            if let Some(end) = deadline {
129                let now = Instant::now();
130
131                if now < end {
132                    // SAFETY: guaranteed by caller.
133                    unsafe { self.inner.thread.park_timeout(end - now) };
134                } else {
135                    // The deadline has been reached. Try aborting select.
136                    return match self.try_select(Selected::Aborted) {
137                        Ok(()) => Selected::Aborted,
138                        Err(s) => s,
139                    };
140                }
141            } else {
142                // SAFETY: guaranteed by caller.
143                unsafe { self.inner.thread.park() };
144            }
145        }
146    }
147
148    /// Unparks the thread this context belongs to.
149    #[inline]
150    pub fn unpark(&self) {
151        self.inner.thread.unpark();
152    }
153
154    /// Returns the id of the thread this context belongs to.
155    #[inline]
156    pub fn thread_id(&self) -> usize {
157        self.inner.thread_id
158    }
159}