//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "kernel_private.hfa"

#define _GNU_SOURCE
#include "stdlib.hfa"

// Alignment (in bytes) that the per-processor slots are checked against.
static const size_t cache_line_size = 64;

// Processor limit used when CFA_MAX_PROCESSORS is absent or unusable:
// the compile-time define if there is one, otherwise a fixed constant.
static inline unsigned __max_processors_fallback() {
	#ifdef __CFA_MAX_PROCESSORS__
		return __CFA_MAX_PROCESSORS__;
	#else
		// No overriden function, no environment variable, no define
		// fall back to a magic number
		return 128;
	#endif
}

// Maximum number of processors a cluster lock reserves room for.
// Weak symbol, so a program may supply its own definition.
// Reads the CFA_MAX_PROCESSORS environment variable; a missing value, a value
// outside [1, 65535], or trailing non-numeric text yields the fallback.
__attribute__((weak)) unsigned __max_processors() {
	const char * env_text = getenv("CFA_MAX_PROCESSORS");
	if(!env_text) {
		__cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
		return __max_processors_fallback();
	}

	char * parse_end = 0p;
	long int requested = strtol(env_text, &parse_end, 10);

	// The range test comes first, so an unparsable string (which reads as 0)
	// is reported as out of range.
	if(requested < 1 || requested > 65535) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", requested);
		return __max_processors_fallback();
	}

	// Anything left over after the digits makes the value invalid
	if('\0' != *parse_end) {
		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", env_text);
		return __max_processors_fallback();
	}

	return requested;
}

// Use rnum to choose one of the set bits of mask and return its index.
// The rank of the chosen bit is rnum modulo the number of set bits (0 when mask is 0).
// With BMI2 the rank is deposited into the mask directly; otherwise a branchless
// rank-select over the intermediate steps of a parallel bit count is used.
static inline unsigned rand_bit(unsigned rnum, size_t mask) {
	verify(sizeof(mask) == 8);
	unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
#if !defined(__BMI2__)
	uint64_t v = mask;        // Input value to find position with rank r.
	unsigned int r = bit + 1; // Input: bit's desired rank [1-64].
	unsigned int s;           // Output: Resulting position of bit with rank r [1-64]
	uint64_t a, b, c, d;      // Intermediate temporaries for bit count.
	unsigned int t;           // Bit count temporary.

	// Do a normal parallel bit count for a 64-bit integer,
	// but store all intermediate steps.
	a =  v - ((v >> 1) & ~0UL/3);
	b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
	c = (b + (b >> 4)) & ~0UL/0x11;
	d = (c + (c >> 8)) & ~0UL/0x101;

	t = (d >> 32) + (d >> 48);
	// Now do branchless select!
	s  = 64;
	s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
	t  = (d >> (s - 16)) & 0xff;
	s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
	t  = (c >> (s - 8)) & 0xf;
	s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
	t  = (b >> (s - 4)) & 0x7;
	s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
	t  = (a >> (s - 2)) & 0x3;
	s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
	t  = (v >> (s - 1)) & 0x1;
	s -= ((t - r) & 256) >> 8;
	return s - 1;
#else
	uint64_t picked = _pdep_u64(1ul << bit, mask);
	return picked ? __builtin_ctzl(picked) : 0;
#endif
}

// Mask selecting the bit-index part of a queue index: bits-per-word minus one.
static inline __cfa_readyQ_mask_t readyQ_mask_full() {
	return (8 * sizeof(__cfa_readyQ_mask_t)) - 1;
}

// Number of low bits of a queue index that make up the bit-index part.
static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() {
	return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(readyQ_mask_full());
}

// Split a queue index into its position inside the bitmask array.
// Returns the tuple [bit, word]: the bit inside the word, then the word index.
static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
	__cfa_readyQ_mask_t word = idx >> readyQ_mask_shit_length();
	__cfa_readyQ_mask_t bit  = idx &  readyQ_mask_full();
	return [bit, word];
}

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================

