source: libcfa/src/containers/queueLockFree.hfa @ 9cd5bd2

ADTast-experimentalpthread-emulation
Last change on this file since 9cd5bd2 was bbf61838, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Fixed missing header

  • Property mode set to 100644
File size: 3.8 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5#include <bits/defs.hfa>
6
7forall( T &) {
8        //------------------------------------------------------------
9        // Queue based on the MCS lock
10        // It is a Multi-Producer/Single-Consumer queue threads pushing
11        // elements must hold on to the elements they push
12        // Not appropriate for an async message queue for example,
13        struct mcs_queue {
14                T * volatile tail;
15        };
16
17        static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
18        static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
19
20        static inline forall(| { T * volatile & ?`next ( T * ); })
21        {
22                // Adds an element to the list
23                // Multi-Thread Safe, Lock-Free
24                T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
25                T * push(mcs_queue(T) & this, T * elem) {
26                        /* paranoid */ verify(!(elem`next));
27                        // Race to add to the tail
28                        T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
29                        // If we aren't the first, we need to tell the person before us
30                        // No need to
31                        if (prev) prev`next = elem;
32                        return prev;
33                }
34
35                // Advances the head of the list, dropping the element given.
36                // Passing an element that is not the head is undefined behavior
37                // NOT Multi-Thread Safe, concurrent pushes are safe
38                T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
39                T * advance(mcs_queue(T) & this, T * elem) {
40                        T * expected = elem;
41                        // Check if this is already the last item
42                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
43
44                        // If not wait for next item to show-up, filled by push
45                        while (!(elem`next)) Pause();
46
47                        // we need to return if the next link was empty
48                        T * ret = elem`next;
49
50                        // invalidate link to reset to initial state
51                        elem`next = 0p;
52                        return ret;
53                }
54        }
55
56        //------------------------------------------------------------
57        // Queue based on the MCS lock
58        // Extension of the above lock which supports 'blind' pops.
59        // i.e., popping a value from the head without knowing what the head is
60        // has no extra guarantees beyond the mcs_queue
61        struct mpsc_queue {
62                inline mcs_queue(T);
63                T * volatile head;
64        };
65
66        static inline void ?{}(mpsc_queue(T) & this) {
67                ((mcs_queue(T)&)this){};
68                this.head = 0p;
69        }
70
71        static inline forall(| { T * volatile & ?`next ( T * ); })
72        {
73                // Added a new element to the queue
74                // Multi-Thread Safe, Lock-Free
75                T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
76                T * push(mpsc_queue(T) & this, T * elem) {
77                        T * prev = push((mcs_queue(T)&)this, elem);
78                        if (!prev) this.head = elem;
79                        return prev;
80                }
81
82                // Pop an element from the queue
83                // return the element that was removed
84                // next is set to the new head of the queue
85                // NOT Multi-Thread Safe
86                T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
87                T * pop(mpsc_queue(T) & this, T *& next) {
88                        T * elem = this.head;
89                        // If head is empty just return
90                        if (!elem) return 0p;
91
92                        // If there is already someone in the list, then it's easy
93                        if (elem`next) {
94                                this.head = next = elem`next;
95                                // force memory sync
96                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
97
98                                // invalidate link to reset to initial state
99                                elem`next = 0p;
100                        }
101                        // Otherwise, there might be a race where it only looks but someone is enqueuing
102                        else {
103                                // null out head here, because we linearize with push
104                                // at the CAS in advance and therefore can write to head
105                                // after that point, it could overwrite the write in push
106                                this.head = 0p;
107                                next = advance((mcs_queue(T)&)this, elem);
108
109                                // Only write to the head if there is a next element
110                                // it is the only way we can guarantee we are not overwriting
111                                // a write made in push
112                                if (next) this.head = next;
113                        }
114
115                        // return removed element
116                        return elem;
117                }
118
119                // Same as previous function
120                T * pop(mpsc_queue(T) & this) {
121                        T * _ = 0p;
122                        return pop(this, _);
123                }
124        }
125}
Note: See TracBrowser for help on using the repository browser.