source: tests/concurrent/channels/parallel_harness.hfa @ 42b739d7

ADTast-experimental
Last change on this file since 42b739d7 was 42b739d7, checked in by caparsons <caparson@…>, 22 months ago

fixed termination synchronization in the channel benchmark to get rid of deadlock case

  • Property mode set to 100644
File size: 4.9 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8#include <stats.hfa>
9
10// user defines this
11// #define BIG 1
12
13owner_lock o;
14
15unsigned long long total_operations = 0;
16
17struct bigObject {
18    size_t a;
19    size_t b;
20    size_t c;
21    size_t d;
22    size_t e;
23    size_t f;
24    size_t g;
25    size_t h;
26};
27
28void ?{}( bigObject & this, size_t i ) with(this) { a = i; b = i; c = i; d = i; e = i; f = i; g = i; h = i; }
29void ?{}( bigObject & this ) { this{0}; }
30
31#ifdef BIG
32typedef channel( bigObject ) Channel;
33#else
34typedef channel( size_t ) Channel;
35#endif
36
37Channel * channels;
38
39volatile bool cons_done = false, prod_done = false;
40volatile int cons_done_count = 0;
41size_t cons_check = 0, prod_check = 0;
42
43thread Consumer {
44    size_t i;
45};
46static inline void ?{}( Consumer & c, size_t i, cluster & clu ) {
47    ((thread &)c){ clu };
48    c.i = i;
49}
50void main(Consumer & this) {
51    unsigned long long runs = 0;
52    size_t my_check = 0;
53    for ( ;; ) {
54        if ( cons_done ) break;
55        #ifdef BIG
56        bigObject j = remove( channels[ this.i ] );
57        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
58        #else
59        size_t j = remove( channels[ this.i ] );
60        my_check = my_check ^ j;
61        #endif
62       
63        if ( !prod_done ) runs++;
64    }
65    lock(o);
66    total_operations += runs;
67    cons_done_count++;
68    cons_check = cons_check ^ my_check;
69    // sout | "C: " | runs;
70    unlock(o);
71}
72
73thread Producer {
74    size_t i;
75};
76static inline void ?{}( Producer & p, size_t i, cluster & clu ) {
77    ((thread &)p){ clu };
78    p.i = i;
79}
80void main(Producer & this) {
81    unsigned long long runs = 0;
82    size_t my_check = 0;
83    for ( ;; ) {
84        if ( prod_done ) break;
85        #ifdef BIG
86        bigObject j{(size_t)runs};
87        insert( channels[ this.i ], j );
88        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
89        #else
90        insert( channels[ this.i ], (size_t)runs );
91        my_check = my_check ^ ((size_t)runs);
92        #endif
93        runs++;
94    }
95    lock(o);
96    total_operations += runs;
97    prod_check = prod_check ^ my_check;
98    // sout | "P: " | runs;
99    unlock(o);
100}
101
102
103int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
104    size_t Clusters = 1;
105    // create a cluster
106    cluster clus[Clusters];
107    processor * proc[Processors];
108    for ( i; Processors ) {
109        (*(proc[i] = alloc())){clus[i % Clusters]};
110    }
111
112    channels = anew( Channels );
113
114    // sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
115   
116    for ( i; Channels ) {
117        channels[i]{ ChannelSize };
118    }
119
120    sout | "start";
121    Consumer * c[Consumers * Channels];
122    Producer * p[Producers * Channels];
123
124    for ( i; Consumers * Channels ) {
125        (*(c[i] = alloc())){ i % Channels, clus[i % Clusters] };
126    }
127
128    for ( i; Producers * Channels ) {
129        (*(p[i] = alloc())){ i % Channels, clus[i % Clusters] };
130    }
131
132    sleep(1`s);
133    prod_done = true;
134
135    for ( i; Producers * Channels ) {
136        delete(p[i]);
137    }
138
139    sout | "prods";
140    cons_done = true;
141    while( cons_done_count != Consumers * Channels ) {
142        for ( i; Channels ) {
143            if ( has_waiting_consumers( channels[i] ) ){
144                #ifdef BIG
145                bigObject b{0};
146                insert( channels[i], b );
147                #else
148                insert( channels[i], 0 );
149                #endif
150            }
151        }
152       
153    }
154    // for ( i; Channels ) {
155    //     // sout | get_count( channels[i] );
156    //     if ( get_count( channels[i] ) < Consumers ){
157    //         #ifdef BIG
158    //         bigObject b{0};
159    //         #endif
160    //         for ( j; Consumers ) {
161    //             #ifdef BIG
162    //             insert( channels[i], b );
163    //             #else
164    //             insert( channels[i], 0 );
165    //             #endif
166    //         }
167    //     }
168    // }
169    sout | "cons";
170    for ( i; Consumers * Channels ) {
171        delete(c[i]);
172    }
173
174    sout | "flush";
175    for ( i; Channels ) {
176        for ( ;; ) {
177            if ( get_count( channels[i] ) > 0 ) {
178                #ifdef BIG
179                bigObject j = remove( channels[ i ] );
180                cons_check = cons_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
181                #else
182                size_t j = remove( channels[ i ] );
183                cons_check = cons_check ^ j;
184                #endif
185            } else break;
186        }
187    }
188
189    adelete( channels );
190    // sout | "total channel ops: " | total_operations;
191    if ( cons_check != prod_check )
192        sout | "CHECKSUM MISMATCH !!!";
193    // print_stats_now( *active_cluster(), CFA_STATS_READY_Q);
194
195    for ( i; Processors ) {
196        delete(proc[i]);
197    }
198    sout | "done";
199    return 0;
200}
Note: See TracBrowser for help on using the repository browser.