// Construct the lock: size it from __max_processors(), start with no slot
// allocated or ready, and allocate the array of per-processor slots.
void  ?{}(__clusterRWLock_t & this) {
	this.max = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.lock  = false;
	this.data  = alloc(this.max);

	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}

// Destroy the lock: release the slot array.
void ^?{}(__clusterRWLock_t & this) {
	free(this.data);
}

// Construct a slot owned by proc, with its local lock released.
void ?{}( __processor_id & this, struct processor * proc ) {
	this.handle = proc;
	this.lock   = false;
}
//=======================================================================
// Lock-Free registering/unregistering of threads

// Claim a slot of cltr->ready_lock.data for proc and return its index.
// A published slot whose handle is null is reused when one exists; otherwise
// a new slot is allocated and published. Aborts when that would exceed max.
unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		processor * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/*paranoid*/ verify(i < ready);
			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", cltr->ready_lock.max);

	// Step - 3 : Mark space as used and then publish it.
	__processor_id * storage = (__processor_id *)&data[n];
	(*storage){ proc };
	while(true) {
		// Slots are published in allocation order : wait until ready reaches n,
		// then advance it to n + 1. The expected value is re-written every
		// iteration since a failed compare overwrites it.
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		asm volatile("pause");
	}

	// Return new spot.
	/*paranoid*/ verify(n < ready);
	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
	return n;
}

// Release the slot held by proc (index proc->id) by storing a null handle,
// which makes the slot reusable by a later doregister.
void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
	unsigned id = proc->id;
	/*paranoid*/ verify(id < ready);
	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.

// Acquire the global lock and then the local lock of every published slot.
// Returns the number of local locks taken; pass it to ready_mutate_unlock.
uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
	// Step 1 : lock global lock
	// It is needed to avoid processors that register mid Critical-Section
	//   to simply lock their own lock and enter.
	__atomic_acquire( &lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	//   but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		__atomic_acquire( &data[i].lock );
	}

	return s;
}

// Release the first last_s local locks, then the global lock.
// last_s is the value returned by the matching ready_mutate_lock.
void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	//   threads that were created mid critical section
	//   to race to lock their local locks and have the writer
	//   immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		verify(data[i].lock);
		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == lock);
	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
}

//=======================================================================
// Intrusive Queue used by ready queue
//=======================================================================
// Get the head pointer (one before the first element) from the anchor
static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
	// The anchor only stores the link fields : step back by the offset of
	// link so the result can be used like a thread_desc whose link is the anchor
	thread_desc * rhead = (thread_desc *)(
		(uintptr_t)( &this.before ) - offsetof( thread_desc, link )
	);
	/* paranoid */ verify(rhead);
	return rhead;
}

// Get the tail pointer (one after the last element) from the anchor
static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
	thread_desc * rtail = (thread_desc *)(
		(uintptr_t)( &this.after ) - offsetof( thread_desc, link )
	);
	/* paranoid */ verify(rtail);
	return rtail;
}

// Ctor
// Builds an empty, unlocked queue : the two anchors point at each other and
// the head timestamp is 0, which is how emptiness is detected.
void ?{}( __intrusive_ready_queue_t & this ) {
	this.lock = false;
	this.last_id = -1u;

	this.before.link.prev = 0p;
	this.before.link.next = tail(this);
	this.before.link.ts   = 0;

	this.after .link.prev = head(this);
	this.after .link.next = 0p;
	this.after .link.ts   = 0;

	#if !defined(__CFA_NO_SCHED_STATS__)
		this.stat.diff = 0;
		this.stat.push = 0;
		this.stat.pop  = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
	/* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
	/* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
	/* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
	/* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
	/* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
	/* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(sizeof(this) == 128);
	/* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
	/* paranoid */ verify(__alignof__(this) == 128);
	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));

	/* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
	/* paranoid */ verifyf(readyQ_mask_full() == 63, "%zu", readyQ_mask_full());
}

