1use super::context::Context;
4use super::error::*;
5use super::select::{Operation, Selected, Token};
6use super::utils::{Backoff, CachePadded};
7use super::waker::SyncWaker;
8use crate::cell::UnsafeCell;
9use crate::marker::PhantomData;
10use crate::mem::MaybeUninit;
11use crate::ptr;
12use crate::sync::atomic::{self, Atomic, AtomicPtr, AtomicUsize, Ordering};
13use crate::time::Instant;
14
// Bits indicating the state of a slot:
// - If a message has been written into the slot, `WRITE` is set.
// - If a message has been read from the slot, `READ` is set.
// - If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one lap of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold (the last index of a lap is reserved).
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits of an index are reserved for metadata (the mark bit).
const SHIFT: usize = 1;
// The mark bit has two different purposes (see uses in `start_send`/`start_recv`):
// - If set in the tail index, the channel is disconnected.
// - If set in the head index, the current head block is not the last one.
const MARK_BIT: usize = 1;
33
/// A slot in a block, holding at most one message.
struct Slot<T> {
    /// The message; only valid once the `WRITE` bit is set in `state`.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a combination of the `WRITE`, `READ`, and `DESTROY` bits.
    state: Atomic<usize>,
}
42
impl<T> Slot<T> {
    /// Spins until a message is written into the slot (i.e. the `WRITE` bit is set).
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.spin_heavy();
        }
    }
}
52
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list; null until installed.
    next: Atomic<*mut Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}
63
impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Box<Block<T>> {
        // SAFETY: a zeroed block is a valid initial state: the `next` pointer is
        // null, every slot's `state` has no bits set, and the messages are
        // `MaybeUninit`, which permits uninitialized/zeroed contents.
        unsafe { Box::new_zeroed().assume_init() }
    }

    /// Spins until the next pointer is set and returns it.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.spin_heavy();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block
    /// once no reader is still using any of those slots.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // The loop deliberately stops before the last slot (`BLOCK_CAP - 1`):
        // the thread reading the last slot is the one that begins destruction,
        // so no `DESTROY` bit is needed there.
        for i in start..BLOCK_CAP - 1 {
            let slot = unsafe { (*this).slots.get_unchecked(i) };

            // Mark the `DESTROY` bit if this slot has not been read yet.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // A thread is still using the slot; it will observe `DESTROY`
                // when it finishes reading and will continue the destruction.
                return;
            }
        }

        // No thread is using the block; it is now safe to deallocate it.
        drop(unsafe { Box::from_raw(this) });
    }
}
108
109#[derive(#[automatically_derived]
impl<T: ::core::fmt::Debug> ::core::fmt::Debug for Position<T> {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
::core::fmt::Formatter::debug_struct_field2_finish(f, "Position",
"index", &self.index, "block", &&self.block)
}
}Debug)]
111struct Position<T> {
112 index: Atomic<usize>,
114
115 block: Atomic<*mut Block<T>>,
117}
118
/// The token type for the list flavor: identifies a reserved slot.
///
/// NOTE: the original source contained a pasted macro-expansion of the `Debug`
/// impl inside the `#[derive(...)]` attribute; restored to a plain derive.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots (null when the channel is disconnected).
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}
128
129impl Default for ListToken {
130 #[inline]
131 fn default() -> Self {
132 ListToken { block: ptr::null(), offset: 0 }
133 }
134}
135
/// Unbounded channel implemented as a linked list of blocks.
///
/// Consecutive messages are grouped into blocks of `BLOCK_CAP` slots to reduce
/// allocator pressure and improve cache locality. Head and tail positions are
/// kept on separate cache lines to avoid false sharing.
pub(crate) struct Channel<T> {
    /// The head of the channel (where messages are read).
    head: CachePadded<Position<T>>,

