source: libcfa/src/containers/queueLockFree.hfa @ 304de00

Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Last change on this file since 304de00 was 304de00, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Added queueLockFree, which contains two lock-free queues based on the MCS lock.

  • Property mode set to 100644
File size: 3.4 KB
RevLine 
[304de00]1#pragma once
2
3#include <assert.h>
4
5forall( T &) {
6        //------------------------------------------------------------
7        // Queue based on the MCS lock
8        // It is a Multi-Producer/Single-Consumer queue threads pushing
9        // elements must hold on to the elements they push
10        // Not appropriate for an async message queue for example,
11        struct mcs_queue {
12                T * volatile tail;
13        };
14
15        static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
16        static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
17
18        static inline forall(| { T * volatile & ?`next ( T * ); })
19        {
20                // Adds an element to the list
21                // Multi-Thread Safe, Lock-Free
22                T * push(mcs_queue(T) & this, T & elem) {
23                        /* paranoid */ verify(!(&elem)`next);
24                        // Race to add to the tail
25                        T * prev = __atomic_exchange_n(&this.tail, &elem, __ATOMIC_SEQ_CST);
26                        // If we aren't the first, we need to tell the person before us
27                        // No need to
28                        if (prev) prev`next = &elem;
29                        return prev;
30                }
31
32                // Advances the head of the list, dropping the element given.
33                // Passing an element that is not the head is undefined behavior
34                // NOT Multi-Thread Safe, concurrent pushes are safe
35                T * advance(mcs_queue(T) & this, T & elem) {
36                        T * expected = &elem;
37                        // Check if this is already the last item
38                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
39
40                        // If not wait for next item to show-up
41                        // added by push
42                        while (!(&elem)`next) Pause();
43                        return (&elem)`next;
44                }
45        }
46
47        //------------------------------------------------------------
48        // Queue based on the MCS lock
49        // Extension of the above lock which supports 'blind' pops.
50        // i.e., popping a value from the head without knowing what the head is
51        // has no extra guarantees beyond the mcs_queue
52        struct mpsc_queue {
53                inline mcs_queue(T);
54                T * volatile head;
55        };
56
57        static inline void ?{}(mpsc_queue(T) & this) {
58                ((mcs_queue(T)&)this){};
59                this.head = 0p;
60        }
61
62        static inline forall(| { T * volatile & ?`next ( T * ); })
63        {
64                // Added a new element to the queue
65                // Multi-Thread Safe, Lock-Free
66                T * push(mpsc_queue(T) & this, T & elem) {
67                        T * prev = push((mcs_queue(T)&)this, elem);
68                        if (!prev) this.head = &elem;
69                        return prev;
70                }
71
72                // Pop an element from the queue
73                // return the element that was removed
74                // next is set to the new head of the queue
75                // NOT Multi-Thread Safe
76                T * pop(mpsc_queue(T) & this, T *& next) {
77                        T * elem = this.head;
78                        // If head is empty just return
79                        if (!elem) return 0p;
80
81                        // If there is already someone in the list, then it's easy
82                        if (elem`next) {
83                                this.head = next = elem`next;
84                                // force memory sync
85                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
86                        }
87                        // Otherwise, there might be a race where it only looks but someone is enqueuing
88                        else {
89                                // null out head here, because we linearize with push
90                                // at the CAS in advance and therefore can write to head
91                                // after that point, it could overwrite the write in push
92                                this.head = 0p;
93                                next = advance((mcs_queue(T)&)this, (*elem));
94
95                                // Only write to the head if there is a next element
96                                // it is the only way we can guarantee we are not overwriting
97                                // a write made in push
98                                if (next) this.head = next;
99                        }
100
101                        // invalidate link
102                        elem`next = 0p;
103
104                        // return removed element
105                        return elem;
106                }
107
108                // Same as previous function
109                T * pop(mpsc_queue(T) & this) {
110                        T * _ = 0p;
111                        return pop(this, _);
112                }
113        }
114}
Note: See TracBrowser for help on using the repository browser.