source: benchmark/readyQ/transfer.cfa @ ebb6158

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since ebb6158 was ebb6158, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Minor fixes to warnings, printing and ridiculous go/rust requirements.

  • Property mode set to 100644
File size: 3.4 KB
Line 
1#include "rq_bench.hfa"
2#include <fstream.hfa>
3
4Duration default_preemption() {
5        return 0;
6}
7
8#define PRINT(...)
9
10__lehmer64_state_t lead_seed;
11volatile unsigned leader;
12volatile size_t lead_idx;
13
14bool exhaust = false;
15
16thread$ * the_main;
17
18thread __attribute__((aligned(128))) MyThread {
19        unsigned id;
20        volatile size_t idx;
21        bench_sem sem;
22        size_t rechecks;
23};
24
25void ?{}( MyThread & this, unsigned id ) {
26        ((thread&)this){ bench_cluster };
27        this.id = id;
28        this.idx = 0;
29        this.rechecks = 0;
30}
31
32MyThread ** threads;
33
34static void waitgroup() {
35        Time start = timeHiRes();
36        for(i; nthreads) {
37                PRINT( sout | "Waiting for :" | i | "(" | threads[i]->idx | ")"; )
38                while( threads[i]->idx != lead_idx ) {
39                        Pause();
40                        if( (timeHiRes() - start) > 5`s ) {
41                                print_stats_now( bench_cluster, CFA_STATS_READY_Q | CFA_STATS_IO );
42                                serr | "Programs has been blocked for more than 5 secs";
43                                exit(1);
44                        }
45                }
46        }
47        PRINT( sout | "Waiting done"; )
48}
49
50static void wakegroup(unsigned me) {
51        if(!exhaust) return;
52
53        for(i; nthreads) {
54                if(i!= me) post( threads[i]->sem );
55        }
56}
57
58static void lead(MyThread & this) {
59        this.idx = ++lead_idx;
60        if(lead_idx > stop_count) {
61                PRINT( sout | "Leader" | this.id | "done"; )
62                unpark( the_main );
63                return;
64        }
65
66        PRINT( sout | "Leader no" | this.idx| ":" | this.id; )
67
68        waitgroup();
69
70        unsigned nleader = __lehmer64( lead_seed ) % nthreads;
71        __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
72
73        wakegroup(this.id);
74}
75
76static void wait(MyThread & this) {
77        yield();
78        if(lead_idx == this.idx) {
79                this.rechecks++;
80                return;
81        }
82
83        assert( (lead_idx - 1) == this.idx );
84        __atomic_add_fetch( &this.idx, 1, __ATOMIC_SEQ_CST );
85        if(exhaust) wait( this.sem );
86        else yield();
87}
88
89void main(MyThread & this) {
90        park();
91
92        unsigned me = this.id;
93
94        for() {
95                if(leader == me) {
96                        lead( this );
97                }
98                else {
99                        wait( this );
100                }
101                if(lead_idx > stop_count) break;
102        }
103}
104
105// ==================================================
106int main(int argc, char * argv[]) {
107        __lehmer64_state_t lead_seed = getpid();
108        for(10) __lehmer64( lead_seed );
109        unsigned nprocs = 2;
110
111        cfa_option opt[] = {
112                BENCH_OPT,
113                { 'e', "exhaust", "Whether or not threads that have seen the new epoch should park instead of yielding.", exhaust, parse_yesno}
114        };
115        BENCH_OPT_PARSE("cforall transition benchmark");
116
117        if(clock_mode) {
118                serr | "This benchmark doesn't support duration mode";
119                return 1;
120        }
121
122        if(nprocs < 2) {
123                serr | "Must have at least 2 processors";
124                return 1;
125        }
126
127        lead_idx = 0;
128        leader = __lehmer64( lead_seed ) % nthreads;
129
130        size_t rechecks = 0;
131        the_main = active_thread();
132
133        Time start, end;
134        {
135                BenchCluster bc = { nprocs };
136                {
137                        threads = alloc(nthreads);
138                        for(i; nthreads) {
139                                threads[i] = malloc();
140                                (*threads[i]){
141                                        i
142                                };
143                        }
144
145                        start = timeHiRes();
146                        for(i; nthreads) {
147                                unpark(*threads[i]);
148                        }
149
150                        park();
151                        end = timeHiRes();
152
153                        for(i; nthreads) {
154                                post(threads[i]->sem);
155                        }
156
157                        for(i; nthreads) {
158                                MyThread & thrd = join(*threads[i]);
159                                PRINT( sout | i | "joined"; )
160                                rechecks += thrd.rechecks;
161                                ^( *threads[i] ){};
162                                free(threads[i]);
163                        }
164
165                        free(threads);
166                }
167        }
168
169        sout | "Duration                : " | ws(3, 3, unit(eng((end - start)`ds))) | 's';
170        sout | "Number of processors    : " | nprocs;
171        sout | "Number of threads       : " | nthreads;
172        sout | "Total Operations(ops)   : " | stop_count;
173        sout | "Threads parking on wait : " | (exhaust ? "yes" : "no");
174        sout | "Rechecking              : " | rechecks;
175
176
177}
Note: See TracBrowser for help on using the repository browser.