source: libcfa/src/containers/queueLockFree.hfa@ affb51b

ADT ast-experimental
Last change on this file since affb51b was bbf61838, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Fixed missing header

  • Property mode set to 100644
File size: 3.8 KB
RevLine 
[304de00]1#pragma once
2
3#include <assert.h>
4
[bbf61838]5#include <bits/defs.hfa>
6
[304de00]7forall( T &) {
8 //------------------------------------------------------------
9 // Queue based on the MCS lock.
10 // It is a Multi-Producer/Single-Consumer queue; threads pushing
11 // elements must hold on to the elements they push.
12 // Not appropriate for, e.g., an async message queue.
// Only the tail is stored here: advance() takes the current head as an
// argument, so the consumer has to keep track of the head itself.
13 struct mcs_queue {
// Most recently pushed element, or 0p when the queue is empty.
14 T * volatile tail;
15 };
16
// Constructor: initializes the queue as empty (null tail).
17 static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
// Returns true when the tail pointer is null, i.e. no element is currently queued.
18 static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
19
20 static inline forall(| { T * volatile & ?`next ( T * ); })
21 {
22 // Adds an element to the list.
23 // Multi-Thread Safe, Lock-Free
// Returns the previous tail: 0p if the queue was empty before this push,
// otherwise the element that elem has been linked after.
// elem`next must be null on entry (checked by verify).
[2d95a2d]24 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
25 T * push(mcs_queue(T) & this, T * elem) {
26 /* paranoid */ verify(!(elem`next));
[304de00]27 // Race to add to the tail: atomically swap elem in as the new tail
[2d95a2d]28 T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
[304de00]29 // If we aren't the first, we need to tell the person before us
30 // by writing our address into their next link.
// NOTE(review): this comment line originally read only "No need to" (truncated); the intended remark is unknown.
[2d95a2d]31 if (prev) prev`next = elem;
[304de00]32 return prev;
33 }
34
35 // Advances the head of the list, dropping the element given.
36 // Passing an element that is not the head is undefined behavior.
37 // NOT Multi-Thread Safe, concurrent pushes are safe
// Returns the new head, or 0p if elem was the last element (the tail is then reset to null).
// Spins, calling Pause(), for as long as the tail is no longer elem but elem`next is still null.
[2d95a2d]38 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
39 T * advance(mcs_queue(T) & this, T * elem) {
40 T * expected = elem;
[304de00]41 // Check if this is already the last item: if the tail is still elem, reset it to null and stop
42 if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
43
[2d95a2d]44 // If not, wait for the next item to show up; the link is filled in by push
45 while (!(elem`next)) Pause();
46
47 // Read the successor now, since the link is cleared below; this is the value returned
48 T * ret = elem`next;
49
50 // invalidate link to reset to initial state
51 elem`next = 0p;
52 return ret;
[304de00]53 }
54 }
55
56 //------------------------------------------------------------
57 // Queue based on the MCS lock.
58 // Extension of the above queue which supports 'blind' pops,
59 // i.e., popping a value from the head without knowing what the head is.
60 // It has no extra guarantees beyond the mcs_queue.
61 struct mpsc_queue {
// Embedded mcs_queue, which provides the tail pointer.
62 inline mcs_queue(T);
// Next element to pop, or 0p. Written by push when the queue was empty, and by pop.
63 T * volatile head;
64 };
65
// Constructor: initializes an empty queue.
66 static inline void ?{}(mpsc_queue(T) & this) {
// Run the constructor of the embedded mcs_queue (sets tail to null)
67 ((mcs_queue(T)&)this){};
// No element to pop yet
68 this.head = 0p;
69 }
70
71 static inline forall(| { T * volatile & ?`next ( T * ); })
72 {
73 // Adds a new element to the queue.
74 // Multi-Thread Safe, Lock-Free
// Returns the previous tail, which is 0p if the queue was empty before this push.
[2d95a2d]75 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
76 T * push(mpsc_queue(T) & this, T * elem) {
// Append to the underlying mcs_queue
[304de00]77 T * prev = push((mcs_queue(T)&)this, elem);
// No predecessor means elem is the only element, so it also becomes the head
[2d95a2d]78 if (!prev) this.head = elem;
[304de00]79 return prev;
80 }
81
82 // Pop an element from the queue.
83 // Returns the element that was removed, or 0p if head is null.
84 // next is set to the new head of the queue (0p if the removed element was the last one);
// it is left untouched when head is null and nothing is removed.
85 // NOT Multi-Thread Safe
[2d95a2d]86 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
[304de00]87 T * pop(mpsc_queue(T) & this, T *& next) {
88 T * elem = this.head;
89 // If head is empty just return
90 if (!elem) return 0p;
91
92 // If there is already someone in the list, then it's easy:
// the successor simply becomes the new head
93 if (elem`next) {
94 this.head = next = elem`next;
95 // force memory sync
96 __atomic_thread_fence(__ATOMIC_SEQ_CST);
[2d95a2d]97
98 // invalidate link to reset to initial state
99 elem`next = 0p;
[304de00]100 }
101 // Otherwise, elem only looks like the last element: a push may be in the middle of enqueuing
102 else {
103 // Null out head here, before calling advance: pop linearizes with push
104 // at the CAS in advance, and push may write to head after that point,
105 // so a null written later could overwrite the write made in push.
106 this.head = 0p;
[2d95a2d]107 next = advance((mcs_queue(T)&)this, elem);
[304de00]108
109 // Only write to the head if there is a next element
110 // it is the only way we can guarantee we are not overwriting
111 // a write made in push
112 if (next) this.head = next;
113 }
114
115 // return removed element
116 return elem;
117 }
118
119 // Same as previous function, for callers that do not need the new head.
120 T * pop(mpsc_queue(T) & this) {
// Throwaway variable that receives the new head
121 T * _ = 0p;
122 return pop(this, _);
123 }
124 }
125}
Note: See TracBrowser for help on using the repository browser.