#include "rq_bench.hfa"

// Locality benchmark.
// Threads repeatedly touch a private array of words, then block on a randomly
// chosen "spot". The next thread arriving at that spot wakes the blocked one
// (optionally handing over its array) and blocks in its place. Each thread
// counts how often it, and the array it works on, is seen on a different
// processor than the one last recorded.
//
// NOTE(review): bench_sem, post/wait, clock_mode, stop, stop_count,
// threads_left, bench_cluster, BenchCluster, BENCH_OPT and BENCH_OPT_PARSE are
// not defined in this file; presumably they come from rq_bench.hfa.

// Per-thread counters, summed by the program main after joining the thread.
struct Result {
	uint64_t count;  // completed iterations of the thread's main loop
	uint64_t dmigs;  // times the thread's data was seen on a new processor
	uint64_t gmigs;  // times the thread itself was seen on a new processor
};

// ==================================================
thread __attribute__((aligned(128))) MyThread {
	// Array currently worked on; put() may overwrite it from another thread
	struct MyData * volatile data;

	// Spots this thread can block on
	struct {
		struct MySpot ** ptr;
		size_t len;
	} spots;

	bench_sem sem;     // the thread blocks here: at start-up and inside put()
	Result result;     // progress and migration counters
	bool share;        // forwarded to put(): hand our data to the thread we wake
	size_t cnt;        // number of accesses performed per call to work()
	processor * ttid;  // processor recorded by the last call to moved()
	size_t id;         // set by the constructor, not read in this file
};

// Record 'ttid' as the thread's processor.
// Returns 1 if it differs from the previously recorded one, 0 otherwise.
uint64_t moved(MyThread & this, processor * ttid) {
	if(this.ttid == ttid) {
		return 0;
	}
	this.ttid = ttid;
	return 1;
}

// ==================================================
// Array of words touched by the worker threads
struct __attribute__((aligned(128))) MyData {
	uint64_t _p1[16];  // padding
	uint64_t * data;   // 'len' words, allocated by the constructor
	size_t len;
	processor * ttid;  // processor recorded by the last call to moved()
	size_t id;         // set by the constructor, not read in this file
	uint64_t _p2[16];  // padding
};

