arc_swap/debt/list.rs
1//! A linked list of debt nodes.
2//!
3//! A node may or may not be owned by a thread. Reader debts are allocated in its owned node,
4//! writer walks everything (but may also use some owned values).
5//!
6//! The list is prepend-only ‒ if thread dies, the node lives on (and can be claimed by another
7//! thread later on). This makes the implementation much simpler, since everything here is
8//! `'static` and we don't have to care about knowing when to free stuff.
9//!
10//! The nodes contain both the fast primary slots and a secondary fallback ones.
11//!
12//! # Synchronization
13//!
14//! We synchronize several things here.
15//!
16//! The addition of nodes is synchronized through the head (Load on each read, AcqReal on each
17//! attempt to add another node). Note that certain parts never change after that (they aren't even
18//! atomic) and other things that do change take care of themselves (the debt slots have their own
19//! synchronization, etc).
20//!
21//! The ownership is acquire-release lock pattern.
22//!
23//! Similar, the counting of active writers is an acquire-release lock pattern.
24//!
25//! We also do release-acquire "send" from the start-cooldown to check-cooldown to make sure we see
26//! at least as up to date value of the writers as when the cooldown started. That we if we see 0,
27//! we know it must have happened since then.
28
29use std::cell::Cell;
30use std::ptr;
31use std::slice::Iter;
32use std::sync::atomic::Ordering::*;
33use std::sync::atomic::{AtomicPtr, AtomicUsize};
34
35use super::fast::{Local as FastLocal, Slots as FastSlots};
36use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
37use super::Debt;
38use crate::RefCnt;
39
40const NODE_UNUSED: usize = 0;
41const NODE_USED: usize = 1;
42const NODE_COOLDOWN: usize = 2;
43
44/// The head of the debt linked list.
45static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
46
47pub struct NodeReservation<'a>(&'a Node);
48
49impl Drop for NodeReservation<'_> {
50 fn drop(&mut self) {
51 self.0.active_writers.fetch_sub(1, Release);
52 }
53}
54
55/// One thread-local node for debts.
56#[repr(C, align(64))]
57pub(crate) struct Node {
58 fast: FastSlots,
59 helping: HelpingSlots,
60 in_use: AtomicUsize,
61 // Next node in the list.
62 //
63 // It is a pointer because we touch it before synchronization (we don't _dereference_ it before
64 // synchronization, only manipulate the pointer itself). That is illegal according to strict
65 // interpretation of the rules by MIRI on references.
66 next: *const Node,
67 active_writers: AtomicUsize,
68}
69
70impl Default for Node {
71 fn default() -> Self {
72 Node {
73 fast: FastSlots::default(),
74 helping: HelpingSlots::default(),
75 in_use: AtomicUsize::new(NODE_USED),
76 next: ptr::null(),
77 active_writers: AtomicUsize::new(0),
78 }
79 }
80}
81
82impl Node {
83 /// Goes through the debt linked list.
84 ///
85 /// This traverses the linked list, calling the closure on each node. If the closure returns
86 /// `Some`, it terminates with that value early, otherwise it runs to the end.
87 pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
88 // Acquire ‒ we want to make sure we read the correct version of data at the end of the
89 // pointer. Any write to the DEBT_HEAD is with Release.
90 //
91 // Furthermore, we need to see the newest version of the list in case we examine the debts
92 // - if a new one is added recently, we don't want a stale read -> SeqCst.
93 //
94 // Note that the other pointers in the chain never change and are *ordinary* pointers. The
95 // whole linked list is synchronized through the head.
96 let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
97 while let Some(node) = current {
98 let result = f(node);
99 if result.is_some() {
100 return result;
101 }
102 current = unsafe { node.next.as_ref() };
103 }
104 None
105 }
106
107 /// Put the current thread node into cooldown
108 fn start_cooldown(&self) {
109 // Trick: Make sure we have an up to date value of the active_writers in this thread, so we
110 // can properly release it below.
111 let _reservation = self.reserve_writer();
112 assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
113 }
114
115 /// Perform a cooldown if the node is ready.
116 ///
117 /// See the ABA protection at the [helping].
118 fn check_cooldown(&self) {
119 // Check if the node is in cooldown, for two reasons:
120 // * Skip most of nodes fast, without dealing with them.
121 // * More importantly, sync the value of active_writers to be at least the value when the
122 // cooldown started. That way we know the 0 we observe happened some time after
123 // start_cooldown.
124 if self.in_use.load(Acquire) == NODE_COOLDOWN {
125 // The rest can be nicely relaxed ‒ no memory is being synchronized by these
126 // operations. We just see an up to date 0 and allow someone (possibly us) to claim the
127 // node later on.
128 if self.active_writers.load(Relaxed) == 0 {
129 let _ = self
130 .in_use
131 .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
132 }
133 }
134 }
135
136 /// Mark this node that a writer is currently playing with it.
137 pub fn reserve_writer(&self) -> NodeReservation {
138 self.active_writers.fetch_add(1, Acquire);
139 NodeReservation(self)
140 }
141
142 /// "Allocate" a node.
143 ///
144 /// Either a new one is created, or previous one is reused. The node is claimed to become
145 /// in_use.
146 fn get() -> &'static Self {
147 // Try to find an unused one in the chain and reuse it.
148 Self::traverse(|node| {
149 node.check_cooldown();
150 if node
151 .in_use
152 // We claim a unique control over the generation and the right to write to slots if
153 // they are NO_DEPT
154 .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
155 .is_ok()
156 {
157 Some(node)
158 } else {
159 None
160 }
161 })
162 // If that didn't work, create a new one and prepend to the list.
163 .unwrap_or_else(|| {
164 let node = Box::leak(Box::<Node>::default());
165 node.helping.init();
166 // We don't want to read any data in addition to the head, Relaxed is fine
167 // here.
168 //
169 // We do need to release the data to others, but for that, we acquire in the
170 // compare_exchange below.
171 let mut head = LIST_HEAD.load(Relaxed);
172 loop {
173 node.next = head;
174 if let Err(old) = LIST_HEAD.compare_exchange_weak(
175 head, node,
176 // We need to release *the whole chain* here. For that, we need to
177 // acquire it first.
178 //
179 // SeqCst because we need to make sure it is properly set "before" we do
180 // anything to the debts.
181 SeqCst, Relaxed, // Nothing changed, go next round of the loop.
182 ) {
183 head = old;
184 } else {
185 return node;
186 }
187 }
188 })
189 }
190
191 /// Iterate over the fast slots.
192 pub(crate) fn fast_slots(&self) -> Iter<Debt> {
193 self.fast.into_iter()
194 }
195
196 /// Access the helping slot.
197 pub(crate) fn helping_slot(&self) -> &Debt {
198 self.helping.slot()
199 }
200}
201
202/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
203pub(crate) struct LocalNode {
204 /// Node for this thread, if any.
205 ///
206 /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
207 node: Cell<Option<&'static Node>>,
208
209 /// Thread-local data for the fast slots.
210 fast: FastLocal,
211
212 /// Thread local data for the helping strategy.
213 helping: HelpingLocal,
214}
215
216impl LocalNode {
217 pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
218 let f = Cell::new(Some(f));
219 THREAD_HEAD
220 .try_with(|head| {
221 if head.node.get().is_none() {
222 head.node.set(Some(Node::get()));
223 }
224 let f = f.take().unwrap();
225 f(head)
226 })
227 // During the application shutdown, the thread local storage may be already
228 // deallocated. In that case, the above fails but we still need something. So we just
229 // find or allocate a node and use it just once.
230 //
231 // Note that the situation should be very very rare and not happen often, so the slower
232 // performance doesn't matter that much.
233 .unwrap_or_else(|_| {
234 let tmp_node = LocalNode {
235 node: Cell::new(Some(Node::get())),
236 fast: FastLocal::default(),
237 helping: HelpingLocal::default(),
238 };
239 let f = f.take().unwrap();
240 f(&tmp_node)
241 // Drop of tmp_node -> sends the node we just used into cooldown.
242 })
243 }
244
245 /// Creates a new debt.
246 ///
247 /// This stores the debt of the given pointer (untyped, casted into an usize) and returns a
248 /// reference to that slot, or gives up with `None` if all the slots are currently full.
249 #[inline]
250 pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
251 let node = &self.node.get().expect("LocalNode::with ensures it is set");
252 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
253 node.fast.get_debt(ptr, &self.fast)
254 }
255
256 /// Initializes a helping slot transaction.
257 ///
258 /// Returns the generation (with tag).
259 pub(crate) fn new_helping(&self, ptr: usize) -> usize {
260 let node = &self.node.get().expect("LocalNode::with ensures it is set");
261 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
262 let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
263 if discard {
264 // Too many generations happened, make sure the writers give the poor node a break for
265 // a while so they don't observe the generation wrapping around.
266 node.start_cooldown();
267 self.node.take();
268 }
269 gen
270 }
271
272 /// Confirm the helping transaction.
273 ///
274 /// The generation comes from previous new_helping.
275 ///
276 /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
277 /// protected) address.
278 pub(crate) fn confirm_helping(
279 &self,
280 gen: usize,
281 ptr: usize,
282 ) -> Result<&'static Debt, (&'static Debt, usize)> {
283 let node = &self.node.get().expect("LocalNode::with ensures it is set");
284 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
285 let slot = node.helping_slot();
286 node.helping
287 .confirm(gen, ptr)
288 .map(|()| slot)
289 .map_err(|repl| (slot, repl))
290 }
291
292 /// The writer side of a helping slot.
293 ///
294 /// This potentially helps the `who` node (uses self as the local node, which must be
295 /// different) by loading the address that one is trying to load.
296 pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
297 where
298 T: RefCnt,
299 R: Fn() -> T,
300 {
301 let node = &self.node.get().expect("LocalNode::with ensures it is set");
302 debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
303 node.helping.help(&who.helping, storage_addr, replacement)
304 }
305}
306
307impl Drop for LocalNode {
308 fn drop(&mut self) {
309 if let Some(node) = self.node.get() {
310 // Release - syncing writes/ownership of this Node
311 node.start_cooldown();
312 }
313 }
314}
315
316thread_local! {
317 /// A debt node assigned to this thread.
318 static THREAD_HEAD: LocalNode = LocalNode {
319 node: Cell::new(None),
320 fast: FastLocal::default(),
321 helping: HelpingLocal::default(),
322 };
323}
324
325#[cfg(test)]
326mod tests {
327 use super::*;
328
329 impl Node {
330 fn is_empty(&self) -> bool {
331 self.fast_slots()
332 .chain(std::iter::once(self.helping_slot()))
333 .all(|d| d.0.load(Relaxed) == Debt::NONE)
334 }
335
336 fn get_thread() -> &'static Self {
337 LocalNode::with(|h| h.node.get().unwrap())
338 }
339 }
340
341 /// A freshly acquired thread local node is empty.
342 #[test]
343 fn new_empty() {
344 assert!(Node::get_thread().is_empty());
345 }
346}