source: libcfa/src/containers/queueLockFree.hfa@ 357fae8

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 357fae8 was 304de00, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added queueLockFree which contains two lockfree queue based on MCS.

  • Property mode set to 100644
File size: 3.4 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5forall( T &) {
6 //------------------------------------------------------------
7 // Queue based on the MCS lock
8 // It is a Multi-Producer/Single-Consumer queue threads pushing
9 // elements must hold on to the elements they push
10 // Not appropriate for an async message queue for example,
11 struct mcs_queue {
12 T * volatile tail;
13 };
14
15 static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
16 static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
17
18 static inline forall(| { T * volatile & ?`next ( T * ); })
19 {
20 // Adds an element to the list
21 // Multi-Thread Safe, Lock-Free
22 T * push(mcs_queue(T) & this, T & elem) {
23 /* paranoid */ verify(!(&elem)`next);
24 // Race to add to the tail
25 T * prev = __atomic_exchange_n(&this.tail, &elem, __ATOMIC_SEQ_CST);
26 // If we aren't the first, we need to tell the person before us
27 // No need to
28 if (prev) prev`next = &elem;
29 return prev;
30 }
31
32 // Advances the head of the list, dropping the element given.
33 // Passing an element that is not the head is undefined behavior
34 // NOT Multi-Thread Safe, concurrent pushes are safe
35 T * advance(mcs_queue(T) & this, T & elem) {
36 T * expected = &elem;
37 // Check if this is already the last item
38 if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
39
40 // If not wait for next item to show-up
41 // added by push
42 while (!(&elem)`next) Pause();
43 return (&elem)`next;
44 }
45 }
46
47 //------------------------------------------------------------
48 // Queue based on the MCS lock
49 // Extension of the above lock which supports 'blind' pops.
50 // i.e., popping a value from the head without knowing what the head is
51 // has no extra guarantees beyond the mcs_queue
52 struct mpsc_queue {
53 inline mcs_queue(T);
54 T * volatile head;
55 };
56
57 static inline void ?{}(mpsc_queue(T) & this) {
58 ((mcs_queue(T)&)this){};
59 this.head = 0p;
60 }
61
62 static inline forall(| { T * volatile & ?`next ( T * ); })
63 {
64 // Added a new element to the queue
65 // Multi-Thread Safe, Lock-Free
66 T * push(mpsc_queue(T) & this, T & elem) {
67 T * prev = push((mcs_queue(T)&)this, elem);
68 if (!prev) this.head = &elem;
69 return prev;
70 }
71
72 // Pop an element from the queue
73 // return the element that was removed
74 // next is set to the new head of the queue
75 // NOT Multi-Thread Safe
76 T * pop(mpsc_queue(T) & this, T *& next) {
77 T * elem = this.head;
78 // If head is empty just return
79 if (!elem) return 0p;
80
81 // If there is already someone in the list, then it's easy
82 if (elem`next) {
83 this.head = next = elem`next;
84 // force memory sync
85 __atomic_thread_fence(__ATOMIC_SEQ_CST);
86 }
87 // Otherwise, there might be a race where it only looks but someone is enqueuing
88 else {
89 // null out head here, because we linearize with push
90 // at the CAS in advance and therefore can write to head
91 // after that point, it could overwrite the write in push
92 this.head = 0p;
93 next = advance((mcs_queue(T)&)this, (*elem));
94
95 // Only write to the head if there is a next element
96 // it is the only way we can guarantee we are not overwriting
97 // a write made in push
98 if (next) this.head = next;
99 }
100
101 // invalidate link
102 elem`next = 0p;
103
104 // return removed element
105 return elem;
106 }
107
108 // Same as previous function
109 T * pop(mpsc_queue(T) & this) {
110 T * _ = 0p;
111 return pop(this, _);
112 }
113 }
114}
Note: See TracBrowser for help on using the repository browser.