    /// The tail of the channel (where messages are written).
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
156
157impl<T> Channel<T> {
158 pub(crate) fn new() -> Self {
160 Channel {
161 head: CachePadded::new(Position {
162 block: AtomicPtr::new(ptr::null_mut()),
163 index: AtomicUsize::new(0),
164 }),
165 tail: CachePadded::new(Position {
166 block: AtomicPtr::new(ptr::null_mut()),
167 index: AtomicUsize::new(0),
168 }),
169 receivers: SyncWaker::new(),
170 _marker: PhantomData,
171 }
172 }
173
174 fn start_send(&self, token: &mut Token) -> bool {
176 let backoff = Backoff::new();
177 let mut tail = self.tail.index.load(Ordering::Acquire);
178 let mut block = self.tail.block.load(Ordering::Acquire);
179 let mut next_block = None;
180
181 loop {
182 if tail & MARK_BIT != 0 {
184 token.list.block = ptr::null();
185 return true;
186 }
187
188 let offset = (tail >> SHIFT) % LAP;
190
191 if offset == BLOCK_CAP {
193 backoff.spin_heavy();
194 tail = self.tail.index.load(Ordering::Acquire);
195 block = self.tail.block.load(Ordering::Acquire);
196 continue;
197 }
198
199 if offset + 1 == BLOCK_CAP && next_block.is_none() {
202 next_block = Some(Block::<T>::new());
203 }
204
205 if block.is_null() {
208 let new = Box::into_raw(Block::<T>::new());
209
210 if self
211 .tail
212 .block
213 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
214 .is_ok()
215 {
216 #[cfg(miri)]
220 crate::thread::yield_now();
221 self.head.block.store(new, Ordering::Release);
222 block = new;
223 } else {
224 next_block = unsafe { Some(Box::from_raw(new)) };
225 tail = self.tail.index.load(Ordering::Acquire);
226 block = self.tail.block.load(Ordering::Acquire);
227 continue;
228 }
229 }
230
231 let new_tail = tail + (1 << SHIFT);
232
233 match self.tail.index.compare_exchange_weak(
235 tail,
236 new_tail,
237 Ordering::SeqCst,
238 Ordering::Acquire,
239 ) {
240 Ok(_) => unsafe {
241 if offset + 1 == BLOCK_CAP {
243 let next_block = Box::into_raw(next_block.unwrap());
244 self.tail.block.store(next_block, Ordering::Release);
245 self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
246 (*block).next.store(next_block, Ordering::Release);
247 }
248
249 token.list.block = block as *const u8;
250 token.list.offset = offset;
251 return true;
252 },
253 Err(_) => {
254 backoff.spin_light();
255 tail = self.tail.index.load(Ordering::Acquire);
256 block = self.tail.block.load(Ordering::Acquire);
257 }
258 }
259 }
260 }
261
262 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
264 if token.list.block.is_null() {
266 return Err(msg);
267 }
268
269 let block = token.list.block as *mut Block<T>;
271 let offset = token.list.offset;
272 unsafe {
273 let slot = (*block).slots.get_unchecked(offset);
274 slot.msg.get().write(MaybeUninit::new(msg));
275 slot.state.fetch_or(WRITE, Ordering::Release);
276 }
277
278 self.receivers.notify();
280 Ok(())
281 }
282
283 fn start_recv(&self, token: &mut Token) -> bool {
285 let backoff = Backoff::new();
286 let mut head = self.head.index.load(Ordering::Acquire);
287 let mut block = self.head.block.load(Ordering::Acquire);
288
289 loop {
290 let offset = (head >> SHIFT) % LAP;
292
293 if offset == BLOCK_CAP {
295 backoff.spin_heavy();
296 head = self.head.index.load(Ordering::Acquire);
297 block = self.head.block.load(Ordering::Acquire);
298 continue;
299 }
300
301 let mut new_head = head + (1 << SHIFT);
302
303 if new_head & MARK_BIT == 0 {
304 atomic::fence(Ordering::SeqCst);
305 let tail = self.tail.index.load(Ordering::Relaxed);
306
307 if head >> SHIFT == tail >> SHIFT {
309 if tail & MARK_BIT != 0 {
311 token.list.block = ptr::null();
313 return true;
314 } else {
315 return false;
317 }
318 }
319
320 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
322 new_head |= MARK_BIT;
323 }
324 }
325
326 if block.is_null() {
329 backoff.spin_heavy();
330 head = self.head.index.load(Ordering::Acquire);
331 block = self.head.block.load(Ordering::Acquire);
332 continue;
333 }
334
335 match self.head.index.compare_exchange_weak(
337 head,
338 new_head,
339 Ordering::SeqCst,
340 Ordering::Acquire,
341 ) {
342 Ok(_) => unsafe {
343 if offset + 1 == BLOCK_CAP {
345 let next = (*block).wait_next();
346 let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
347 if !(*next).next.load(Ordering::Relaxed).is_null() {
348 next_index |= MARK_BIT;
349 }
350
351 self.head.block.store(next, Ordering::Release);
352 self.head.index.store(next_index, Ordering::Release);
353 }
354
355 token.list.block = block as *const u8;
356 token.list.offset = offset;
357 return true;
358 },
359 Err(_) => {
360 backoff.spin_light();
361 head = self.head.index.load(Ordering::Acquire);
362 block = self.head.block.load(Ordering::Acquire);
363 }
364 }
365 }
366 }
367
368 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
370 if token.list.block.is_null() {
371 return Err(());
373 }
374
375 let block = token.list.block as *mut Block<T>;
377 let offset = token.list.offset;
378 unsafe {
379 let slot = (*block).slots.get_unchecked(offset);
380 slot.wait_write();
381 let msg = slot.msg.get().read().assume_init();
382
383 if offset + 1 == BLOCK_CAP {
386 Block::destroy(block, 0);
387 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
388 Block::destroy(block, offset + 1);
389 }
390
391 Ok(msg)
392 }
393 }
394
395 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
397 self.send(msg, None).map_err(|err| match err {
398 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
399 SendTimeoutError::Timeout(_) => ::core::panicking::panic("internal error: entered unreachable code")unreachable!(),
400 })
401 }
402
403 pub(crate) fn send(
405 &self,
406 msg: T,
407 _deadline: Option<Instant>,
408 ) -> Result<(), SendTimeoutError<T>> {
409 let token = &mut Token::default();
410 if !self.start_send(token) {
::core::panicking::panic("assertion failed: self.start_send(token)")
};assert!(self.start_send(token));
411 unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
412 }
413
414 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
416 let token = &mut Token::default();
417
418 if self.start_recv(token) {
419 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
420 } else {
421 Err(TryRecvError::Empty)
422 }
423 }
424
425 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
427 let token = &mut Token::default();
428 loop {
429 if self.start_recv(token) {
430 unsafe {
431 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
432 }
433 }
434
435 if let Some(d) = deadline {
436 if Instant::now() >= d {
437 return Err(RecvTimeoutError::Timeout);
438 }
439 }
440
441 Context::with(|cx| {
443 let oper = Operation::hook(token);
444 self.receivers.register(oper, cx);
445
446 if !self.is_empty() || self.is_disconnected() {
448 let _ = cx.try_select(Selected::Aborted);
449 }
450
451 let sel = unsafe { cx.wait_until(deadline) };
454
455 match sel {
456 Selected::Waiting => ::core::panicking::panic("internal error: entered unreachable code")unreachable!(),
457 Selected::Aborted | Selected::Disconnected => {
458 self.receivers.unregister(oper).unwrap();
459 }
462 Selected::Operation(_) => {}
463 }
464 });
465 }
466 }
467
468 pub(crate) fn len(&self) -> usize {
470 loop {
471 let mut tail = self.tail.index.load(Ordering::SeqCst);
473 let mut head = self.head.index.load(Ordering::SeqCst);
474
475 if self.tail.index.load(Ordering::SeqCst) == tail {
477 tail &= !((1 << SHIFT) - 1);
479 head &= !((1 << SHIFT) - 1);
480
481 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
483 tail = tail.wrapping_add(1 << SHIFT);
484 }
485 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
486 head = head.wrapping_add(1 << SHIFT);
487 }
488
489 let lap = (head >> SHIFT) / LAP;
491 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
492 head = head.wrapping_sub((lap * LAP) << SHIFT);
493
494 tail >>= SHIFT;
496 head >>= SHIFT;
497
498 return tail - head - tail / LAP;
500 }
501 }
502 }
503
504 pub(crate) fn capacity(&self) -> Option<usize> {
506 None
507 }
508
509 pub(crate) fn disconnect_senders(&self) -> bool {
513 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
514
515 if tail & MARK_BIT == 0 {
516 self.receivers.disconnect();
517 true
518 } else {
519 false
520 }
521 }
522
523 pub(crate) fn disconnect_receivers(&self) -> bool {
527 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
528
529 if tail & MARK_BIT == 0 {
530 self.discard_all_messages();
533 true
534 } else {
535 false
536 }
537 }
538
539 fn discard_all_messages(&self) {
543 let backoff = Backoff::new();
544 let mut tail = self.tail.index.load(Ordering::Acquire);
545 loop {
546 let offset = (tail >> SHIFT) % LAP;
547 if offset != BLOCK_CAP {
548 break;
549 }
550
551 backoff.spin_heavy();
555 tail = self.tail.index.load(Ordering::Acquire);
556 }
557
558 let mut head = self.head.index.load(Ordering::Acquire);
559 let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
563
564 if head >> SHIFT != tail >> SHIFT {
566 while block.is_null() {
571 backoff.spin_heavy();
572 block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
573 }
574 }
575 unsafe {
583 while head >> SHIFT != tail >> SHIFT {
585 let offset = (head >> SHIFT) % LAP;
586
587 if offset < BLOCK_CAP {
588 let slot = (*block).slots.get_unchecked(offset);
590 slot.wait_write();
591 let p = &mut *slot.msg.get();
592 p.as_mut_ptr().drop_in_place();
593 } else {
594 (*block).wait_next();
595 let next = (*block).next.load(Ordering::Acquire);
597 drop(Box::from_raw(block));
598 block = next;
599 }
600
601 head = head.wrapping_add(1 << SHIFT);
602 }
603
604 if !block.is_null() {
606 drop(Box::from_raw(block));
607 }
608 }
609
610 head &= !MARK_BIT;
611 self.head.index.store(head, Ordering::Release);
612 }
613
614 pub(crate) fn is_disconnected(&self) -> bool {
616 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
617 }
618
619 pub(crate) fn is_empty(&self) -> bool {
621 let head = self.head.index.load(Ordering::SeqCst);
622 let tail = self.tail.index.load(Ordering::SeqCst);
623 head >> SHIFT == tail >> SHIFT
624 }
625
626 pub(crate) fn is_full(&self) -> bool {
628 false
629 }
630}
631
impl<T> Drop for Channel<T> {
    // Drops all remaining messages and deallocates all blocks. Runs with
    // exclusive access (`&mut self`), hence the Relaxed loads.
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower (mark/metadata) bits so head and tail compare cleanly.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the
            // heap-allocated blocks along the way.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // End of block: deallocate it and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}