source: doc/theses/thierry_delisle_PhD/code/readyQ_proto/links2.hpp @ 747d0fa

ADTast-experimentalpthread-emulation
Last change on this file since 747d0fa was 780a614, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added comparison of the mpsc queue to the prototype.

  • Property mode set to 100644
File size: 3.4 KB
Line 
1#pragma once
2
3#include <assert.h>
4
5#include "utils.hpp"
6
7//------------------------------------------------------------
8// Queue based on the MCS lock
9// It is a Multi-Producer/Single-Consumer queue threads pushing
10// elements must hold on to the elements they push
11// Not appropriate for an async message queue for example,
12template<typename node_t>
13class mcs_queue {
14        node_t * volatile tail;
15
16public:
17        mcs_queue(): tail(nullptr) {}
18
19        inline bool empty() const { return !tail; }
20
21        node_t * push( node_t * elem ) {
22                /* paranoid */ assert(!elem->_links.next);
23                // Race to add to the tail
24                node_t * prev = __atomic_exchange_n(&tail, elem, __ATOMIC_SEQ_CST);
25                // If we aren't the first, we need to tell the person before us
26                // No need to
27                if (prev) prev->_links.next = elem;
28                return prev;
29        }
30
31        // Advances the head of the list, dropping the element given.
32        // Passing an element that is not the head is undefined behavior
33        // NOT Multi-Thread Safe, concurrent pushes are safe
34        node_t * advance(node_t * elem) {
35                node_t * expected = elem;
36                // Check if this is already the last item
37                if (__atomic_compare_exchange_n(&tail, &expected, nullptr, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return nullptr;
38
39                // If not wait for next item to show-up, filled by push
40                while (!elem->_links.next) Pause();
41
42                // we need to return if the next link was empty
43                node_t * ret = elem->_links.next;
44
45                // invalidate link to reset to initial state
46                elem->_links.next = nullptr;
47                return ret;
48        }
49};
50
51//------------------------------------------------------------
52// Queue based on the MCS lock
53// Extension of the above lock which supports 'blind' pops.
54// i.e., popping a value from the head without knowing what the head is
55// has no extra guarantees beyond the mcs_queue
56template<typename node_t>
57class mpsc_queue : private mcs_queue<node_t> {
58        node_t * volatile _head;
59public:
60        mpsc_queue(): mcs_queue<node_t>(), _head(nullptr) {}
61
62        inline bool empty() const { return mcs_queue<node_t>::empty(); }
63
64        node_t * head() const { return _head; }
65
66        // Added a new element to the queue
67        // Multi-Thread Safe, Lock-Free
68        inline node_t * push(node_t * elem) {
69                node_t * prev = mcs_queue<node_t>::push(elem);
70                if (!prev) _head = elem;
71                return prev;
72        }
73
74        // Pop an element from the queue
75        // return the element that was removed
76        // next is set to the new head of the queue
77        // NOT Multi-Thread Safe
78        inline node_t * pop(node_t *& next) {
79                node_t * elem = _head;
80                // If head is empty just return
81                if (!elem) return nullptr;
82
83                // If there is already someone in the list, then it's easy
84                if (elem->_links.next) {
85                        _head = next = elem->_links.next;
86                        // force memory sync
87                        __atomic_thread_fence(__ATOMIC_SEQ_CST);
88
89                        // invalidate link to reset to initial state
90                        elem->_links.next = nullptr;
91                }
92                // Otherwise, there might be a race where it only looks but someone is enqueuing
93                else {
94                        // null out head here, because we linearize with push
95                        // at the CAS in advance and therefore can write to head
96                        // after that point, it could overwrite the write in push
97                        _head = nullptr;
98                        next = mcs_queue<node_t>::advance(elem);
99
100                        // Only write to the head if there is a next element
101                        // it is the only way we can guarantee we are not overwriting
102                        // a write made in push
103                        if (next) _head = next;
104                }
105
106                // return removed element
107                return elem;
108        }
109
110        // Same as previous function
111        inline node_t * pop() {
112                node_t * _ = nullptr;
113                return pop(_);
114        }
115};
Note: See TracBrowser for help on using the repository browser.