source: benchmark/readyQ/transfer.cfa @ 65ef0cd

Last change on this file since 65ef0cd was aec2c022, checked in by Thierry Delisle <tdelisle@…>, 2 years ago

Clean-up the benchmarks a little

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include "rq_bench.hfa"
2#include <fstream.hfa>
3#include <locale.h>
4
5Duration default_preemption() {
6        return 0;
7}
8
9#define PRINT(...)
10
11__uint128_t lead_seed;
12volatile unsigned leader;
13volatile size_t lead_idx;
14
15bool exhaust = false;
16volatile bool estop = false;
17
18
19thread$ * the_main;
20
21thread __attribute__((aligned(128))) MyThread {
22        unsigned id;
23        volatile size_t idx;
24        bench_sem sem;
25        size_t rechecks;
26};
27
28void ?{}( MyThread & this, unsigned id ) {
29        ((thread&)this){ bench_cluster };
30        this.id = id;
31        this.idx = 0;
32        this.rechecks = 0;
33}
34
35MyThread ** threads;
36
37static void waitgroup() {
38        Time start = timeHiRes();
39        OUTER: for(i; nthreads) {
40                PRINT( sout | "Waiting for :" | i | "(" | threads[i]->idx | ")"; )
41                while( threads[i]->idx != lead_idx ) {
42                        Pause();
43                        if( (timeHiRes() - start) > 5`s ) {
44                                print_stats_now( bench_cluster, CFA_STATS_READY_Q | CFA_STATS_IO );
45                                serr | "Programs has been blocked for more than 5 secs";
46                                estop = true;
47                                unpark( the_main );
48                                break OUTER;
49                        }
50                }
51        }
52        PRINT( sout | "Waiting done"; )
53}
54
55static void wakegroup(unsigned me) {
56        if(!exhaust) return;
57
58        for(i; nthreads) {
59                if(i!= me) post( threads[i]->sem );
60        }
61}
62
63static void lead(MyThread & this) {
64        this.idx = ++lead_idx;
65        if(lead_idx > stop_count || estop) {
66                PRINT( sout | "Leader" | this.id | "done"; )
67                unpark( the_main );
68                return;
69        }
70
71        PRINT( sout | "Leader no" | this.idx| ":" | this.id; )
72
73        waitgroup();
74
75        unsigned nleader = lehmer64( lead_seed ) % nthreads;
76        __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
77
78        wakegroup(this.id);
79}
80
81static void wait(MyThread & this) {
82        yield();
83        if(lead_idx == this.idx) {
84                this.rechecks++;
85                return;
86        }
87
88        assert( (lead_idx - 1) == this.idx );
89        __atomic_add_fetch( &this.idx, 1, __ATOMIC_SEQ_CST );
90        if(exhaust) wait( this.sem );
91        else yield();
92}
93
94void main(MyThread & this) {
95        park();
96
97        unsigned me = this.id;
98
99        for() {
100                if(leader == me) {
101                        lead( this );
102                }
103                else {
104                        wait( this );
105                }
106                if(lead_idx > stop_count || estop) break;
107        }
108}
109
110// ==================================================
111int main(int argc, char * argv[]) {
112        uint64_t lead_seed = getpid();
113        for(10) lehmer64( lead_seed );
114        unsigned nprocs = 2;
115
116        cfa_option opt[] = {
117                BENCH_OPT,
118                { 'e', "exhaust", "Whether or not threads that have seen the new epoch should park instead of yielding.", exhaust, parse_yesno}
119        };
120        BENCH_OPT_PARSE("cforall transition benchmark");
121
122        if(clock_mode) {
123                serr | "This benchmark doesn't support duration mode";
124                return 1;
125        }
126
127        if(nprocs < 2) {
128                serr | "Must have at least 2 processors";
129                return 1;
130        }
131
132        lead_idx = 0;
133        leader = lehmer64( lead_seed ) % nthreads;
134
135        size_t rechecks = 0;
136        the_main = active_thread();
137
138        Time start, end;
139        {
140                BenchCluster bc = { nprocs };
141                {
142                        threads = alloc(nthreads);
143                        for(i; nthreads) {
144                                threads[i] = malloc();
145                                (*threads[i]){
146                                        i
147                                };
148                        }
149
150                        start = timeHiRes();
151                        for(i; nthreads) {
152                                unpark(*threads[i]);
153                        }
154
155                        park();
156                        end = timeHiRes();
157
158                        for(i; nthreads) {
159                                post(threads[i]->sem);
160                        }
161
162                        for(i; nthreads) {
163                                MyThread & thrd = join(*threads[i]);
164                                PRINT( sout | i | "joined"; )
165                                rechecks += thrd.rechecks;
166                                ^( *threads[i] ){};
167                                free(threads[i]);
168                        }
169
170                        free(threads);
171                }
172        }
173
174        setlocale( LC_NUMERIC, getenv( "LANG" ) );
175        sout | "Duration (ms)           : " | ws(3, 3, unit(eng((end - start)`dms)));
176        sout | "Number of processors    : " | nprocs;
177        sout | "Number of threads       : " | nthreads;
178        sout | "Total Operations(ops)   : " | lead_idx - 1;
179        sout | "Threads parking on wait : " | (exhaust ? "yes" : "no");
180        sout | "Rechecking              : " | rechecks;
181        sout | "us per transfer         : " | (end - start)`dus / lead_idx;
182
183
184}
Note: See TracBrowser for help on using the repository browser.