source: libcfa/src/containers/queueLockFree.hfa@ d00ce99

ADT ast-experimental enum forall-pointer-decay pthread-emulation qualifiedEnum
Last change on this file since d00ce99 was 2d95a2d, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Small fixes to lock free queues.

  • Property mode set to 100644
File size: 3.8 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5forall( T &) {
6 //------------------------------------------------------------
7 // Queue based on the MCS lock
8 // It is a Multi-Producer/Single-Consumer queue threads pushing
9 // elements must hold on to the elements they push
10 // Not appropriate for an async message queue for example,
11 struct mcs_queue {
12 T * volatile tail;
13 };
14
15 static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
16 static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
17
18 static inline forall(| { T * volatile & ?`next ( T * ); })
19 {
20 // Adds an element to the list
21 // Multi-Thread Safe, Lock-Free
22 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
23 T * push(mcs_queue(T) & this, T * elem) {
24 /* paranoid */ verify(!(elem`next));
25 // Race to add to the tail
26 T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
27 // If we aren't the first, we need to tell the person before us
28 // No need to
29 if (prev) prev`next = elem;
30 return prev;
31 }
32
33 // Advances the head of the list, dropping the element given.
34 // Passing an element that is not the head is undefined behavior
35 // NOT Multi-Thread Safe, concurrent pushes are safe
36 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
37 T * advance(mcs_queue(T) & this, T * elem) {
38 T * expected = elem;
39 // Check if this is already the last item
40 if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
41
42 // If not wait for next item to show-up, filled by push
43 while (!(elem`next)) Pause();
44
45 // we need to return if the next link was empty
46 T * ret = elem`next;
47
48 // invalidate link to reset to initial state
49 elem`next = 0p;
50 return ret;
51 }
52 }
53
54 //------------------------------------------------------------
55 // Queue based on the MCS lock
56 // Extension of the above lock which supports 'blind' pops.
57 // i.e., popping a value from the head without knowing what the head is
58 // has no extra guarantees beyond the mcs_queue
59 struct mpsc_queue {
60 inline mcs_queue(T);
61 T * volatile head;
62 };
63
64 static inline void ?{}(mpsc_queue(T) & this) {
65 ((mcs_queue(T)&)this){};
66 this.head = 0p;
67 }
68
69 static inline forall(| { T * volatile & ?`next ( T * ); })
70 {
71 // Added a new element to the queue
72 // Multi-Thread Safe, Lock-Free
73 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
74 T * push(mpsc_queue(T) & this, T * elem) {
75 T * prev = push((mcs_queue(T)&)this, elem);
76 if (!prev) this.head = elem;
77 return prev;
78 }
79
80 // Pop an element from the queue
81 // return the element that was removed
82 // next is set to the new head of the queue
83 // NOT Multi-Thread Safe
84 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
85 T * pop(mpsc_queue(T) & this, T *& next) {
86 T * elem = this.head;
87 // If head is empty just return
88 if (!elem) return 0p;
89
90 // If there is already someone in the list, then it's easy
91 if (elem`next) {
92 this.head = next = elem`next;
93 // force memory sync
94 __atomic_thread_fence(__ATOMIC_SEQ_CST);
95
96 // invalidate link to reset to initial state
97 elem`next = 0p;
98 }
99 // Otherwise, there might be a race where it only looks but someone is enqueuing
100 else {
101 // null out head here, because we linearize with push
102 // at the CAS in advance and therefore can write to head
103 // after that point, it could overwrite the write in push
104 this.head = 0p;
105 next = advance((mcs_queue(T)&)this, elem);
106
107 // Only write to the head if there is a next element
108 // it is the only way we can guarantee we are not overwriting
109 // a write made in push
110 if (next) this.head = next;
111 }
112
113 // return removed element
114 return elem;
115 }
116
117 // Same as previous function
118 T * pop(mpsc_queue(T) & this) {
119 T * _ = 0p;
120 return pop(this, _);
121 }
122 }
123}
Note: See TracBrowser for help on using the repository browser.