// Leader-transfer benchmark.
//
// A group of user-level threads shares an epoch counter (lead_idx). Exactly one
// thread at a time is the "leader": it opens a new epoch, waits until every
// other thread has caught up to that epoch, then picks the next leader with
// lehmer64(). The run ends once the epoch counter exceeds stop_count, or when
// the emergency-stop flag is raised.

#include "rq_bench.hfa"
// NOTE(review): the two header names below were missing from the mangled
// source; they are restored from the names this file uses (sout/serr,
// setlocale/LC_NUMERIC) -- confirm against the upstream file.
#include <fstream.hfa>
#include <locale.h>

// Always returns 0.
// NOTE(review): presumably read by the runtime as the preemption interval,
// with 0 turning preemption off -- confirm.
Duration default_preemption() {
	return 0;
}

// Debug tracing hook: expands to nothing, so every PRINT( ... ) is compiled out.
#define PRINT(...)

__uint128_t lead_seed;      // generator state used by lead() to pick the next leader
volatile unsigned leader;   // id of the thread currently acting as leader
volatile size_t lead_idx;   // current epoch; incremented by each leader in lead()

bool exhaust = false;         // -e option: followers block on their semaphore instead of yielding
volatile bool estop = false;  // emergency stop, raised by waitgroup() on timeout

thread$ * the_main;  // the program's main thread, unparked when the run finishes or aborts

// One benchmark participant.
// NOTE(review): the 128-byte alignment presumably keeps instances on separate
// cache lines -- confirm.
thread __attribute__((aligned(128))) MyThread {
	unsigned id;          // index of this thread in the threads array
	volatile size_t idx;  // last epoch this thread has acknowledged
	bench_sem sem;        // where this thread blocks when exhaust is set
	size_t rechecks;      // number of times wait() found no new epoch
};

// Constructs the thread on bench_cluster with the given id and zeroed counters.
void ?{}( MyThread & this, unsigned id ) {
	((thread&)this){ bench_cluster };
	this.id = id;
	this.idx = 0;
	this.rechecks = 0;
}

MyThread ** threads;  // all participants, allocated and released in main()

// Spins until every thread's idx equals lead_idx.
// If more than 5 seconds elapse since entry, prints runtime statistics and an
// error message, raises estop, unparks the main thread and gives up.
static void waitgroup() {
	Time start = timeHiRes();
	OUTER: for(i; nthreads) {
		PRINT( sout | "Waiting for :" | i | "(" | threads[i]->idx | ")"; )
		while( threads[i]->idx != lead_idx ) {
			Pause();
			if( (timeHiRes() - start) > 5`s ) {
				print_stats_now( bench_cluster, CFA_STATS_READY_Q | CFA_STATS_IO );
				serr | "Programs has been blocked for more than 5 secs";
				estop = true;
				unpark( the_main );
				break OUTER;
			}
		}
	}
	PRINT( sout | "Waiting done"; )
}

// In exhaust mode, posts the semaphore of every thread except `me`.
// Does nothing when exhaust is off, since wait() never blocks on the semaphore then.
static void wakegroup(unsigned me) {
	if(!exhaust) return;

	for(i; nthreads) {
		if(i != me) post( threads[i]->sem );
	}
}

// Leader step: opens a new epoch, waits for the group, elects the next leader
// and (in exhaust mode) wakes the other threads.
static void lead(MyThread & this) {
	this.idx = ++lead_idx;
	if(lead_idx > stop_count || estop) {
		// Run is over: hand control back to main() instead of electing a successor.
		PRINT( sout | "Leader" | this.id | "done"; )
		unpark( the_main );
		return;
	}

	PRINT( sout | "Leader no" | this.idx| ":" | this.id; )

	waitgroup();

	// Publish the successor only after the whole group has acknowledged this epoch.
	unsigned nleader = lehmer64( lead_seed ) % nthreads;
	__atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );

	wakegroup(this.id);
}

// Follower step: yields once, then either records a recheck (no new epoch yet)
// or acknowledges the new epoch and blocks/yields depending on exhaust.
static void wait(MyThread & this) {
	yield();
	if(lead_idx == this.idx) {
		this.rechecks++;
		return;
	}

	// A follower may only ever be exactly one epoch behind.
	assert( (lead_idx - 1) == this.idx );
	__atomic_add_fetch( &this.idx, 1, __ATOMIC_SEQ_CST );
	if(exhaust) wait( this.sem );
	else yield();
}

// Thread body: parks until main() releases it, then alternates between the
// leader and follower roles until the run is finished or stopped.
void main(MyThread & this) {
	park();

	unsigned me = this.id;

	for() {
		if(leader == me) {
			lead( this );
		}
		else {
			wait( this );
		}
		if(lead_idx > stop_count || estop) break;
	}
}

// ==================================================
// Program entry: parses the options, runs the benchmark and prints the results.
// Returns 1 when duration mode is requested or fewer than 2 processors are given.
int main(int argc, char * argv[]) {
	// Seed the file-scope generator state: it is the one lead() advances.
	// Declaring a local `lead_seed` here left that state unseeded.
	lead_seed = getpid();
	for(10) lehmer64( lead_seed );
	unsigned nprocs = 2;

	cfa_option opt[] = {
		BENCH_OPT,
		{ 'e', "exhaust", "Whether or not threads that have seen the new epoch should park instead of yielding.", exhaust, parse_yesno}
	};
	BENCH_OPT_PARSE("cforall transition benchmark");

	if(clock_mode) {
		serr | "This benchmark doesn't support duration mode";
		return 1;
	}

	if(nprocs < 2) {
		serr | "Must have at least 2 processors";
		return 1;
	}

	lead_idx = 0;
	leader = lehmer64( lead_seed ) % nthreads;

	size_t rechecks = 0;
	the_main = active_thread();

	Time start, end;
	{
		BenchCluster bc = { nprocs };
		{
			threads = alloc(nthreads);
			for(i; nthreads) {
				threads[i] = malloc();
				(*threads[i]){ i };
			}

			// Every thread parks at the top of its main(); release them all and
			// sleep until a leader (or the timeout path) unparks us.
			start = timeHiRes();
			for(i; nthreads) {
				unpark(*threads[i]);
			}

			park();
			end = timeHiRes();

			// Release any thread still blocked on its semaphore so that it can
			// observe the exit condition and terminate.
			for(i; nthreads) {
				post(threads[i]->sem);
			}

			for(i; nthreads) {
				MyThread & thrd = join(*threads[i]);
				PRINT( sout | i | "joined"; )
				rechecks += thrd.rechecks;
				^( *threads[i] ){};
				free(threads[i]);
			}

			free(threads);
		}
	}

	setlocale( LC_NUMERIC, getenv( "LANG" ) );
	sout | "Duration (ms) : " | ws(3, 3, unit(eng((end - start)`dms)));
	sout | "Number of processors : " | nprocs;
	sout | "Number of threads : " | nthreads;
	sout | "Total Operations(ops) : " | lead_idx - 1;
	sout | "Threads parking on wait : " | (exhaust ? "yes" : "no");
	sout | "Rechecking : " | rechecks;
	// NOTE(review): divides by lead_idx although the operation count printed
	// above is lead_idx - 1 -- confirm which is intended.
	sout | "us per transfer : " | (end - start)`dus / lead_idx;
}