// Dtor is trivial
void ^?{}( __intrusive_ready_queue_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify(head(this)->link.prev == 0p );
	/* paranoid */ verify(head(this)->link.next == tail(this) );
	/* paranoid */ verify(tail(this)->link.next == 0p );
	/* paranoid */ verify(tail(this)->link.prev == head(this) );
}

// Append node at the back of the queue; the queue lock must already be held.
// Returns true when the queue was empty before the push.
bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
	verify(this.lock);
	verify(node->link.ts != 0);
	verify(node->link.next == 0p);
	verify(node->link.prev == 0p);

	if(this.before.link.ts == 0l) {
		verify(tail(this)->link.next == 0p);
		verify(tail(this)->link.prev == head(this));
		verify(head(this)->link.next == tail(this));
		verify(head(this)->link.prev == 0p);
	}

	// Get the relevant nodes locally
	thread_desc * tail = tail(this);
	thread_desc * prev = tail->link.prev;

	// Do the push
	node->link.next = tail;
	node->link.prev = prev;
	prev->link.next = node;
	tail->link.prev = node;

	// Update stats
	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff++;
		this.stat.push++;
	#endif

	verify(node->link.next == tail(this));

	// Check if the queue used to be empty
	if(this.before.link.ts == 0l) {
		// The head anchor carries the timestamp of the first element
		this.before.link.ts = node->link.ts;
		verify(node->link.prev == head(this));
		return true;
	}
	return false;
}

// Remove the element at the front of the queue; the queue lock must already be held.
// Returns [node, emptied] : the removed element and whether the queue is now empty.
// Returns [0p, false] when there was nothing to remove.
[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
	verify(this.lock);
	verify(this.before.link.ts != 0ul);
	thread_desc * head = head(this);
	thread_desc * tail = tail(this);

	thread_desc * node = head->link.next;
	thread_desc * next = node->link.next;
	if(node == tail) {
		// Nothing between the anchors : callers are expected to check first
		verify(false);
		verify(this.before.link.ts == 0ul);
		verify(tail(this)->link.next == 0p);
		verify(tail(this)->link.prev == head(this));
		verify(head(this)->link.next == tail(this));
		verify(head(this)->link.prev == 0p);
		return [0p, false];
	}

	/* paranoid */ verify(node);

	// Unlink the first element
	head->link.next = next;
	next->link.prev = head;

	#ifndef __CFA_NO_SCHED_STATS__
		this.stat.diff--;
		this.stat.pop ++;
	#endif

	if(next == tail) {
		// That was the last element : mark the queue empty
		this.before.link.ts = 0ul;
		verify(tail(this)->link.next == 0p);
		verify(tail(this)->link.prev == head(this));
		verify(head(this)->link.next == tail(this));
		verify(head(this)->link.prev == 0p);
		node->link.[next, prev] = 0p;
		return [node, true];
	}
	else {
		// The head anchor takes the timestamp of the new first element
		verify(next->link.ts != 0);
		this.before.link.ts = next->link.ts;
		verify(this.before.link.ts != 0);
		node->link.[next, prev] = 0p;
		return [node, false];
	}
}

// Timestamp of the first element of the queue, 0 when the queue is empty.
static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
	return this.before.link.ts;
}

//=======================================================================
// Cforall Ready Queue used by ready queue
//=======================================================================

// Per-thread statistics counters, folded into the cluster by stats_tls_tally.
static __attribute__((aligned(128))) thread_local struct {
	struct {
		struct {
			size_t attempt;
			size_t success;
		} push;
		struct {
			size_t maskrds;
			size_t attempt;
			size_t success;
		} pop;
	} pick;
	struct {
		size_t value;
		size_t count;
	} full;
} tls = {
	/* pick */{
		/* push */{ 0, 0 },
		/* pop  */{ 0, 0, 0 },
	},
	/* full */{ 0, 0 }
};

//-----------------------------------------------------------------------

