source: libcfa/src/containers/queueLockFree.hfa @ 94fa946

ADT ast-experimental enum pthread-emulation qualifiedEnum
Last change on this file since 94fa946 was 2d95a2d, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Small fixes to lock free queues.

  • Property mode set to 100644
File size: 3.8 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5forall( T &) {
6        //------------------------------------------------------------
7        // Queue based on the MCS lock.
8        // It is a Multi-Producer/Single-Consumer queue: threads pushing
9        // elements must hold on to the elements they push.
10        // Not appropriate for an async message queue, for example.
          // Only the tail is stored here; elements are chained through
          // the ?`next link that T must provide to push/advance.
11        struct mcs_queue {
12                T * volatile tail; // most recently pushed element, 0p when empty
13        };
14
15        static inline void ?{}(mcs_queue(T) & this) {
                  // Default constructor: no tail means the queue starts out empty.
                  this.tail = 0p;
          }
16        static inline bool empty(const mcs_queue(T) & this) {
                  // The queue is reported empty when its tail pointer is null.
                  return this.tail == 0p;
          }
17
18        static inline forall(| { T * volatile & ?`next ( T * ); })
19        {
20                // Adds an element to the end of the list.
21                // Multi-Thread Safe, Lock-Free
                  // elem must not be linked already, i.e., its next link must be 0p.
                  // Returns the previous tail, or 0p if the list was empty.
22                T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
23                T * push(mcs_queue(T) & this, T * elem) {
24                        /* paranoid */ verify(!(elem`next));
25                        // Race to add to the tail, the exchange orders concurrent pushes
26                        T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
27                        // If we aren't the first, we need to tell the person before us
28                        // by filling in its next link; until then that link is still 0p,
                          // which is what advance waits on. If we are first, nothing to link.
29                        if (prev) prev`next = elem;
30                        return prev;
31                }
32
33                // Advances the head of the list, dropping the element given.
34                // Passing an element that is not the head is undefined behavior.
35                // NOT Multi-Thread Safe, concurrent pushes are safe
                  // Returns the element following elem, or 0p if elem was the last
                  // element, in which case the tail is reset to 0p.
36                T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
37                T * advance(mcs_queue(T) & this, T * elem) {
38                        T * expected = elem;
39                        // Check if this is already the last item, if so clear the tail and stop
40                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
41
42                        // If not, a push has replaced the tail: wait for the next link
                          // to show up, it is filled in by push after its exchange
43                        while (!(elem`next)) Pause();
44
45                        // read the link before clearing it, this is the value we return
46                        T * ret = elem`next;
47
48                        // invalidate link to reset to initial state
49                        elem`next = 0p;
50                        return ret;
51                }
52        }
53
54        //------------------------------------------------------------
55        // Queue based on the MCS lock.
56        // Extension of the above queue which supports 'blind' pops,
57        // i.e., popping a value from the head without knowing what the head is.
58        // It has no extra guarantees beyond the mcs_queue.
59        struct mpsc_queue {
60                inline mcs_queue(T); // embedded mcs_queue, provides the tail
61                T * volatile head;   // element the next pop removes, 0p if there is none
62        };
63
64        static inline void ?{}(mpsc_queue(T) & this) {
                  // Default constructor: no head and, through the embedded
                  // mcs_queue constructor, no tail.
                  this.head = 0p;
                  ((mcs_queue(T)&)this){};
          }
68
69        static inline forall(| { T * volatile & ?`next ( T * ); })
70        {
71                // Adds a new element to the queue.
72                // Multi-Thread Safe, Lock-Free
                  // Returns the previous tail, or 0p if the queue was empty.
73                T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
74                T * push(mpsc_queue(T) & this, T * elem) {
75                        T * prev = push((mcs_queue(T)&)this, elem);
                          // No previous tail means elem is also the head. Between the
                          // exchange in the push above and this write, the tail is
                          // already set while head is still 0p.
76                        if (!prev) this.head = elem;
77                        return prev;
78                }
79
80                // Pop an element from the queue.
81                // Returns the element that was removed, or 0p if head is empty.
82                // next is set to the new head of the queue (0p if the queue became
                  // empty); it is left untouched when 0p is returned.
83                // NOT Multi-Thread Safe
84                T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
85                T * pop(mpsc_queue(T) & this, T *& next) {
86                        T * elem = this.head;
87                        // If head is empty just return
88                        if (!elem) return 0p;
89
90                        // If there is already someone in the list, then it's easy
91                        if (elem`next) {
92                                this.head = next = elem`next;
93                                // force memory sync
94                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
95
96                                // invalidate link to reset to initial state
97                                elem`next = 0p;
98                        }
99                        // Otherwise, the list only looks like it has a single element:
                          // someone may be enqueuing right now
100                        else {
101                                // null out head here, before the CAS in advance: we linearize
102                                // with push at that CAS, once it succeeds a push may write to
103                                // head, so a write from here after that point could overwrite it
104                                this.head = 0p;
105                                next = advance((mcs_queue(T)&)this, elem);
106
107                                // Only write to the head if there is a next element
108                                // it is the only way we can guarantee we are not overwriting
109                                // a write made in push
110                                if (next) this.head = next;
111                        }
112
113                        // return removed element
114                        return elem;
115                }
116
117                // Pop an element from the queue, discarding the new head.
                   // Forwards to the two-argument pop and returns its result.
                   T * pop(mpsc_queue(T) & this) {
                           T * discarded = 0p;
                           return pop(this, discarded);
                   }
122        }
123}
Note: See TracBrowser for help on using the repository browser.