//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

// #define __CFA_DEBUG_PRINT_READY_QUEUE__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

static const size_t cache_line_size = 64;

// No overridden function, no environment variable, no define
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
    #define __CFA_MAX_PROCESSORS__ 128
#endif

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
    const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
    if(!max_cores_s) {
        __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
        return __CFA_MAX_PROCESSORS__;
    }

    char * endptr = 0p;
    long int max_cores_l = strtol(max_cores_s, &endptr, 10);
    if(max_cores_l < 1 || max_cores_l > 65535) {
        __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
        return __CFA_MAX_PROCESSORS__;
    }
    if('\0' != *endptr) {
        __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
        return __CFA_MAX_PROCESSORS__;
    }

    return max_cores_l;
}

// Picks a random 1 bit in 'mask' according to random number 'rnum'.
static inline unsigned rand_bit(unsigned rnum, __cfa_readyQ_mask_t mask) {
    #if defined( __i386 )
        static_assert(sizeof(mask) == 4);
        unsigned bit = mask ? rnum % __builtin_popcount(mask) : 0;
        #if !defined(__BMI2__)
            #error rand_bit not implemented for non __BMI2__ i386
        #else
            uint32_t picked = _pdep_u32(1ul << bit, mask);
            return picked ? __builtin_ctz(picked) : 0;
        #endif
    #elif defined( __x86_64 )
        static_assert(sizeof(mask) == 8);
        unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
        #if !defined(__BMI2__)
            uint64_t v = mask;       // Input value to find position with rank r.
            unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
            unsigned int s;          // Output: Resulting position of bit with rank r [1-64]
            uint64_t a, b, c, d;     // Intermediate temporaries for bit count.
            unsigned int t;          // Bit count temporary.

            // Do a normal parallel bit count for a 64-bit integer,
            // but store all intermediate steps.
            a = v - ((v >> 1) & ~0UL/3);
            b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
            c = (b + (b >> 4)) & ~0UL/0x11;
            d = (c + (c >> 8)) & ~0UL/0x101;
            t = (d >> 32) + (d >> 48);

            // Now do branchless select!
            s = 64;
            s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
            t = (d >> (s - 16)) & 0xff;
            s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
            t = (c >> (s - 8)) & 0xf;
            s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
            t = (b >> (s - 4)) & 0x7;
            s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
            t = (a >> (s - 2)) & 0x3;
            s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
            t = (v >> (s - 1)) & 0x1;
            s -= ((t - r) & 256) >> 8;
            return s - 1;
        #else
            uint64_t picked = _pdep_u64(1ul << bit, mask);
            return picked ? __builtin_ctzl(picked) : 0;
        #endif
    #elif defined( __ARM_ARCH )
        #error rand_bit not implemented for arm
    #else
        #error unknown hardware architecture
    #endif
}
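
// Usage sketch (illustrative only, values assumed): rand_bit maps a random
// number onto one of the bits currently set in 'mask', so every lane that is
// marked non-empty has a chance of being picked.
//
//     __cfa_readyQ_mask_t mask = 0x4A;             // bits 1, 3 and 6 set
//     unsigned idx = rand_bit(__tls_rand(), mask); // idx is 1, 3 or 6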

//-----------------------------------------------------------------------------
// Helpers used by extract
// (_mask_bitsidx() & X) returns a bit index valid for a __cfa_readyQ_mask_t, where X is any integer
static inline __cfa_readyQ_mask_t _mask_bitsidx () __attribute__ ((const)) { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }

// (X >> _mask_shiftidx()) returns an index into an array of __cfa_readyQ_mask_t
static inline __cfa_readyQ_mask_t _mask_shiftidx() __attribute__ ((const)) { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(_mask_bitsidx()); }

// Assuming a large bit mask represented as an array of __cfa_readyQ_mask_t
// Given an index into the large mask, returns the bit index and which __cfa_readyQ_mask_t index in the array
static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
    __cfa_readyQ_mask_t word = idx >> _mask_shiftidx();
    __cfa_readyQ_mask_t bit  = idx &  _mask_bitsidx();
    return [bit, word];
}

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void ?{}(__clusterRWLock_t & this) {
    this.max   = __max_processors();
    this.alloc = 0;
    this.ready = 0;
    this.lock  = false;
    this.data  = alloc(this.max);

    /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
    /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
    /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
    /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}

void ^?{}(__clusterRWLock_t & this) {
    free(this.data);
}

void ?{}( __processor_id & this, struct processor * proc ) {
    this.handle = proc;
    this.lock   = false;
}

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned doregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
    __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p with cluster %p\n", proc, cltr);

    // Step - 1 : check if there is already space in the data
    uint_fast32_t s = ready;

    // Check among all the ready
    for(uint_fast32_t i = 0; i < s; i++) {
        processor * null = 0p; // Re-write every loop since compare thrashes it
        if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
            && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
            /*paranoid*/ verify(i < ready);
            /*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
            /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
            return i;
        }
    }

    if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

    // Step - 2 : F&A to get a new spot in the array.
    uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
    if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

    // Step - 3 : Mark space as used and then publish it.
    __processor_id * storage = (__processor_id *)&data[n];
    (*storage){ proc };
    while(true) {
        unsigned copy = n;
        if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
            && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
            break;
        asm volatile("pause");
    }

    __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);

    // Return new spot.
    /*paranoid*/ verify(n < ready);
    /*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
    /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
    return n;
}
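
// Registration sketch (matches the three steps above): a processor first tries
// to claim, via CAS, a slot whose handle was cleared by unregister2; failing
// that, it fetch-and-adds 'alloc' to get a fresh slot, then publishes the slot
// by advancing 'ready' in registration order. Readers only ever scan indices
// below 'ready', so a partially initialised slot is never observed.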

void unregister2( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
    unsigned id = proc->id;
    /*paranoid*/ verify(id < ready);
    /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
    __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);

    __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
    // Step 1 : lock global lock
    // It is needed so that processors that register mid critical-section
    // cannot simply lock their own lock and enter.
    __atomic_acquire( &lock );

    // Step 2 : lock per-proc lock
    // Processors that are currently being registered aren't counted
    // but can't be in read_lock or in the critical section.
    // All other processors are counted
    uint_fast32_t s = ready;
    for(uint_fast32_t i = 0; i < s; i++) {
        __atomic_acquire( &data[i].lock );
    }

    return s;
}

void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
    // Step 1 : release local locks
    // This must be done while the global lock is held to avoid
    // threads that were created mid critical section
    // racing to lock their local locks and having the writer
    // immediately unlock them
    // Alternative solution : return s in write_lock and pass it to write_unlock
    for(uint_fast32_t i = 0; i < last_s; i++) {
        verify(data[i].lock);
        __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
    }

    // Step 2 : release global lock
    /*paranoid*/ assert(true == lock);
    __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================

// Get the head pointer (one before the first element) from the anchor
static inline $thread * head(const __intrusive_lane_t & this) {
    $thread * rhead = ($thread *)( (uintptr_t)( &this.before ) - offsetof( $thread, link ) );
    /* paranoid */ verify(rhead);
    return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline $thread * tail(const __intrusive_lane_t & this) {
    $thread * rtail = ($thread *)( (uintptr_t)( &this.after ) - offsetof( $thread, link ) );
    /* paranoid */ verify(rtail);
    return rtail;
}

// Ctor
void ?{}( __intrusive_lane_t & this ) {
    this.lock = false;
    #if defined(__CFA_WITH_VERIFY__)
        this.last_id = -1u;
        this.count   = 0u;
    #endif

    this.before.link.prev = 0p;
    this.before.link.next = tail(this);
    this.before.link.ts   = 0;

    this.after .link.prev = head(this);
    this.after .link.next = 0p;
    this.after .link.ts   = 0;

    #if !defined(__CFA_NO_SCHED_STATS__)
        this.stat.diff = 0;
        this.stat.push = 0;
        this.stat.pop  = 0;
    #endif

    // We add a boat-load of assertions here because the anchor code is very fragile
    /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
    /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
    /* paranoid */ verify(head(this)->link.prev == 0p );
    /* paranoid */ verify(head(this)->link.next == tail(this) );
    /* paranoid */ verify(tail(this)->link.next == 0p );
    /* paranoid */ verify(tail(this)->link.prev == head(this) );
    /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
    /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
    /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
    /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
    /* paranoid */ verify(sizeof(__intrusive_lane_t) == 128);
    /* paranoid */ verify(sizeof(this) == 128);
    /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
    /* paranoid */ verify(__alignof__(this) == 128);
    /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
    /* paranoid */ verifyf(_mask_shiftidx() == 6 , "%llu", _mask_shiftidx());
    /* paranoid */ verifyf(_mask_bitsidx () == 63, "%llu", _mask_bitsidx());
}
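
// Layout sketch of an empty lane, as established by the ctor above:
//
//     before.link : { prev: 0p,         next: tail(this), ts: 0 }
//     after .link : { prev: head(this), next: 0p,         ts: 0 }
//
// head()/tail() hand back pointers typed as if the anchors were $thread
// objects, so push/pop below never need to special-case an empty lane.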

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
    // Make sure the list is empty
    /* paranoid */ verify(head(this)->link.prev == 0p );
    /* paranoid */ verify(head(this)->link.next == tail(this) );
    /* paranoid */ verify(tail(this)->link.next == 0p );
    /* paranoid */ verify(tail(this)->link.prev == head(this) );
    /* paranoid */ verify(this.count == 0u );
}

// Push a thread onto this lane
// returns true if lane was empty before push, false otherwise
bool push(__intrusive_lane_t & this, $thread * node) {
    #if defined(__CFA_WITH_VERIFY__)
        /* paranoid */ verify(this.lock);
        /* paranoid */ verify(node->link.ts != 0);
        /* paranoid */ verify(node->link.next == 0p);
        /* paranoid */ verify(node->link.prev == 0p);
        /* paranoid */ verify(tail(this)->link.next == 0p);
        /* paranoid */ verify(head(this)->link.prev == 0p);

        this.count++;

        if(this.before.link.ts == 0l) {
            /* paranoid */ verify(tail(this)->link.prev == head(this));
            /* paranoid */ verify(head(this)->link.next == tail(this));
        } else {
            /* paranoid */ verify(tail(this)->link.prev != head(this));
            /* paranoid */ verify(head(this)->link.next != tail(this));
        }
    #endif

    // Get the relevant nodes locally
    $thread * tail = tail(this);
    $thread * prev = tail->link.prev;

    // Do the push
    node->link.next = tail;
    node->link.prev = prev;
    prev->link.next = node;
    tail->link.prev = node;

    // Update stats
    #if !defined(__CFA_NO_SCHED_STATS__)
        this.stat.diff++;
        this.stat.push++;
    #endif

    verify(node->link.next == tail(this));

    // Check if the queue used to be empty
    if(this.before.link.ts == 0l) {
        this.before.link.ts = node->link.ts;
        /* paranoid */ verify(node->link.prev == head(this));
        return true;
    }
    return false;
}

// Pop a thread from this lane (must be non-empty)
// returns the popped thread and whether the lane became empty as a result
[$thread *, bool] pop(__intrusive_lane_t & this) {
    /* paranoid */ verify(this.lock);
    /* paranoid */ verify(this.before.link.ts != 0ul);

    // Get anchors locally
    $thread * head = head(this);
    $thread * tail = tail(this);

    // Get the relevant nodes locally
    $thread * node = head->link.next;
    $thread * next = node->link.next;

    #if defined(__CFA_WITH_VERIFY__)
        this.count--;
        /* paranoid */ verify(node != tail);
        /* paranoid */ verify(node);
    #endif

    // Do the pop
    head->link.next = next;
    next->link.prev = head;
    node->link.[next, prev] = 0p;

    // Update head time stamp
    this.before.link.ts = next->link.ts;

    // Update stats
    #ifndef __CFA_NO_SCHED_STATS__
        this.stat.diff--;
        this.stat.pop ++;
    #endif

    // Check if we emptied list and return accordingly
    /* paranoid */ verify(tail(this)->link.next == 0p);
    /* paranoid */ verify(head(this)->link.prev == 0p);
    if(next == tail) {
        /* paranoid */ verify(this.before.link.ts == 0);
        /* paranoid */ verify(tail(this)->link.prev == head(this));
        /* paranoid */ verify(head(this)->link.next == tail(this));
        return [node, true];
    }
    else {
        /* paranoid */ verify(next->link.ts != 0);
        /* paranoid */ verify(tail(this)->link.prev != head(this));
        /* paranoid */ verify(head(this)->link.next != tail(this));
        /* paranoid */ verify(this.before.link.ts != 0);
        return [node, false];
    }
}
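
// Usage sketch (assumed caller protocol, mirrored by push()/try_pop() on the
// cluster below): a lane is always locked before push/pop and the boolean
// results drive the cluster-wide bookkeeping.
//
//     if( __atomic_try_acquire( &lane.lock ) ) {
//         bool was_empty = push( lane, thrd ); // lane went empty -> non-empty
//         __atomic_unlock( &lane.lock );
//     }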

// Check whether or not list is empty
static inline bool is_empty(__intrusive_lane_t & this) {
    // Cannot verify here since it may not be locked
    return this.before.link.ts == 0;
}

// Return the timestamp
static inline unsigned long long ts(__intrusive_lane_t & this) {
    // Cannot verify here since it may not be locked
    return this.before.link.ts;
}

//=======================================================================
// Cforall Ready Queue used by ready queue
//=======================================================================

// Thread local mirror of ready queue statistics
#if !defined(__CFA_NO_STATISTICS__)
static __attribute__((aligned(128))) thread_local struct {
    struct {
        struct {
            size_t attempt;
            size_t success;
        } push;
        struct {
            size_t maskrds;
            size_t attempt;
            size_t success;
        } pop;
    } pick;
    struct {
        size_t value;
        size_t count;
    } used;
} tls = {
    /* pick */{
        /* push */{ 0, 0 },
        /* pop  */{ 0, 0, 0 },
    },
    /* used */{ 0, 0 }
};
#endif

//-----------------------------------------------------------------------
void ?{}(__ready_queue_t & this) with (this) {
    used.count = 0;
    for( i ; __cfa_lane_mask_size ) {
        used.mask[i] = 0;
    }

    lanes.data = alloc(4);
    for( i; 4 ) {
        (lanes.data[i]){};
    }
    lanes.count = 4;

    #if !defined(__CFA_NO_STATISTICS__)
        global_stats.pick.push.attempt = 0;
        global_stats.pick.push.success = 0;
        global_stats.pick.pop .maskrds = 0;
        global_stats.pick.pop .attempt = 0;
        global_stats.pick.pop .success = 0;

        global_stats.used.value = 0;
        global_stats.used.count = 0;
    #endif
}

void ^?{}(__ready_queue_t & this) with (this) {
    verify( 4 == lanes.count );
    verify( 0 == used .count );

    for( i; 4 ) {
        ^(lanes.data[i]){};
    }
    free(lanes.data);

    #if defined(__CFA_WITH_VERIFY__)
        for( i ; __cfa_lane_mask_size ) {
            assert( 0 == used.mask[i] );
        }
    #endif
}

//-----------------------------------------------------------------------
enum mask_strictness { STRICT, NOCHECK };

// Set a given bit in the bit mask array
// strictness determines if the bit had to be cleared before
static inline void mask_set(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
    // Extract the array and bit indexes
    __cfa_readyQ_mask_t word;
    __cfa_readyQ_mask_t bit;
    [bit, word] = extract(index);

    __cfadbg_print_safe(ready_queue, "Kernel : Ready queue extracted index %u as [bit %llu, word %llu]\n", index, bit, word);

    // Conditional check
    verifyf(
        strict != STRICT || // Conditional check if it was expected to be cleared
        ((mask[word] & (1ull << bit)) == 0),
        "Before set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
    );

    // Atomically set the bit
    __attribute__((unused)) bool ret = __atomic_bts(&mask[word], bit);

    // Conditional check
    verifyf(
        strict != STRICT || // Conditional check if it was expected to be cleared
        !ret,
        "Bit was not set but bts returned true"
    );

    // Unconditional check
    verifyf(
        (mask[word] & (1ull << bit)) != 0,
        "After set %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
    );
}

static inline void mask_clear(__cfa_readyQ_mask_t * mask, unsigned index, mask_strictness strict) {
    // Extract the array and bit indexes
    __cfa_readyQ_mask_t word;
    __cfa_readyQ_mask_t bit;
    [bit, word] = extract(index);

    // Conditional check
    verifyf(
        strict != STRICT || // Conditional check if it was expected to be set
        ((mask[word] & (1ull << bit)) != 0),
        "Before clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
    );

    // Atomically clear the bit
    __attribute__((unused)) bool ret = __atomic_btr(&mask[word], bit);

    // Conditional check
    verifyf(
        strict != STRICT || // Conditional check if it was expected to be set
        ret,
        "Bit was set but btr returned false"
    );

    // Unconditional check
    verifyf(
        (mask[word] & (1ull << bit)) == 0,
        "After clear %llu:%llu (%u), %llx & %llx", word, bit, index, mask[word], (1ull << bit)
    );
}
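
// Indexing sketch for the bit mask array (64-bit words, matching the
// _mask_shiftidx() == 6 and _mask_bitsidx() == 63 asserts above):
//
//     lane index 70  ->  word 70 >> 6 == 1,  bit 70 & 63 == 6
//
// so lane i is marked non-empty by setting bit (i & 63) of used.mask[i >> 6].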

//-----------------------------------------------------------------------
__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p (mask %llu)\n", thrd, cltr, used.mask[0]);

    // write timestamp
    thrd->link.ts = rdtscl();

    // Try to pick a lane and lock it
    unsigned i;
    do {
        // Pick the index of a lane
        i = __tls_rand() % lanes.count;

        #if !defined(__CFA_NO_STATISTICS__)
            tls.pick.push.attempt++;
        #endif

        // If we can't lock it retry
    } while( !__atomic_try_acquire( &lanes.data[i].lock ) );

    #if defined(__CFA_WITH_VERIFY__)
        /* paranoid */ verify(lanes.data[i].last_id == -1u);
        /* paranoid */ lanes.data[i].last_id = kernelTLS.this_processor->id;
    #endif

    __attribute__((unused)) size_t num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
    bool first = false;

    // Actually push it
    bool lane_first = push(lanes.data[i], thrd);

    // If this lane used to be empty we need to do more
    if(lane_first) {
        // Update the bit mask
        mask_set((__cfa_readyQ_mask_t *)used.mask, i, STRICT);

        // Update the global count
        size_t ret = __atomic_fetch_add( &used.count, 1z, __ATOMIC_SEQ_CST);

        // Check if the entire queue used to be empty
        first = (ret == 0);
    }

    #if defined(__CFA_WITH_VERIFY__)
        /* paranoid */ verifyf( used.count <= lanes.count, "Non-empty count (%zu) exceeds actual count (%zu)\n", used.count, lanes.count );
        /* paranoid */ verifyf( lanes.data[i].last_id == kernelTLS.this_processor->id, "Expected last processor to lock queue %u to be %u, was %u\n", i, lanes.data[i].last_id, kernelTLS.this_processor->id );
        /* paranoid */ verifyf( lanes.data[i].lock, "List %u is not locked\n", i );
        /* paranoid */ lanes.data[i].last_id = -1u;
    #endif

    // Unlock and return
    __atomic_unlock( &lanes.data[i].lock );

    __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);

    // Update statistics
    #if !defined(__CFA_NO_STATISTICS__)
        tls.pick.push.success++;
        tls.used.value += num;
        tls.used.count += 1;
    #endif

    // return whether or not the list was empty before this push
    return first;
}
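
// Push strategy note: the lane is picked uniformly at random and acquired
// with a try-lock, so contended lanes are simply skipped rather than waited
// on. The return value reports whether the whole ready queue was empty before
// this push, which a caller could use, e.g., to decide whether sleeping
// processors need waking.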

//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
    #if !defined(__CFA_NO_STATISTICS__)
        tls.pick.pop.attempt++;
    #endif

    // Pick the best list, i.e., the one with the oldest head timestamp
    int w = i;
    if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
        w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
    }

    // Get relevant elements locally
    __intrusive_lane_t & lane = lanes.data[w];

    // If list looks empty retry
    if( is_empty(lane) ) return 0p;

    // If we can't get the lock retry
    if( !__atomic_try_acquire(&lane.lock) ) return 0p;

    #if defined(__CFA_WITH_VERIFY__)
        /* paranoid */ verify(lane.last_id == -1u);
        /* paranoid */ lane.last_id = kernelTLS.this_processor->id;
    #endif

    // If list is empty, unlock and retry
    if( is_empty(lane) ) {
        #if defined(__CFA_WITH_VERIFY__)
            /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
            /* paranoid */ lane.last_id = -1u;
        #endif

        __atomic_unlock(&lane.lock);
        return 0p;
    }

    // Actually pop the list
    struct $thread * thrd;
    bool emptied;
    [thrd, emptied] = pop(lane);

    /* paranoid */ verify(thrd);
    /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
    /* paranoid */ verify(lane.lock);

    // If this was the last element in the lane
    if(emptied) {
        // Update the global count
        __atomic_fetch_sub( &used.count, 1z, __ATOMIC_SEQ_CST);

        // Update the bit mask
        mask_clear((__cfa_readyQ_mask_t *)used.mask, w, STRICT);
    }

    #if defined(__CFA_WITH_VERIFY__)
        /* paranoid */ verify(lane.last_id == kernelTLS.this_processor->id);
        /* paranoid */ lane.last_id = -1u;
    #endif

    // For statistics, check the count before we release the lock
    #if !defined(__CFA_NO_STATISTICS__)
        int num = __atomic_load_n( &used.count, __ATOMIC_RELAXED );
    #endif

    // Unlock and return
    __atomic_unlock(&lane.lock);

    // Update statistics
    #if !defined(__CFA_NO_STATISTICS__)
        tls.pick.pop.success++;
        tls.used.value += num;
        tls.used.count += 1;
    #endif

    // return the popped thread
    return thrd;
}

// Pop from the ready queue of a given cluster
__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
    /* paranoid */ verify( lanes.count > 0 );

    // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    while( __atomic_load_n( &used.count, __ATOMIC_RELAXED ) != 0) {
        #if !defined(__CFA_READQ_NO_BITMASK__)
            // If using bit masks
            #if !defined(__CFA_NO_SCHED_STATS__)
                tls.pick.pop.maskrds++;
            #endif

            // Pick two lists at random
            unsigned ri = __tls_rand();
            unsigned rj = __tls_rand();

            // Find which __cfa_readyQ_mask_t the two lists belong to
            unsigned num = ((__atomic_load_n( &lanes.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
            unsigned wdxi = (ri >> 6u) % num;
            unsigned wdxj = (rj >> 6u) % num;

            // Get the actual __cfa_readyQ_mask_t
            size_t maski = __atomic_load_n( &used.mask[wdxi], __ATOMIC_RELAXED );
            size_t maskj = __atomic_load_n( &used.mask[wdxj], __ATOMIC_RELAXED );

            // If both of these masks are empty, retry
            if(maski == 0 && maskj == 0) continue;

            // Pick one of the non-zero bits in the masks and get the bit indexes
            unsigned bi = rand_bit(ri, maski);
            unsigned bj = rand_bit(rj, maskj);

            // some checks
            /* paranoid */ verifyf(bi < 64, "%zu %u", maski, bi);
            /* paranoid */ verifyf(bj < 64, "%zu %u", maskj, bj);

            // get the general list index
            unsigned i = bi | (wdxi << 6);
            unsigned j = bj | (wdxj << 6);

            // some more checks
            /* paranoid */ verifyf(i < lanes.count, "%u", wdxi << 6);
            /* paranoid */ verifyf(j < lanes.count, "%u", wdxj << 6);

            // try popping from the 2 picked lists
            struct $thread * thrd = try_pop(cltr, i, j);
            if(thrd) return thrd;
        #else
            // Pick two lists at random
            int i = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
            int j = __tls_rand() % __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

            // try popping from the 2 picked lists
            struct $thread * thrd = try_pop(cltr, i, j);
            if(thrd) return thrd;
        #endif
    }

    // All lanes were empty, return 0p
    return 0p;
}
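
// Pop strategy note: two lanes are sampled at random and try_pop() takes the
// one whose head has the oldest timestamp, in the style of the
// "power of two choices" heuristic, which keeps lanes roughly balanced
// without a global lock. The bitmask fast path only samples words and bits
// that are marked non-empty, so random probes rarely land on empty lanes.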

//-----------------------------------------------------------------------
static void check( __ready_queue_t & q ) with (q) {
    #if defined(__CFA_WITH_VERIFY__)
        {
            int idx = 0;
            for( w ; __cfa_lane_mask_size ) {
                for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
                    bool is_empty = idx < lanes.count ? (ts(lanes.data[idx]) == 0) : true;
                    bool should_be_empty = 0 == (used.mask[w] & (1z << b));
                    assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expects %d, list is %d", idx, should_be_empty, (bool)is_empty);
                    assert(__cfa_max_lanes > idx);
                    idx++;
                }
            }
        }

        {
            for( idx ; lanes.count ) {
                __intrusive_lane_t & sl = lanes.data[idx];
                assert(!lanes.data[idx].lock);

                assert(head(sl)->link.prev == 0p );
                assert(head(sl)->link.next->link.prev == head(sl) );
                assert(tail(sl)->link.next == 0p );
                assert(tail(sl)->link.prev->link.next == tail(sl) );

                if(sl.before.link.ts == 0l) {
                    assert(tail(sl)->link.prev == head(sl));
                    assert(head(sl)->link.next == tail(sl));
                } else {
                    assert(tail(sl)->link.prev != head(sl));
                    assert(head(sl)->link.next != tail(sl));
                }
            }
        }
    #endif
}

// Call this function if the intrusive list was moved using memcpy
// fixes the list so that the pointers back to anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
    // if the list is not empty then follow the pointers and fix their reverse
    if(!is_empty(ll)) {
        head(ll)->link.next->link.prev = head(ll);
        tail(ll)->link.prev->link.next = tail(ll);
    }
    // Otherwise just reset the list
    else {
        verify(tail(ll)->link.next == 0p);
        tail(ll)->link.prev = head(ll);
        head(ll)->link.next = tail(ll);
        verify(head(ll)->link.prev == 0p);
    }
}

// Grow the ready queue
void ready_queue_grow (struct cluster * cltr) {
    // Lock the RWlock so no-one pushes/pops while we are changing the queue
    uint_fast32_t last_size = ready_mutate_lock( *cltr );

    __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

    // Make sure that everything is consistent
    /* paranoid */ check( cltr->ready_queue );

    // grow the ready queue
    with( cltr->ready_queue ) {
        size_t ncount = lanes.count;

        // Check that we have some space left
        if(ncount + 4 >= __cfa_max_lanes) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_lanes);

        // increase count
        ncount += 4;

        // Allocate new array (uses realloc and memcpies the data)
        lanes.data = alloc(lanes.data, ncount);

        // Fix the moved data
        for( idx; (size_t)lanes.count ) {
            fix(lanes.data[idx]);
        }

        // Construct new data
        for( idx; (size_t)lanes.count ~ ncount) {
            (lanes.data[idx]){};
        }

        // Update original
        lanes.count = ncount;

        // fields in 'used' don't need to change when growing
    }

    // Make sure that everything is consistent
    /* paranoid */ check( cltr->ready_queue );

    __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

    // Unlock the RWlock
    ready_mutate_unlock( *cltr, last_size );
}
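
// Resizing note: both ready_queue_grow() above and ready_queue_shrink() below
// run entirely between ready_mutate_lock() and ready_mutate_unlock(), so no
// processor can push or pop while lanes are being added or removed. Resizing
// always happens in steps of 4 lanes, matching the initial allocation in the
// __ready_queue_t ctor.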

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
    // Lock the RWlock so no-one pushes/pops while we are changing the queue
    uint_fast32_t last_size = ready_mutate_lock( *cltr );

    __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

    // Make sure that everything is consistent
    /* paranoid */ check( cltr->ready_queue );

    with( cltr->ready_queue ) {
        // Make sure that the total thread count stays the same
        #if defined(__CFA_WITH_VERIFY__)
            size_t nthreads = 0;
            for( idx; (size_t)lanes.count ) {
                nthreads += lanes.data[idx].count;
            }
        #endif

        size_t ocount = lanes.count;
        // Check that we are not removing more lanes than were created
        if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");

        // reduce the actual count so push doesn't use the old queues
        lanes.count -= 4;
        verify(ocount > lanes.count);

        // for printing count the number of displaced threads
        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
            __attribute__((unused)) size_t displaced = 0;
        #endif

        // redistribute old data
        for( idx; (size_t)lanes.count ~ ocount) {
            // Lock is not strictly needed but makes checking invariants much easier
            __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
            verify(locked);

            // As long as we can pop from this lane to push the threads somewhere else in the queue
            while(!is_empty(lanes.data[idx])) {
                struct $thread * thrd;
                __attribute__((unused)) bool _;
                [thrd, _] = pop(lanes.data[idx]);

                push(cltr, thrd);

                // for printing count the number of displaced threads
                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
                    displaced++;
                #endif
            }

            mask_clear((__cfa_readyQ_mask_t *)used.mask, idx, NOCHECK);

            // Unlock the lane
            __atomic_unlock(&lanes.data[idx].lock);

            // TODO print the queue statistics here

            ^(lanes.data[idx]){};
        }

        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

        // recompute the used.count instead of maintaining it
        used.count = 0;
        for( i ; __cfa_lane_mask_size ) {
            used.count += __builtin_popcountl(used.mask[i]);
        }

        // Allocate new array (uses realloc and memcpies the data)
        lanes.data = alloc(lanes.data, lanes.count);

        // Fix the moved data
        for( idx; (size_t)lanes.count ) {
            fix(lanes.data[idx]);
        }

        // Make sure that the total thread count stayed the same
        #if defined(__CFA_WITH_VERIFY__)
            for( idx; (size_t)lanes.count ) {
                nthreads -= lanes.data[idx].count;
            }
            verifyf(nthreads == 0, "Shrinking changed number of threads");
        #endif
    }

    // Make sure that everything is consistent
    /* paranoid */ check( cltr->ready_queue );

    __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");

    // Unlock the RWlock
    ready_mutate_unlock( *cltr, last_size );
}

//-----------------------------------------------------------------------

#if !defined(__CFA_NO_STATISTICS__)
void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
    __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
    __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
    __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
    __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
    __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );

    __atomic_fetch_add( &global_stats.used.value, tls.used.value, __ATOMIC_SEQ_CST );
    __atomic_fetch_add( &global_stats.used.count, tls.used.count, __ATOMIC_SEQ_CST );
}
#endif