// Construct a ready queue with 4 empty intrusive queues, no mask bit set
// and zeroed global statistics.
void ?{}(__ready_queue_t & this) with (this) {
	empty.count = 0;
	for( i ; __cfa_readyQ_mask_size ) {
		empty.mask[i] = 0;
	}

	list.data = alloc(4);
	for( i; 4 ) {
		(list.data[i]){};
	}
	list.count = 4;

	#if !defined(__CFA_NO_STATISTICS__)
		global_stats.pick.push.attempt = 0;
		global_stats.pick.push.success = 0;
		global_stats.pick.pop .maskrds = 0;
		global_stats.pick.pop .attempt = 0;
		global_stats.pick.pop .success = 0;

		global_stats.full.value = 0;
		global_stats.full.count = 0;
	#endif
}

// Destroy a ready queue; it must be back to 4 queues with nothing in them.
void ^?{}(__ready_queue_t & this) with (this) {
	verify( 4  == list .count );
	verify( 0  == empty.count );

	for( i; 4 ) {
		^(list.data[i]){};
	}
	free(list.data);

	#if defined(__CFA_WITH_VERIFY__)
		for( i ; __cfa_readyQ_mask_size ) {
			assert( 0 == empty.mask[i] );
		}
	#endif
}

//-----------------------------------------------------------------------
// Timestamp thrd and push it onto a randomly chosen queue of the cluster,
// retrying with another queue whenever the chosen one is locked.
// Returns true when no queue held anything before this push, i.e. when
// empty.count was 0.
__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
	thrd->link.ts = rdtscl();

	while(true) {
		// Pick a random list
		unsigned i = tls_rand() % list.count;

		#if !defined(__CFA_NO_STATISTICS__)
			tls.pick.push.attempt++;
		#endif

		// If we can't lock it retry
		if( !__atomic_try_acquire( &list.data[i].lock ) ) continue;
		verify(list.data[i].last_id == -1u);
		list.data[i].last_id = kernelTLS.this_processor->id;

		__attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
		bool first = false;

		verify( list.data[i].last_id == kernelTLS.this_processor->id );
		verify( list.data[i].lock );
		// Actually push it
		if(push(list.data[i], thrd)) {
			// The queue went from empty to non-empty : count it and set its mask bit
			size_t ret = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
			first = (ret == 0);

			__cfa_readyQ_mask_t word;
			__cfa_readyQ_mask_t bit;
			[bit, word] = extract(i);
			verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
			__attribute__((unused)) bool ret = bts(&empty.mask[word], bit);
			verify(!(bool)ret);
			verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
		}
		verify(empty.count <= (int)list.count);
		verify( list.data[i].last_id == kernelTLS.this_processor->id );
		verify( list.data[i].lock );

		// Unlock and return
		list.data[i].last_id = -1u;
		__atomic_unlock( &list.data[i].lock );

		#if !defined(__CFA_NO_STATISTICS__)
			tls.pick.push.success++;
			tls.full.value += num;
			tls.full.count += 1;
		#endif
		return first;
	}
}

//-----------------------------------------------------------------------

