source: tests/concurrency/futures/select_future.cfa @ 41882628

Last change on this file since 41882628 was c26bea2a, checked in by Peter A. Buhr <pabuhr@…>, 18 months ago

first attempt at renaming directory tests/concurrent to tests/concurrency to harmonize with other concurrency directory names

  • Property mode set to 100644
File size: 3.1 KB
Line 
1#include <thread.hfa>
2#include <future.hfa>
3#include <concurrency/barrier.hfa>
4
// Number of worker threads created for each test; every one of them reads the
// shared future once per round.
enum { NFUTURES = 10 };
6
// Server thread used by test 1. It counts incoming requests and, once every
// worker has asked, fulfils a single shared future for all of them.
thread Server {
	int pending;            // call() requests seen in the current round
	int done;               // finish() notifications seen in the current round
	int iteration;          // value published through the future; bumped each round
	future(int) * request;  // future to fulfil; installed by init(), never freed here
};
11
// Construct a Server: name the underlying thread and clear all bookkeeping.
// The future pointer starts out null and must be supplied later through init().
void ?{}( Server & this ) {
	((thread&)this){ "Server Thread" };
	this.request = 0p;
	this.iteration = 0;
	this.done = 0;
	this.pending = 0;
}
19
// Destroy a Server. Declared mutex so the server's main loop can accept the
// destructor call through its waitfor( ^?{} : this ) clause.
void ^?{}( Server & mutex this ) {
	// No round may be half-collected when the server is torn down.
	assert( this.pending == 0 );
	// Drop the reference only; the future itself is not released here.
	this.request = 0p;
}
24
// Tell the server which future it has to fulfil.
//   this - server to configure
//   f    - future to store; only the pointer is kept
void init( Server & this, future(int) * f ) {
	this.request = f;
}
28
// Mutex entry point invoked by a worker to register one request with the server.
void call( Server & mutex this ) {
	this.pending += 1;
}
32
// Mutex entry point invoked by a worker once it has consumed the future's value.
void finish( Server & mutex this ) {
	this.done += 1;
}
36
// Server thread body.
//
// Each pass of the loop accepts either the destructor (which ends the thread)
// or one call() from a worker. Once NFUTURES calls have accumulated, the
// server fulfils the shared future with the current iteration number, accepts
// exactly NFUTURES finish() calls, and then resets the future so it can be
// reused in the next round.
void main( Server & this ) {
	for () {
		waitfor( ^?{} : this ) {
			// Destructor accepted: stop serving.
			break;
		}
		or waitfor( call : this ) {
			// Act only when the last outstanding request of the round arrives.
			if ( this.pending == NFUTURES ) {
				this.pending = 0;
				fulfil( *this.request, this.iteration );
				this.iteration += 1;

				// Only finish() is accepted here, so no call() for the next
				// round can be accepted before the future has been reset.
				for ( NFUTURES ) {
					waitfor( finish : this );
				}

				reset( *this.request );
				this.done = 0;
			}
		}
	}
}
60
// Server instance targeted by work(); assigned in the program's main() before
// the test 1 workers are created.
Server *the_server;
// Worker thread for test 1; it carries no state of its own.
thread Worker {};

// Construct a Worker and name its underlying thread.
void ?{}( Worker & this ) {
	((thread&)this){ "Worker Thread" };
}
66
// Future shared by the servers and workers of all three tests; allocated and
// deleted around each test by the program's main().
future(int) *shared_future;
68
// Scribble a recognisable pattern over a block of the caller's stack.
// The array is volatile so the stores cannot be optimised away.
void thrash(void) {
	enum { NLOCALS = 250 };
	// Unsigned storage: 0xdeadbeef does not fit in a signed int, so storing it
	// into an int would rely on an implementation-defined conversion.
	volatile unsigned int locals[NLOCALS];
	for (int i = 0; i < NLOCALS; i++) {
		locals[i] = 0xdeadbeefu;
	}
}
75
// One round of test 1 as seen by a worker: register with the server, read the
// shared future, verify the value, then report completion.
//   num - value the future is expected to deliver this round
void work( int num ) {
	call( *the_server );
	int value = get( *shared_future );
	if ( value != num ) {
		abort();    // wrong value delivered through the future
	}
	finish( *the_server );
}
82
// Worker thread body for test 1: ten rounds, each bracketed by stack thrashing.
// The round number doubles as the value expected from the future.
void main( Worker & ) {
	for ( round; 10 ) {
		thrash();
		work( round );
		thrash();
	}
}
90
// Worker thread for test 2; stateless.
thread Worker2 {};

