//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

static const size_t cache_line_size = 64;

static inline unsigned __max_processors_fallback() {
#ifdef __CFA_MAX_PROCESSORS__
	return __CFA_MAX_PROCESSORS__;
#else
	// No overridden function, no environment variable, no define
	// fall back to a magic number
	return 128;
#endif
}

__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
		return __max_processors_fallback();
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
		return __max_processors_fallback();
	}
	if('\0' != *endptr) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
		return __max_processors_fallback();
	}

	return max_cores_l;
}

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__clusterRWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock  = false;
	this.data  = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}

void ^?{}(__clusterRWLock_t & this) {
	free(this.data);
}

void ?{}( __processor_id & this, struct processor * proc ) {
	this.handle = proc;
	this.lock   = false;
}
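
// Note on the locking protocol (as implemented below): every registered processor
// owns one cache-line aligned __processor_id slot in 'data', with its own 'lock'
// flag.  Readers of the ready queues only ever take their own slot's lock, while a
// writer (ready_mutate_lock) takes the global 'lock' followed by every per-processor
// lock, excluding all readers at once.
//
// Reader-side sketch (the reader helpers are not defined in this file; the names
// below are illustrative only):
//   unsigned i = proc->id;                                             // slot returned by doregister
//   __atomic_acquire( &data[i].lock );                                 // readers touch only their own lock
//   /* ... read the cluster's ready queues ... */
//   __atomic_store_n( &data[i].lock, (bool)false, __ATOMIC_RELEASE );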

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		processor * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 3 : Mark space as used and then publish it.
	__processor_id * storage = (__processor_id *)&data[n];
	(*storage){ proc };
	while(true) {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
	// Step 1 : lock global lock
	// It is needed to prevent processors that register mid critical-section
	// from simply locking their own lock and entering.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc locks
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted.
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	return s;
}

void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	//   threads that were created mid critical-section
	//   racing to lock their local locks and having the writer
	//   immediately unlock them.
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}
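
// Writer-side usage sketch (the actual callers live outside this file):
//   uint_fast32_t last_size = ready_mutate_lock( *cltr );   // blocks all readers
//   /* ... add or remove ready queues ... */
//   ready_mutate_unlock( *cltr, last_size );                 // pass back the count returned by lock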

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
static const size_t fields_offset = offsetof( thread_desc, next );

// Get the head pointer (one before the first element) from the anchor
static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
	thread_desc * rhead = (thread_desc *)(
		(uintptr_t)( &this.before ) - fields_offset
	);
	/* paranoid */ verify(rhead);
	return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
	thread_desc * rtail = (thread_desc *)(
		(uintptr_t)( &this.after ) - fields_offset
	);
	/* paranoid */ verify(rtail);
	return rtail;
}

// Ctor
void ?{}( __intrusive_ready_queue_t & this ) {
	this.before.prev = 0p;
	this.before.next = tail(this);
	this.after .prev = head(this);
	this.after .next = 0p;

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ verify(((uintptr_t)( head(this) ) + fields_offset) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + fields_offset) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(head(this)->prev == 0p );
	/* paranoid */ verify(head(this)->next == tail(this) );
	/* paranoid */ verify(tail(this)->next == 0p );
	/* paranoid */ verify(tail(this)->prev == head(this) );
	/* paranoid */ verify(&head(this)->prev == &this.before.prev );
	/* paranoid */ verify(&head(this)->next == &this.before.next );
	/* paranoid */ verify(&tail(this)->prev == &this.after .prev );
	/* paranoid */ verify(&tail(this)->next == &this.after .next );
	/* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
}

// Dtor is trivial
void ^?{}( __intrusive_ready_queue_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify(head(this)->prev == 0p );
	/* paranoid */ verify(head(this)->next == tail(this) );
	/* paranoid */ verify(tail(this)->next == 0p );
	/* paranoid */ verify(tail(this)->prev == head(this) );
}

bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
	verify(this.lock);
	verify(node->ts != 0);
	verify(node->next == 0p);
	verify(node->prev == 0p);

	// Get the relevant nodes locally
	thread_desc * tail = tail(this);
	thread_desc * prev = tail->prev;

	// Do the push
	node->next = tail;
	node->prev = prev;
	prev->next = node;
	tail->prev = node;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff++;
		this.stat.push++;
	#endif

	// Check if the queue used to be empty
	if(this.before.ts == 0l) {
		this.before.ts = node->ts;
		verify(node->prev == head(this));
		return true;
	}
	return false;
}

[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
	verify(this.lock);
	thread_desc * head = head(this);
	thread_desc * tail = tail(this);

	thread_desc * node = head->next;
	thread_desc * next = node->next;
	if(node == tail) return [0p, false];

	/* paranoid */ verify(node);

	head->next = next;
	next->prev = head;

	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop++;
	#endif

	if(next == tail) {
		this.before.ts = 0ul;
		node->[next, prev] = 0p;
		return [node, true];
	}
	else {
		verify(next->ts != 0);
		this.before.ts = next->ts;
		verify(this.before.ts != 0);
		node->[next, prev] = 0p;
		return [node, false];
	}
}

static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
	return this.before.ts;
}
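
// Usage sketch for push/pop (illustrative only; the ready-queue callers and the
// primitive that sets 'this.lock' are not defined in this file):
//   /* with the queue lock held, i.e. queue.lock is true */
//   bool was_empty = push( queue, thrd );   // true if the queue went from empty to non-empty
//   ...
//   thread_desc * thrd; bool emptied;
//   [thrd, emptied] = pop( queue );         // thrd == 0p when the queue was already empty,
//                                           // emptied is true when this pop drained the queue
//   /* ts(queue) is the timestamp of the oldest element, or 0 when the queue is empty */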