// Try to pop one thread from whichever of the queues i and j looks best.
// Returns 0p when the chosen queue looks empty or its lock cannot be taken.
static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.attempt++;
	#endif

	// Pick the best list
	// When j holds something, take the queue with the smaller head timestamp.
	// Note that an empty i (timestamp 0) compares smaller and is therefore
	// selected, in which case the emptiness test below returns 0p.
	int w = i;
	if( __builtin_expect(ts(list.data[j]) != 0, true) ) {
		w = (ts(list.data[i]) < ts(list.data[j])) ? i : j;
	}

	__intrusive_ready_queue_t & list = list.data[w];
	// If list looks empty retry
	if( ts(list) == 0 ) return 0p;

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&list.lock) ) return 0p;
	verify(list.last_id == -1u);
	list.last_id = kernelTLS.this_processor->id;

	verify(list.last_id == kernelTLS.this_processor->id);

	__attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );

	// If list is empty, unlock and retry
	if( ts(list) == 0 ) {
		list.last_id = -1u;
		__atomic_unlock(&list.lock);
		return 0p;
	}
	{
		__cfa_readyQ_mask_t word;
		__cfa_readyQ_mask_t bit;
		[bit, word] = extract(w);
		verify((empty.mask[word] & (1ull << bit)) != 0);
	}

	verify(list.last_id == kernelTLS.this_processor->id);
	verify(list.lock);

	// Actually pop the list
	struct thread_desc * thrd;
	bool emptied;
	[thrd, emptied] = pop(list);
	verify(thrd);

	verify(list.last_id == kernelTLS.this_processor->id);
	verify(list.lock);

	if(emptied) {
		// The queue went from non-empty to empty : uncount it and clear its mask bit
		__atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);

		__cfa_readyQ_mask_t word;
		__cfa_readyQ_mask_t bit;
		[bit, word] = extract(w);
		verify((empty.mask[word] & (1ull << bit)) != 0);
		__attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
		verify(ret);
		verify((empty.mask[word] & (1ull << bit)) == 0);
	}

	verify(list.lock);

	// Unlock and return
	list.last_id = -1u;
	__atomic_unlock(&list.lock);
	verify(empty.count >= 0);

	#if !defined(__CFA_NO_STATISTICS__)
		tls.pick.pop.success++;
		tls.full.value += num;
		tls.full.count += 1;
	#endif

	return thrd;
}

// Pop a thread from the cluster's ready queue.
// Keeps trying pairs of randomly chosen queues for as long as empty.count is
// non-zero; returns 0p once it reads 0.
__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
	verify( list.count > 0 );
	while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
		#if !defined(__CFA_READQ_NO_BITMASK__)
			unsigned i, j;
			{
				// Count the mask read once, under the same guard as the other tls counters
				#if !defined(__CFA_NO_STATISTICS__)
					tls.pick.pop.maskrds++;
				#endif

				// Pick two lists at random
				unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;

				unsigned ri = tls_rand();
				unsigned rj = tls_rand();

				// The bits above the low 6 select the mask word
				unsigned wdxi = (ri >> 6u) % num;
				unsigned wdxj = (rj >> 6u) % num;

				size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
				size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );

				if(maski == 0 && maskj == 0) continue;

				// Choose a set bit inside each word
				unsigned bi = rand_bit(ri, maski);
				unsigned bj = rand_bit(rj, maskj);

				verifyf(bi < 64, "%zu %u", maski, bi);
				verifyf(bj < 64, "%zu %u", maskj, bj);

				i = bi | (wdxi << 6);
				j = bj | (wdxj << 6);

				verifyf(i < list.count, "%u", wdxi << 6);
				verifyf(j < list.count, "%u", wdxj << 6);
			}

			struct thread_desc * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#else
			// Pick two lists at random
			int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
			int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );

			struct thread_desc * thrd = try_pop(cltr, i, j);
			if(thrd) return thrd;
		#endif
	}

	return 0p;
}

//-----------------------------------------------------------------------

// Consistency check of the whole ready queue; does nothing unless
// __CFA_WITH_VERIFY__ is defined.
static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			// Every mask bit must agree with whether its queue holds something;
			// bits past list.count must be clear
			int idx = 0;
			for( w ; __cfa_readyQ_mask_size ) {
				for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
					bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
					bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
					assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
					assert(__cfa_max_readyQs > idx);
					idx++;
				}
			}
		}

		{
			// Every queue must be unlocked with coherent anchors
			for( idx ; list.count ) {
				__intrusive_ready_queue_t & sl = list.data[idx];
				assert(!list.data[idx].lock);

				assert(head(sl)->link.prev == 0p );
				assert(head(sl)->link.next->link.prev == head(sl) );
				assert(tail(sl)->link.next == 0p );
				assert(tail(sl)->link.prev->link.next == tail(sl) );

				if(sl.before.link.ts == 0l) {
					assert(tail(sl)->link.next == 0p);
					assert(tail(sl)->link.prev == head(sl));
					assert(head(sl)->link.next == tail(sl));
					assert(head(sl)->link.prev == 0p);
				}
			}
		}
	#endif
}

