source: doc/theses/thierry_delisle_PhD/code/readyQ_proto/processor_list.hpp @ f9f3775

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since f9f3775 was f9f3775, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Moved phd code for the readQ prototype to it's own folder

  • Property mode set to 100644
File size: 6.2 KB
Line 
1#include <cassert>
2
3#include <atomic>
4#include <new>
5#include <type_traits>
6
7struct processor;
8
9struct __attribute__((aligned(64))) processor_id {
10        std::atomic<processor *> handle;
11        std::atomic<bool> lock;
12
13        processor_id() = default;
14        processor_id(processor * proc) : handle(proc), lock() {
15                /*paranoid*/ assert(std::atomic_is_lock_free(&lock));
16        }
17};
18
19extern unsigned num();
20
21#define ERROR throw 1
22
23class processor_list {
24private:
25
26        static const constexpr std::size_t cache_line_size = 64;
27
28        static_assert(sizeof (processor_id) <= cache_line_size, "ERROR: Instances must fit in one cache line" );
29        static_assert(alignof(processor_id) == cache_line_size, "ERROR: Instances must aligned to one cache line" );
30
31        const unsigned max;     // total cachelines allocated
32        std::atomic_uint alloc; // cachelines currently in use
33        std::atomic_uint ready; // cachelines ready to iterate over (!= to alloc when thread is in second half of doregister)
34        std::atomic<bool> lock; // writerlock
35        processor_id * data;    // data pointer
36
37private:
38        inline void acquire(std::atomic<bool> & ll) {
39                while( __builtin_expect(ll.exchange(true),false) ) {
40                        while(ll.load(std::memory_order_relaxed))
41                                asm volatile("pause");
42                }
43                /* paranoid */ assert(ll);
44        }
45
46public:
47        processor_list()
48                : max(num())
49                , alloc(0)
50                , ready(0)
51                , lock{false}
52                , data( new processor_id[max] )
53        {
54                /*paranoid*/ assert(num() == max);
55                /*paranoid*/ assert(std::atomic_is_lock_free(&alloc));
56                /*paranoid*/ assert(std::atomic_is_lock_free(&ready));
57        }
58
59        ~processor_list() {
60                delete[] data;
61        }
62
63        //=======================================================================
64        // Lock-Free registering/unregistering of threads
65        unsigned doregister(processor * proc) {
66                // Step - 1 : check if there is already space in the data
67                uint_fast32_t s = ready;
68
69                // Check among all the ready
70                for(uint_fast32_t i = 0; i < s; i++) {
71                        processor * null = nullptr; // Re-write every loop since compare thrashes it
72                        if( data[i].handle.load(std::memory_order_relaxed) == null
73                         && data[i].handle.compare_exchange_strong(null, proc)) {
74                                /*paranoid*/ assert(i < ready);
75                                /*paranoid*/ assert(alignof(decltype(data[i])) == cache_line_size);
76                                /*paranoid*/ assert((uintptr_t(&data[i]) % cache_line_size) == 0);
77                                return i;
78                        }
79                }
80
81                if(max <= alloc) ERROR;
82
83                // Step - 2 : F&A to get a new spot in the array.
84                uint_fast32_t n = alloc++;
85                if(max <= n) ERROR;
86
87                // Step - 3 : Mark space as used and then publish it.
88                void * storage = &data[n];
89                new (storage) processor_id( proc );
90                while(true) {
91                        unsigned copy = n;
92                        if( ready.load(std::memory_order_relaxed) == n
93                         && ready.compare_exchange_weak(copy, n + 1) )
94                                break;
95                        asm volatile("pause");
96                }
97
98                // Return new spot.
99                /*paranoid*/ assert(n < ready);
100                /*paranoid*/ assert(alignof(decltype(data[n])) == cache_line_size);
101                /*paranoid*/ assert((uintptr_t(&data[n]) % cache_line_size) == 0);
102                return n;
103        }
104
105        processor * unregister(unsigned iproc) {
106                /*paranoid*/ assert(iproc < ready);
107                auto ret = data[iproc].handle.load(std::memory_order_relaxed);
108                data[iproc].handle = nullptr;
109                return ret;
110        }
111
112        // Reset all registration
113        // Unsafe in most cases, use for testing only.
114        void reset() {
115                alloc = 0;
116                ready = 0;
117        }
118
119        processor * get(unsigned iproc) {
120                return data[iproc].handle.load(std::memory_order_relaxed);
121        }
122
123        //=======================================================================
124        // Reader-writer lock implementation
125        // Concurrent with doregister/unregister,
126        //    i.e., threads can be added at any point during or between the entry/exit
127
128        //-----------------------------------------------------------------------
129        // Reader side
130        void read_lock(unsigned iproc) {
131                /*paranoid*/ assert(iproc < ready);
132
133                // Step 1 : make sure no writer are in the middle of the critical section
134                while(lock.load(std::memory_order_relaxed))
135                        asm volatile("pause");
136
137                // Fence needed because we don't want to start trying to acquire the lock
138                // before we read a false.
139                // Not needed on x86
140                // std::atomic_thread_fence(std::memory_order_seq_cst);
141
142                // Step 2 : acquire our local lock
143                acquire( data[iproc].lock );
144                /*paranoid*/ assert(data[iproc].lock);
145        }
146
147        void read_unlock(unsigned iproc) {
148                /*paranoid*/ assert(iproc < ready);
149                /*paranoid*/ assert(data[iproc].lock);
150                data[iproc].lock.store(false, std::memory_order_release);
151        }
152
153        //-----------------------------------------------------------------------
154        // Writer side
155        uint_fast32_t write_lock() {
156                // Step 1 : lock global lock
157                // It is needed to avoid processors that register mid Critical-Section
158                //   to simply lock their own lock and enter.
159                acquire(lock);
160
161                // Step 2 : lock per-proc lock
162                // Processors that are currently being registered aren't counted
163                //   but can't be in read_lock or in the critical section.
164                // All other processors are counted
165                uint_fast32_t s = ready;
166                for(uint_fast32_t i = 0; i < s; i++) {
167                        acquire( data[i].lock );
168                }
169
170                return s;
171        }
172
173        void write_unlock(uint_fast32_t last_s) {
174                // Step 1 : release local locks
175                // This must be done while the global lock is held to avoid
176                //   threads that where created mid critical section
177                //   to race to lock their local locks and have the writer
178                //   immidiately unlock them
179                // Alternative solution : return s in write_lock and pass it to write_unlock
180                for(uint_fast32_t i = 0; i < last_s; i++) {
181                        assert(data[i].lock);
182                        data[i].lock.store(false, std::memory_order_release);
183                }
184
185                // Step 2 : release global lock
186                /*paranoid*/ assert(true == lock);
187                lock.store(false, std::memory_order_release);
188        }
189
190        //-----------------------------------------------------------------------
191        // Checking support
192        uint_fast32_t epoch_check() {
193                // Step 1 : lock global lock
194                // It is needed to avoid processors that register mid Critical-Section
195                //   to simply lock their own lock and enter.
196                while(lock.load(std::memory_order_relaxed))
197                        asm volatile("pause");
198
199                // Step 2 : lock per-proc lock
200                // Processors that are currently being registered aren't counted
201                //   but can't be in read_lock or in the critical section.
202                // All other processors are counted
203                uint_fast32_t s = ready;
204                for(uint_fast32_t i = 0; i < s; i++) {
205                        while(data[i].lock.load(std::memory_order_relaxed))
206                                asm volatile("pause");
207                }
208
209                return s;
210        }
211
212public:
213};
214
215#undef ERROR
Note: See TracBrowser for help on using the repository browser.