#include "rq_bench.hfa" #include Duration default_preemption() { return 0; } #define PRINT(...) __lehmer64_state_t lead_seed; volatile unsigned leader; volatile size_t lead_idx; bool exhaust = false; $thread * the_main; thread __attribute__((aligned(128))) MyThread { unsigned id; volatile size_t idx; bench_sem sem; size_t rechecks; }; void ?{}( MyThread & this, unsigned id ) { ((thread&)this){ bench_cluster }; this.id = id; this.idx = 0; this.rechecks = 0; } MyThread ** threads; static void waitgroup() { Time start = timeHiRes(); for(i; nthreads) { PRINT( sout | "Waiting for :" | i | "(" | threads[i]->idx | ")"; ) while( threads[i]->idx != lead_idx ) { Pause(); if( (timeHiRes() - start) > 5`s ) { serr | "Programs has been blocked for more than 5 secs"; exit(1); } } } PRINT( sout | "Waiting done"; ) } static void wakegroup(unsigned me) { if(!exhaust) return; for(i; nthreads) { if(i!= me) post( threads[i]->sem ); } } static void lead(MyThread & this) { this.idx = ++lead_idx; if(lead_idx > stop_count) { PRINT( sout | "Leader" | this.id | "done"; ) unpark( the_main ); return; } PRINT( sout | "Leader no" | this.idx| ":" | this.id; ) waitgroup(); unsigned nleader = __lehmer64( lead_seed ) % nthreads; __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST ); wakegroup(this.id); } static void wait(MyThread & this) { yield(); if(lead_idx == this.idx) { this.rechecks++; return; } assert( (lead_idx - 1) == this.idx ); __atomic_add_fetch( &this.idx, 1, __ATOMIC_SEQ_CST ); if(exhaust) wait( this.sem ); else yield(); } void main(MyThread & this) { park(); unsigned me = this.id; for() { if(leader == me) { lead( this ); } else { wait( this ); } if(lead_idx > stop_count) break; } } // ================================================== int main(int argc, char * argv[]) { __lehmer64_state_t lead_seed = getpid(); for(10) __lehmer64( lead_seed ); unsigned nprocs = 2; cfa_option opt[] = { BENCH_OPT, { 'e', "exhaust", "Whether or not threads that have seen the new epoch should yield or park.", exhaust, parse_yesno} }; BENCH_OPT_PARSE("cforall transition benchmark"); if(clock_mode) { serr | "This benchmark doesn't support duration mode"; return 1; } if(nprocs < 2) { serr | "Must have at least 2 processors"; return 1; } lead_idx = 0; leader = __lehmer64( lead_seed ) % nthreads; size_t rechecks = 0; the_main = active_thread(); Time start, end; { BenchCluster bc = { nprocs }; { threads = alloc(nthreads); for(i; nthreads) { threads[i] = malloc(); (*threads[i]){ i }; } start = timeHiRes(); for(i; nthreads) { unpark(*threads[i]); } park(); end = timeHiRes(); for(i; nthreads) { post(threads[i]->sem); } for(i; nthreads) { MyThread & thrd = join(*threads[i]); PRINT( sout | i | "joined"; ) rechecks += thrd.rechecks; ^( *threads[i] ){}; free(threads[i]); } free(threads); } } sout | "Duration : " | ws(3, 3, unit(eng((end - start)`ds))) | 's'; sout | "Number of processors : " | nprocs; sout | "Number of threads : " | nthreads; sout | "Total Operations(ops) : " | stop_count; sout | "Threads parking on wait : " | (exhaust ? "yes" : "no"); sout | "Rechecking : " | rechecks; }