// Call this function if the intrusive list was moved using memcpy
// fixes the list so that the pointers back to anchors aren't left
// dangling
static inline void fix(__intrusive_ready_queue_t & ll) {
	// if the list is not empty then follow the pointer
	// and fix its reverse
	if(ll.before.link.ts != 0l) {
		head(ll)->link.next->link.prev = head(ll);
		tail(ll)->link.prev->link.next = tail(ll);
	}
	// Otherwise just reset the list
	else {
		tail(ll)->link.next = 0p;
		tail(ll)->link.prev = head(ll);
		head(ll)->link.next = tail(ll);
		head(ll)->link.prev = 0p;
	}
}

// Add 4 queues to the cluster's ready queue, under the writer lock.
// Aborts when the result would reach __cfa_max_readyQs.
void ready_queue_grow (struct cluster * cltr) {
	uint_fast32_t last_size = ready_mutate_lock( *cltr );
	check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		size_t ncount = list.count;

		// Check that we have some space left
		if(ncount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);

		ncount += 4;

		// Allocate new array
		list.data = alloc(list.data, ncount);

		// Fix the moved data
		for( idx; (size_t)list.count ) {
			fix(list.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)list.count ~ ncount) {
			(list.data[idx]){};
		}

		// Update original
		list.count = ncount;
		// fields in empty don't need to change
	}

	// Make sure that everything is consistent
	check( cltr->ready_queue );
	ready_mutate_unlock( *cltr, last_size );
}

// Remove the last 4 queues of the cluster's ready queue, under the writer lock.
// Threads still held by the removed queues are pushed back onto the cluster.
// Aborts when fewer than 8 queues exist.
void ready_queue_shrink(struct cluster * cltr) {
	uint_fast32_t last_size = ready_mutate_lock( *cltr );
	with( cltr->ready_queue ) {
		size_t ocount = list.count;
		// Check that we have some space left
		if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");

		// Shrink first so the pushes below only target the queues that are kept
		list.count -= 4;

		// redistribute old data
		verify(ocount > list.count);
		for( idx; (size_t)list.count ~ ocount) {
			// This is not strictly needed but makes checking invariants much easier
			bool locked = __atomic_try_acquire(&list.data[idx].lock);
			verify(locked);
			while(0 != ts(list.data[idx])) {
				struct thread_desc * thrd;
				bool emptied;
				[thrd, emptied] = pop(list.data[idx]);
				verify(thrd);

				// This pop bypasses try_pop, so the bookkeeping try_pop does
				// must be done here : once the removed queue is drained it no
				// longer counts as holding threads and its mask bit is cleared.
				// Working per queue also covers removed queues that fall in
				// different mask words.
				// NOTE(review): until every removed queue is drained empty.count
				// can exceed list.count, which the paranoid check in push does
				// not expect - confirm whether that check needs relaxing here.
				if(emptied) {
					__atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);

					__cfa_readyQ_mask_t word;
					__cfa_readyQ_mask_t bit;
					[bit, word] = extract(idx);
					__attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
					verify(ret);
				}

				push(cltr, thrd);
			}

			__atomic_unlock(&list.data[idx].lock);

			// TODO print the queue statistics here

			^(list.data[idx]){};
		}

		// Allocate new array
		list.data = alloc(list.data, list.count);

		// Fix the moved data
		for( idx; (size_t)list.count ) {
			fix(list.data[idx]);
		}
	}

	// Make sure that everything is consistent
	check( cltr->ready_queue );
	ready_mutate_unlock( *cltr, last_size );
}

//-----------------------------------------------------------------------

#if !defined(__CFA_NO_STATISTICS__)
// Add the calling thread's counters to the cluster's global statistics.
void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
	__atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );

	__atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
	__atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
}
#endif