#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>

struct processor;

struct __attribute__((aligned(64))) processor_id {
	std::atomic<processor *> handle;
	std::atomic<bool>        lock;

	processor_id() = default;
	processor_id(processor * proc) : handle(proc), lock() {
		/*paranoid*/ assert(std::atomic_is_lock_free(&lock));
	}
};

extern unsigned num();

// Spin-loop relax hint (e.g., x86 PAUSE); assumed to be provided elsewhere in the project.
extern void Pause();

#define ERROR throw 1

class processor_list {
private:
	static constexpr std::size_t cache_line_size = 64;

	static_assert(sizeof (processor_id) <= cache_line_size, "ERROR: Instances must fit in one cache line" );
	static_assert(alignof(processor_id) == cache_line_size, "ERROR: Instances must be aligned to one cache line" );

	const unsigned max;       // total cache lines allocated
	std::atomic_uint alloc;   // cache lines currently in use
	std::atomic_uint ready;   // cache lines ready to iterate over
	                          // (!= alloc when a thread is in the second half of doregister)
	std::atomic<bool> lock;   // writer lock
	processor_id * data;      // data pointer

private:
	inline void acquire(std::atomic<bool> & ll) {
		while( __builtin_expect(ll.exchange(true), false) ) {
			while(ll.load(std::memory_order_relaxed))
				Pause();
		}
		/* paranoid */ assert(ll);
	}

public:
	processor_list()
		: max(num())
		, alloc(0)
		, ready(0)
		, lock{false}
		, data( new processor_id[max] )
	{
		/*paranoid*/ assert(num() == max);
		/*paranoid*/ assert(std::atomic_is_lock_free(&alloc));
		/*paranoid*/ assert(std::atomic_is_lock_free(&ready));
	}

	~processor_list() {
		delete[] data;
	}

	//=======================================================================
	// Lock-Free registering/unregistering of threads
	unsigned doregister(processor * proc) {
		// Step - 1 : check if there is already space in the data
		uint_fast32_t s = ready;

		// Check among all the ready
		for(uint_fast32_t i = 0; i < s; i++) {
			processor * null = nullptr; // Re-write every loop since compare thrashes it
			if( data[i].handle.load(std::memory_order_relaxed) == null
				&& data[i].handle.compare_exchange_strong(null, proc)) {
				/*paranoid*/ assert(i < ready);
				/*paranoid*/ assert(alignof(decltype(data[i])) == cache_line_size);
				/*paranoid*/ assert((uintptr_t(&data[i]) % cache_line_size) == 0);
				return i;
			}
		}

		if(max <= alloc) ERROR;

		// Step - 2 : F&A to get a new spot in the array.
		uint_fast32_t n = alloc++;
		if(max <= n) ERROR;

		// Step - 3 : Mark space as used and then publish it.
		void * storage = &data[n];
		new (storage) processor_id( proc );
		while(true) {
			unsigned copy = n;
			if( ready.load(std::memory_order_relaxed) == n
				&& ready.compare_exchange_weak(copy, n + 1) )
				break;
			Pause();
		}

		// Return new spot.
		/*paranoid*/ assert(n < ready);
		/*paranoid*/ assert(alignof(decltype(data[n])) == cache_line_size);
		/*paranoid*/ assert((uintptr_t(&data[n]) % cache_line_size) == 0);
		return n;
	}

	processor * unregister(unsigned iproc) {
		/*paranoid*/ assert(iproc < ready);
		auto ret = data[iproc].handle.load(std::memory_order_relaxed);
		data[iproc].handle = nullptr;
		return ret;
	}

	// Reset all registrations.
	// Unsafe in most cases, use for testing only.
	void reset() {
		alloc = 0;
		ready = 0;
	}

	processor * get(unsigned iproc) {
		return data[iproc].handle.load(std::memory_order_relaxed);
	}

	//=======================================================================
	// Reader-writer lock implementation
	// Concurrent with doregister/unregister,
	// i.e., threads can be added at any point during or between the entry/exit.

	//-----------------------------------------------------------------------
	// Reader side
	void read_lock(unsigned iproc) {
		/*paranoid*/ assert(iproc < ready);

		// Step 1 : make sure no writer is in the middle of the critical section
		while(lock.load(std::memory_order_relaxed))
			Pause();

		// Fence needed because we don't want to start trying to acquire the lock
		// before we read a false.
		// Not needed on x86
		// std::atomic_thread_fence(std::memory_order_seq_cst);

		// Step 2 : acquire our local lock
		acquire( data[iproc].lock );
		/*paranoid*/ assert(data[iproc].lock);
	}

	void read_unlock(unsigned iproc) {
		/*paranoid*/ assert(iproc < ready);
		/*paranoid*/ assert(data[iproc].lock);
		data[iproc].lock.store(false, std::memory_order_release);
	}

	//-----------------------------------------------------------------------
	// Writer side
	uint_fast32_t write_lock() {
		// Step 1 : lock the global lock
		// It is needed to prevent processors that register mid critical-section
		// from simply locking their own lock and entering.
		acquire(lock);

		// Step 2 : lock the per-proc locks
		// Processors that are currently being registered aren't counted
		// but can't be in read_lock or in the critical section.
		// All other processors are counted.
		uint_fast32_t s = ready;
		for(uint_fast32_t i = 0; i < s; i++) {
			acquire( data[i].lock );
		}

		return s;
	}

	void write_unlock(uint_fast32_t last_s) {
		// Step 1 : release the local locks
		// This must be done while the global lock is held to prevent
		// threads that were created mid critical section
		// from racing to lock their local locks and having the writer
		// immediately unlock them.
		// Alternative solution : return s in write_lock and pass it to write_unlock
		for(uint_fast32_t i = 0; i < last_s; i++) {
			assert(data[i].lock);
			data[i].lock.store(false, std::memory_order_release);
		}

		// Step 2 : release the global lock
		/*paranoid*/ assert(true == lock);
		lock.store(false, std::memory_order_release);
	}

	//-----------------------------------------------------------------------
	// Checking support
	uint_fast32_t epoch_check() {
		// Step 1 : wait until the global lock is free
		// It is needed to account for processors that register mid critical-section,
		// which could otherwise simply lock their own lock and enter.
		while(lock.load(std::memory_order_relaxed))
			Pause();

		// Step 2 : wait until the per-proc locks are free
		// Processors that are currently being registered aren't counted
		// but can't be in read_lock or in the critical section.
		// All other processors are counted.
		uint_fast32_t s = ready;
		for(uint_fast32_t i = 0; i < s; i++) {
			while(data[i].lock.load(std::memory_order_relaxed))
				Pause();
		}

		return s;
	}

public:
};

#undef ERROR
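
//=======================================================================
// Hypothetical usage sketch, not part of the original file: it only illustrates
// the intended protocol of the class above. The names example_reader,
// example_writer, global_procs, and my_proc are assumptions for illustration;
// `my_proc` is assumed to be a valid processor pointer owned by the caller.
// A reader registers once, then brackets each critical section with
// read_lock/read_unlock using the id returned by doregister; a writer uses
// write_lock/write_unlock and forwards the count returned by write_lock.

inline void example_reader(processor_list & global_procs, processor * my_proc) {
	unsigned id = global_procs.doregister(my_proc); // lock-free registration

	global_procs.read_lock(id);
	// ... reader critical section: may run concurrently with other readers ...
	global_procs.read_unlock(id);

	global_procs.unregister(id);
}

inline void example_writer(processor_list & global_procs) {
	uint_fast32_t s = global_procs.write_lock(); // excludes every registered reader
	// ... writer critical section: no reader holds its local lock here ...
	global_procs.write_unlock(s);                // release exactly the locks taken above
}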