source: benchmark/readyQ/transfer.cfa @ 850aff1

ADTast-experimentalenumforall-pointer-decaypthread-emulationqualifiedEnum
Last change on this file since 850aff1 was adfd125, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Changed benchmark to use sout and commas.

  • Property mode set to 100644
File size: 3.4 KB
Line 
1#include "rq_bench.hfa"
2#include <fstream.hfa>
3#include <locale.h>
4
5Duration default_preemption() {
6        return 0;
7}
8
9#define PRINT(...)
10
11__uint128_t lead_seed;
12volatile unsigned leader;
13volatile size_t lead_idx;
14
15bool exhaust = false;
16
17thread$ * the_main;
18
19thread __attribute__((aligned(128))) MyThread {
20        unsigned id;
21        volatile size_t idx;
22        bench_sem sem;
23        size_t rechecks;
24};
25
26void ?{}( MyThread & this, unsigned id ) {
27        ((thread&)this){ bench_cluster };
28        this.id = id;
29        this.idx = 0;
30        this.rechecks = 0;
31}
32
33MyThread ** threads;
34
35static void waitgroup() {
36        Time start = timeHiRes();
37        for(i; nthreads) {
38                PRINT( sout | "Waiting for :" | i | "(" | threads[i]->idx | ")"; )
39                while( threads[i]->idx != lead_idx ) {
40                        Pause();
41                        if( (timeHiRes() - start) > 5`s ) {
42                                print_stats_now( bench_cluster, CFA_STATS_READY_Q | CFA_STATS_IO );
43                                serr | "Programs has been blocked for more than 5 secs";
44                                exit(1);
45                        }
46                }
47        }
48        PRINT( sout | "Waiting done"; )
49}
50
51static void wakegroup(unsigned me) {
52        if(!exhaust) return;
53
54        for(i; nthreads) {
55                if(i!= me) post( threads[i]->sem );
56        }
57}
58
59static void lead(MyThread & this) {
60        this.idx = ++lead_idx;
61        if(lead_idx > stop_count) {
62                PRINT( sout | "Leader" | this.id | "done"; )
63                unpark( the_main );
64                return;
65        }
66
67        PRINT( sout | "Leader no" | this.idx| ":" | this.id; )
68
69        waitgroup();
70
71        unsigned nleader = lehmer64( lead_seed ) % nthreads;
72        __atomic_store_n( &leader, nleader, __ATOMIC_SEQ_CST );
73
74        wakegroup(this.id);
75}
76
77static void wait(MyThread & this) {
78        yield();
79        if(lead_idx == this.idx) {
80                this.rechecks++;
81                return;
82        }
83
84        assert( (lead_idx - 1) == this.idx );
85        __atomic_add_fetch( &this.idx, 1, __ATOMIC_SEQ_CST );
86        if(exhaust) wait( this.sem );
87        else yield();
88}
89
90void main(MyThread & this) {
91        park();
92
93        unsigned me = this.id;
94
95        for() {
96                if(leader == me) {
97                        lead( this );
98                }
99                else {
100                        wait( this );
101                }
102                if(lead_idx > stop_count) break;
103        }
104}
105
106// ==================================================
107int main(int argc, char * argv[]) {
108        uint64_t lead_seed = getpid();
109        for(10) lehmer64( lead_seed );
110        unsigned nprocs = 2;
111
112        cfa_option opt[] = {
113                BENCH_OPT,
114                { 'e', "exhaust", "Whether or not threads that have seen the new epoch should park instead of yielding.", exhaust, parse_yesno}
115        };
116        BENCH_OPT_PARSE("cforall transition benchmark");
117
118        if(clock_mode) {
119                serr | "This benchmark doesn't support duration mode";
120                return 1;
121        }
122
123        if(nprocs < 2) {
124                serr | "Must have at least 2 processors";
125                return 1;
126        }
127
128        lead_idx = 0;
129        leader = lehmer64( lead_seed ) % nthreads;
130
131        size_t rechecks = 0;
132        the_main = active_thread();
133
134        Time start, end;
135        {
136                BenchCluster bc = { nprocs };
137                {
138                        threads = alloc(nthreads);
139                        for(i; nthreads) {
140                                threads[i] = malloc();
141                                (*threads[i]){
142                                        i
143                                };
144                        }
145
146                        start = timeHiRes();
147                        for(i; nthreads) {
148                                unpark(*threads[i]);
149                        }
150
151                        park();
152                        end = timeHiRes();
153
154                        for(i; nthreads) {
155                                post(threads[i]->sem);
156                        }
157
158                        for(i; nthreads) {
159                                MyThread & thrd = join(*threads[i]);
160                                PRINT( sout | i | "joined"; )
161                                rechecks += thrd.rechecks;
162                                ^( *threads[i] ){};
163                                free(threads[i]);
164                        }
165
166                        free(threads);
167                }
168        }
169
170        setlocale( LC_NUMERIC, getenv( "LANG" ) );
171        sout | "Duration (ms)           : " | ws(3, 3, unit(eng((end - start)`dms)));
172        sout | "Number of processors    : " | nprocs;
173        sout | "Number of threads       : " | nthreads;
174        sout | "Total Operations(ops)   : " | stop_count;
175        sout | "Threads parking on wait : " | (exhaust ? "yes" : "no");
176        sout | "Rechecking              : " | rechecks;
177
178
179}
Note: See TracBrowser for help on using the repository browser.