//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// cluster.cfa -- file that includes helpers for subsystems that need cluster-wide support
//
// Author           : Thierry Delisle
// Created On       : Fri Mar 11 12:39:24 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel/cluster.hfa"
#include "kernel/private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include "ready_subqueue.hfa"
#include "io/types.hfa"

#include
#include

extern "C" {
	#include <sys/syscall.h>  // __NR_xxx
}

// No overridden function, no environment variable, no define
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

#if !defined(__CFA_NO_STATISTICS__)
	#define __STATS(...) __VA_ARGS__
#else
	#define __STATS(...)
#endif

// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() libcfa_public {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}
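// Illustrative note (not part of the library): because __max_processors() is
// declared weak, an application can also set the cap by providing its own
// definition, taking precedence over both the CFA_MAX_PROCESSORS environment
// variable and the __CFA_MAX_PROCESSORS__ fallback parsed above.
// A minimal sketch, with the value 256 chosen arbitrarily:
//
//     unsigned __max_processors() {
//         return 256;
//     }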
//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
void  ?{}(__scheduler_RWLock_t & this) {
	this.lock.max   = __max_processors();
	this.lock.alloc = 0;
	this.lock.ready = 0;
	this.lock.data  = alloc(this.lock.max);
	this.lock.write_lock  = false;

	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.lock.data);
}

//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles cause problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock.lock.max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}

void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles cause problems

	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
}
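// Illustrative pairing for the reader side (assumed usage; these calls are not
// made from this file): each processor claims a slot once when it starts and
// releases it when it shuts down.  The published flag is &kernelTLS().sched_lock,
// which the writer side below acquires one by one.
//
//     unsigned id = register_proc_id();   // publish this processor's flag and record its slot
//     // ... processor runs; readers toggle kernelTLS().sched_lock around ready-queue accesses ...
//     unregister_proc_id( id );           // clear the slot on shutdown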
//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : lock global lock
	// It is needed to avoid processors that register mid Critical-Section
	// to simply lock their own lock and enter.
	__atomic_acquire( &write_lock );

	// Make sure we won't deadlock ourselves
	// Checking before acquiring the writer lock isn't safe
	// because someone else could have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	// but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_acquire( llock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	//   threads that were created mid critical section
	//   to race to lock their local locks and have the writer
	//   immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}

//=======================================================================
// Cluster growth
static const unsigned __readyq_single_shard = 2;

void  ?{}(__timestamp_t & this) { this.t.tv = 0; this.t.ma = 0; }
void ^?{}(__timestamp_t &) {}

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check_readyQ( cluster * cltr ) with (cltr->sched) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			const unsigned lanes_count = readyQ.count;
			for( idx ; lanes_count ) {
				__intrusive_lane_t & sl = readyQ.data[idx];
				assert(!readyQ.data[idx].l.lock);

				if(is_empty(sl)) {
					assert( sl.l.anchor.next == 0p );
					assert( sl.l.anchor.ts   == MAX );
					assert( mock_head(sl)    == sl.l.prev );
				} else {
					assert( sl.l.anchor.next != 0p );
					assert( sl.l.anchor.ts   != MAX );
					assert( mock_head(sl)    != sl.l.prev );
				}
			}
		}
	#endif
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	if(is_empty(ll)) {
		verify(ll.l.anchor.next == 0p);
		ll.l.prev = mock_head(ll);
	}
}

static void assign_list(unsigned & valrq, unsigned & valio, dlist(struct processor) & list, unsigned count) {
	struct processor * it = &list`first;
	for(unsigned i = 0; i < count; i++) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = valrq;
		it->rdq.target = UINT_MAX;
		valrq += __shard_factor.readyq;
		#if defined(CFA_HAVE_LINUX_IO_URING_H)
			it->io.ctx->cq.id = valio;
			it->io.target = UINT_MAX;
			valio += __shard_factor.io;
		#endif
		it = &(*it)`next;
	}
}

static void reassign_cltr_id(struct cluster * cltr) {
	unsigned prefrq = 0;
	unsigned prefio = 0;
	assign_list(prefrq, prefio, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(prefrq, prefio, cltr->procs.idles  , cltr->procs.idle );
}

#if defined(CFA_HAVE_LINUX_IO_URING_H)
	static void assign_io(io_context$ ** data, size_t count, dlist(struct processor) & list) {
		struct processor * it = &list`first;
		while(it) {
			/* paranoid */ verifyf( it, "Unexpected null iterator\n");
			/* paranoid */ verifyf( it->io.ctx->cq.id < count, "Processor %p has id %u above count %zu\n", it, it->rdq.id, count);
			data[it->io.ctx->cq.id] = it->io.ctx;
			it = &(*it)`next;
		}
	}

	static void reassign_cltr_io(struct cluster * cltr) {
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.actives);
		assign_io(cltr->sched.io.data, cltr->sched.io.count, cltr->procs.idles  );
	}
#else
	static void reassign_cltr_io(struct cluster *) {}
#endif

static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
	tscs = alloc(count, tscs`realloc);
	for(i; count) {
		tscs[i].t.tv = rdtscl();
		tscs[i].t.ma = 0;
	}
}
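// Sizing note for the two resize operations below (illustrative arithmetic; the
// actual value of __shard_factor.readyq is defined elsewhere): both size the
// ready queue as max(procs.total * __shard_factor.readyq, __readyq_single_shard),
// so the queue never drops below __readyq_single_shard (2) lanes even with no
// processors.  For example, assuming a shard factor of 2, a cluster with 3
// processors gets max(3 * 2, 2) == 6 lanes and an empty cluster keeps
// max(0 * 2, 2) == 2 lanes.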
// Grow the ready queue
void ready_queue_grow(struct cluster * cltr) {
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	// Find new count
	// Make sure we always have at least 1 list
	size_t ocount = cltr->sched.readyQ.count;
	size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);

	// Do we have to do anything?
	if( ocount != ncount ) {

		// grow the ready queue
		with( cltr->sched ) {

			// Allocate new array (uses realloc and memcpies the data)
			readyQ.data = alloc( ncount, readyQ.data`realloc );

			// Fix the moved data
			for( idx; ocount ) {
				fix(readyQ.data[idx]);
			}

			// Construct new data
			for( idx; ocount ~ ncount) {
				(readyQ.data[idx]){};
			}

			// Update original count
			readyQ.count = ncount;
		}

		fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
	}

	// Fix the io times
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	// realloc the caches
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// reassign the clusters.
	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );
	// /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check_readyQ( cltr );

	int target = cltr->procs.total;

	with( cltr->sched ) {
		// Remember old count
		size_t ocount = readyQ.count;

		// Find new count
		// Make sure we always have at least 1 list
		size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
		/* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
		/* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
		/* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, __readyq_single_shard, ncount );

		readyQ.count = ncount;

		// for printing count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; ncount ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].l.lock);
			verify(locked);

			// As long as we can pop from this lane to push the threads somewhere else in the queue
			while(!is_empty(readyQ.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(readyQ.data[idx]);

				push(cltr, thrd, true);

				// for printing count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&readyQ.data[idx].l.lock);

			// TODO print the queue statistics here

			^(readyQ.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		readyQ.data = alloc( ncount, readyQ.data`realloc );

		// Fix the moved data
		for( idx; ncount ) {
			fix(readyQ.data[idx]);
		}

		fix_times(readyQ.tscs, ncount);
	}
	cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );

	// Fix the io times
	cltr->sched.io.count = target * __shard_factor.io;
	fix_times(cltr->sched.io.tscs, cltr->sched.io.count);

	reassign_cltr_id(cltr);

	cltr->sched.io.data = alloc( cltr->sched.io.count, cltr->sched.io.data`realloc );
	reassign_cltr_io(cltr);

	// Make sure that everything is consistent
	// /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
	/* paranoid */ check_readyQ( cltr );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}
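// Illustrative call pattern for ready_queue_grow/ready_queue_shrink (assumed
// usage; the actual callers live elsewhere in the kernel): both verify that the
// writer lock is held and size themselves from cltr->procs.total, so a caller
// is expected to update the processor lists first, under the lock.
//
//     uint_fast32_t last_size = ready_mutate_lock();
//     // ... add the processor to cltr->procs and bump procs.total ...
//     ready_queue_grow( cltr );             // or ready_queue_shrink( cltr ) on removal
//     ready_mutate_unlock( last_size );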
void ready_queue_close(struct cluster * cltr) {
	free( cltr->sched.readyQ.data );
	free( cltr->sched.readyQ.tscs );
	cltr->sched.readyQ.data = 0p;
	cltr->sched.readyQ.tscs = 0p;
	cltr->sched.readyQ.count = 0;

	free( cltr->sched.io.tscs );
	free( cltr->sched.caches );
}

#define nested_offsetof(type, field) ((off_t)(&(((type*)0)-> field)))

// Ctor
void ?{}( __intrusive_lane_t & this ) {
	this.l.lock = false;
	this.l.prev = mock_head(this);
	this.l.anchor.next = 0p;
	this.l.anchor.ts   = MAX;
	#if !defined(__CFA_NO_STATISTICS__)
		this.l.cnt  = 0;
	#endif

	// We add a boat-load of assertions here because the anchor code is very fragile
	/* paranoid */ _Static_assert( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, rdy_link )) == (uintptr_t)(&this.l.anchor) );
	/* paranoid */ verify( &mock_head(this)->rdy_link.next == &this.l.anchor.next );
	/* paranoid */ verify( &mock_head(this)->rdy_link.ts   == &this.l.anchor.ts   );
	/* paranoid */ verify( mock_head(this)->rdy_link.next == 0p );
	/* paranoid */ verify( mock_head(this)->rdy_link.ts   == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );
	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
	/* paranoid */ verify( __alignof__(this) == 64 );
	/* paranoid */ verifyf( ((intptr_t)(&this) % 64) == 0, "Expected address to be aligned %p %% 64 == %zd", &this, ((intptr_t)(&this) % 64) );
}
#undef nested_offsetof

// Dtor is trivial
void ^?{}( __intrusive_lane_t & this ) {
	// Make sure the list is empty
	/* paranoid */ verify( this.l.anchor.next == 0p );
	/* paranoid */ verify( this.l.anchor.ts   == MAX );
	/* paranoid */ verify( mock_head(this) == this.l.prev );
}
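// Closing note on the anchor layout verified in the ctor above (explanatory
// comment only; the names below are illustrative): mock_head() returns a thread$
// pointer positioned so that its rdy_link field aliases l.anchor, letting the
// anchor act as the list's sentinel node without allocating a real thread
// descriptor.  That is, for some lane `lane`:
//
//     thread$ * head = mock_head(lane);
//     // &head->rdy_link.next == &lane.l.anchor.next
//     // &head->rdy_link.ts   == &lane.l.anchor.ts
//
// Because mock_head() is computed from the lane's own address, a lane moved
// with memcpy must have l.prev re-pointed, which is what fix() does.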