// Handshake semaphores, all starting at zero.
semaphore before{ 0 };        // released by Server2 after fulfilling: workers may read
semaphore after_server{ 0 };  // released by each worker once it has read the value
semaphore after_worker{ 0 };  // released by Server2 after the reset: workers may move on
96
// One round of test 2 as seen by a worker. The worker waits on `before`, which
// Server2 releases only after fulfilling the future, then reads and checks the
// value, signals the server, and waits for the reset to complete.
//   num - value the future is expected to deliver this round
void work2( int num ) {
	P( before );
	int value = get( *shared_future );
	if ( value != num ) {
		abort();    // wrong value delivered through the future
	}
	V( after_server );
	P( after_worker );
}
104
// Worker thread body for test 2: ten rounds, each bracketed by stack thrashing.
void main( Worker2 & ) {
	for ( round; 10 ) {
		thrash();
		work2( round );
		thrash();
	}
}
112
// Server thread for test 2; stateless.
thread Server2 {};

// Server2 thread body. In each of the ten rounds the future is fulfilled
// before the workers are released, so they read a value that is already there.
// The server then waits for all NFUTURES workers to read it, resets the
// future, and lets the workers start the next round.
void main( Server2 & ) {
	for ( i; 10 ) {
		fulfil( *shared_future, i );
		V( before, NFUTURES );
		// Counter-less loop: the original inner index shadowed the round index i.
		for ( NFUTURES ) {
			P( after_server );
		}
		reset( *shared_future );
		V( after_worker, NFUTURES );
	}
}
124
// Barrier for test 3, sized for the NFUTURES workers plus the one server.
barrier bar = { NFUTURES + 1 };

// Worker thread for test 3; stateless.
thread Worker3 {};
128
// One round of test 3 as seen by a worker: poll the shared future with
// try_get() until it reports a value, check that value, signal the server,
// and then meet everyone else at the barrier.
//   num - value the future is expected to deliver this round
void work3( int num ) {
	[int, bool] tt;
	for ( ;; ) {
		tt = try_get( *shared_future );
		if ( tt.1 ) break;    // second component set: a value was obtained
	}
	if ( tt.0 != num ) {
		abort();    // wrong value delivered through the future
	}
	V( after_server );
	block( bar );
}
138
// Worker thread body for test 3: ten rounds, each bracketed by stack thrashing.
void main( Worker3 & ) {
	for ( round; 10 ) {
		thrash();
		work3( round );
		thrash();
	}
}
146
// Server thread for test 3; stateless.
thread Server3 {};

// Server3 thread body. In each of the ten rounds it fulfils the future, waits
// until all NFUTURES workers have signalled that they read it, resets the
// future, and then joins the workers at the barrier before the next round.
void main( Server3 & ) {
	for ( i; 10 ) {
		fulfil( *shared_future, i );
		// Counter-less loop: the original inner index shadowed the round index i.
		for ( NFUTURES ) {
			P( after_server );
		}
		reset( *shared_future );
		block( bar );
	}
}
157
// Test driver: runs the three future tests one after another. Each test
// allocates a fresh shared future, runs its threads inside nested scopes, and
// deletes the future afterwards.
int main() {
	// ---- Test 1: workers call get() before the server fulfils ----
	printf( "start 1: blocking path future test\n" );
	// NOTE(review): 11 presumably stands for NFUTURES workers + 1 server — confirm.
	processor procs[11];
	shared_future = new();
	{
		Server server;
		the_server = &server;
		init( server, shared_future );
		{
			Worker workers[NFUTURES];
		}
	}
	delete( shared_future );
	printf( "done 1\n" );

	// ---- Test 2: the server fulfils before the workers are released ----
	printf( "start 2: nonblocking path future test\n" );
	shared_future = new();
	{
		Server2 server;
		{
			Worker2 workers[NFUTURES];
		}
	}
	delete( shared_future );
	printf( "done 2\n" );

	// ---- Test 3: workers poll with try_get() ----
	// Here the workers are declared in the outer scope and the server in the
	// inner one, the reverse of tests 1 and 2.
	printf( "start 3: try_get future test\n" );
	shared_future = new();
	{
		Worker3 workers[NFUTURES];
		{
			Server3 server;
		}
	}
	delete( shared_future );
	printf( "done 3\n" );
}
Note: See TracBrowser for help on using the repository browser.