// Allocate 'size' words with 128 alignment and zero them.
void ?{}(MyData & this, size_t id, size_t size) {
	this.len = size;
	this.data = alloc(this.len, 128`align);
	this.ttid = active_processor();
	this.id = id;

	for(i; this.len) {
		this.data[i] = 0;
	}
}

// Release the word array allocated by the constructor.
// Without this destructor the buffer is never freed.
void ^?{}(MyData & this) {
	free(this.data);
}

// Record 'ttid' as the data's processor.
// Returns 1 if it differs from the previously recorded one, 0 otherwise.
uint64_t moved(MyData & this, processor * ttid) {
	if(this.ttid == ttid) {
		return 0;
	}
	this.ttid = ttid;
	return 1;
}

// Increment the word at index 'idx' modulo the array length.
// The length must be non-zero; the program main rejects an empty array
// whenever any access is requested.
__attribute__((noinline)) void access(MyData & this, size_t idx) {
	size_t l = this.len;
	this.data[idx % l] += 1;
}

// ==================================================
// Atomic object where a single thread can wait
// May exchange data
struct __attribute__((aligned(128))) MySpot {
	// 0p: seat is empty, 1p: seat is closed, otherwise: the waiting thread
	MyThread * volatile ptr;
	size_t id;         // set by the constructor, not read in this file
	uint64_t _p1[16];  // padding
};

// Start with an empty seat.
void ?{}(MySpot & this, size_t id) {
	this.ptr = 0p;
	this.id = id;
}

// Main handshake of the code
// Single seat, first thread arriving waits
// Next threads unblocks current one and blocks in its place
// if share == true, exchange data in the process
// Returns true if the seat was closed (the caller did not block),
// false once the caller has been woken up after blocking.
bool put( MySpot & this, MyThread & ctx, MyData * data, bool share) {
	// Attempt to CAS our context into the seat
	for() {
		MyThread * expected = this.ptr;
		if (expected == 1p) { // Seat is closed, return
			return true;
		}

		if (__atomic_compare_exchange_n(&this.ptr, &expected, &ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			// On success 'expected' is the previous occupant, if any
			if(expected) {
				if(share) {
					expected->data = data;
				}
				post( expected->sem );
			}
			break; // We got the seat
		}
	}

	// Block once on the seat
	wait(ctx.sem);

	// Someone woke us up, get the new data
	return false;
}

// Shutdown the spot
// Wake current thread and mark seat as closed
// NOTE(review): nothing in this file calls release().
void release( MySpot & this ) {
	MyThread * val = __atomic_exchange_n(&this.ptr, 1p, __ATOMIC_SEQ_CST);
	if (!val) {
		return;
	}

	// Someone was there, release them
	post( val->sem );
}

// ==================================================
// Do some work by accessing 'cnt' cells in the array
__attribute__((noinline)) void work(MyData & data, size_t cnt, uint64_t & state) {
	for (cnt) {
		access(data, __xorshift64(state));
	}
}

// Thread body: work on the current data, block on a random spot, repeat
// until the experiment is over.
void main(MyThread & this) {
	uint64_t state = thread_rand();

	// Wait for start
	wait(this.sem);

	// Main loop
	for() {
		// Touch our current data, write to invalidate remote cache lines
		work(*this.data, this.cnt, state);

		// Wait on a random spot
		// spots.len is non-zero: the program main refuses to run with 0 spots
		uint64_t idx = __xorshift64(state) % this.spots.len;
		bool closed = put(*this.spots.ptr[idx], this, this.data, this.share);

		// Check if the experiment is over
		if (closed) break;
		if ( clock_mode && stop) break;
		if (!clock_mode && this.result.count >= stop_count) break;

		// Check everything is consistent
		verify(this.data);

		// write down progress and check migrations
		processor * ttid = active_processor();
		this.result.count += 1;
		this.result.gmigs += moved(this, ttid);
		this.result.dmigs += moved(*this.data, ttid);
	}

	__atomic_fetch_add(&threads_left, -1, __ATOMIC_SEQ_CST);
}

// Create the thread on 'bench_cluster' with zeroed counters.
// The thread does not start working until its semaphore is posted.
void ?{}( MyThread & this, MyData * data, MySpot ** spots, size_t spot_len, size_t cnt, bool share, size_t id) {
	((thread&)this){ bench_cluster };
	this.data = data;
	this.spots.ptr = spots;
	this.spots.len = spot_len;
	(this.sem){};
	this.result.count = 0;
	this.result.gmigs = 0;
	this.result.dmigs = 0;
	this.share = share;
	this.cnt = cnt;
	this.ttid = active_processor();
	this.id = id;
}

// ==================================================
// Parse the options, run the experiment and print the aggregated results.
// Returns 1 without running anything if the options would make the worker
// threads divide by zero.
int main(int argc, char * argv[]) {
	unsigned wsize = 2;
	unsigned wcnt = 2;
	unsigned nspots = 0;
	bool share = false;
	cfa_option opt[] = {
		BENCH_OPT,
		{ 'n', "nspots", "Number of spots where threads sleep (nthreads - nspots are active at the same time)", nspots},
		{ 'w', "worksize", "Size of the array for each threads, in words (64bit)", wsize},
		{ 'c', "workcnt" , "Number of words to touch when working (random pick, cells can be picked more than once)", wcnt },
		{ 's', "share" , "Pass the work data to the next thread when blocking", share, parse_truefalse }
	};
	BENCH_OPT_PARSE("cforall cycle benchmark");

	unsigned long long global_count = 0;
	unsigned long long global_gmigs = 0;
	unsigned long long global_dmigs = 0;

	if( nspots == 0 ) {
		// The default needs strictly more threads than processors: otherwise
		// the subtraction yields 0 (or wraps around) and the threads would
		// pick a spot with a modulo by zero.
		if( nthreads <= nprocs ) {
			fprintf(stderr, "Default number of spots requires more threads than processors\n");
			return 1;
		}
		nspots = nthreads - nprocs;
	}

	// access() indexes modulo the array length, so an empty array is only
	// usable when no access is ever performed.
	if( wsize == 0 && wcnt != 0 ) {
		fprintf(stderr, "Work size must be non-zero when work count is non-zero\n");
		return 1;
	}

	Time start, end;
	{
		MyData * data_arrays[nthreads];
		for(i; nthreads) {
			data_arrays[i] = malloc();
			(*data_arrays[i]){ i, wsize };
		}

		MySpot * spots[nspots];
		for(i; nspots) {
			spots[i] = malloc();
			(*spots[i]){ i };
		}

		BenchCluster bc = { nprocs };
		threads_left = nprocs;
		{
			MyThread * threads[nthreads];
			for(i; nthreads) {
				threads[i] = malloc();
				(*threads[i]){
					data_arrays[i],
					spots,
					nspots,
					wcnt,
					share,
					i
				};
			}

			bool is_tty = isatty(STDOUT_FILENO);
			start = getTimeNsec();

			// Let every thread past its initial wait
			for(i; nthreads) {
				post( threads[i]->sem );
			}
			wait(start, is_tty);

			stop = true;
			end = getTimeNsec();
			printf("\nDone\n");

			for(i; nthreads) {
				// Wake the thread in case it is blocked, then collect its counters
				post( threads[i]->sem );
				MyThread & thrd = join( *threads[i] );
				global_count += thrd.result.count;
				global_gmigs += thrd.result.gmigs;
				global_dmigs += thrd.result.dmigs;
			}

			for(i; nthreads) {
				^( *threads[i] ){};
				free( threads[i] );
			}
		}

		for(i; nthreads) {
			^( *data_arrays[i] ){};
			free( data_arrays[i] );
		}

		for(i; nspots) {
			^( *spots[i] ){};
			free( spots[i] );
		}
	}

	printf("Duration (ms) : %'lf\n", (end - start)`dms);
	printf("Number of processors : %'d\n", nprocs);
	printf("Number of threads : %'d\n", nthreads);
	printf("Total Operations(ops) : %'15llu\n", global_count);
	printf("Work size (64bit words): %'15u\n", wsize);
	// NOTE(review): same line as two statements above; kept so the output is unchanged
	printf("Total Operations(ops) : %'15llu\n", global_count);
	printf("Total G Migrations : %'15llu\n", global_gmigs);
	printf("Total D Migrations : %'15llu\n", global_dmigs);
	printf("Ops per second : %'18.2lf\n", ((double)global_count) / (end - start)`ds);
	printf("ns per ops : %'18.2lf\n", (end - start)`dns / global_count);
	printf("Ops per threads : %'15llu\n", global_count / nthreads);
	printf("Ops per procs : %'15llu\n", global_count / nprocs);
	printf("Ops/sec/procs : %'18.2lf\n", (((double)global_count) / nprocs) / (end - start)`ds);
	printf("ns per ops/procs : %'18.2lf\n", (end - start)`dns / (global_count / nprocs));
	fflush(stdout);
}