//
// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// concurrent/readyQ/leader_spin.cfa -- validates ready queue fairness
//
// Author           : Thierry Delisle
// Created On       : Fri Apr 01 11:39:09 2022
// Last Modified By :
// Last Modified On :
// Update Count     :
//

// NOTE(review): the header names were missing from the reviewed copy of this
// file; they are restored here from the facilities used below (PRNG/alloc/malloc,
// sout, thread/processor/park/unpark) -- confirm against the repository.
#include <stdlib.hfa>
#include <fstream.hfa>
#include <thread.hfa>

// Test validates that having all cores constantly active does not lead to starvation.
// This test is very similar to the transfer benchmark, without all the benchmark bells and whistles.
//
// Protocol: one thread at a time is the "leader". The leader opens a new round by
// incrementing lead_idx, spins until every thread has acknowledged the round by
// bringing its own idx up to lead_idx, then hands leadership to a randomly chosen
// thread. Non-leaders only yield and acknowledge. After stop_count rounds the last
// leader wakes the program main thread and every worker leaves its loop.

// Deactivate preemption because it could fix starvation issues.
Duration default_preemption() {
	return 0;
}

PRNG lead_rng;                      // random source used to pick the next leader
volatile unsigned leader;           // id of the thread currently acting as leader
volatile size_t lead_idx;           // current round number; only advanced in lead()

const unsigned nthreads = 17;       // number of worker threads
const unsigned stop_count = 327;    // the test ends once lead_idx exceeds this value

thread$ * the_main;                 // program main thread, unparked by the final leader

// Worker thread. idx is the last round this thread has acknowledged.
// NOTE(review): the 128-byte alignment presumably keeps each worker's idx away
// from its neighbours' data -- confirm intent.
thread __attribute__((aligned(128))) MyThread {
	unsigned id;
	volatile size_t idx;
};

// Construct a worker with the given id, starting at round 0.
void ?{}( MyThread & this, unsigned id ) {
	this.id = id;
	this.idx = 0;
}

MyThread ** threads;                // array of nthreads heap-allocated workers, owned by program main

// Spin until every worker's idx equals the current round (lead_idx).
// The calling leader is part of the array too; it has already set its own idx.
static void waitgroup() {
	for(i; nthreads) {
		while( threads[i]->idx != lead_idx ) {
			Pause();
		}
	}
}

// Leader step: open a new round, wait for all workers to acknowledge it, then
// publish a randomly selected next leader. On the round that exceeds stop_count
// the leader instead reports completion, wakes the program main thread, and
// returns without waiting for the group.
static void lead(MyThread & this) {
	this.idx = ++lead_idx;
	if(lead_idx > stop_count) {
		sout | "Leader done";
		unpark( the_main );
		return;
	}

	waitgroup();

	unsigned nleader = prng( lead_rng, nthreads );
	__atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
}

// Follower step: yield the processor, then acknowledge the current round if this
// thread has not done so yet. A follower may be at most one round behind.
static void wait(MyThread & this) {
	yield();
	if(lead_idx == this.idx) {
		// Already caught up with the current round; nothing to acknowledge.
		return;
	}

	assert( (lead_idx - 1) == this.idx );
	__atomic_add_fetch( &this.idx, 1, __ATOMIC_SEQ_CST );
	yield();
}

// Worker thread body: park until released by the program main, then alternate
// between leading and following until the round counter passes stop_count.
void main(MyThread & this) {
	park();

	unsigned me = this.id;

	for() {
		if(leader == me) {
			lead( this );
		}
		else {
			wait( this );
		}
		if(lead_idx > stop_count) break;
	}
}

// ==================================================
// Program entry point: picks the initial leader, creates the workers, releases
// them, sleeps until the final leader calls unpark, then joins and frees
// every worker. argc/argv are unused.
int main(int argc, char * argv[]) {
	lead_idx = 0;
	leader = prng( lead_rng, nthreads );

	the_main = active_thread();
	processor procs[2];
	{
		threads = alloc(nthreads);
		for(i; nthreads) {
			// Allocate raw storage, then run the MyThread constructor on it explicitly.
			threads[i] = malloc();
			(*threads[i]){ i };
		}

		// Every worker parks first thing in its main; release them all.
		for(i; nthreads) unpark(*threads[i]);

		// Sleep until the last leader signals that stop_count rounds have completed.
		park();

		for(i; nthreads) {
			MyThread * thrd = threads[i];
			join(*thrd);
			// Explicit destructor call to match the explicit construction above.
			^( *thrd ){};
			free(thrd);
		}

		free(threads);
	}
	sout | "done";
}