source: tests/concurrent/channels/chan_big.cfa @ 4a962d8

ADTast-experimental
Last change on this file since 4a962d8 was 4a962d8, checked in by caparsons <caparson@…>, 17 months ago

added channel impl with basic test. Will expand impl and tests soon

  • Property mode set to 100644
File size: 5.5 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8#include <stats.hfa>
9size_t Processors = 10, Channels = 10, Producers = 40, Consumers = 40, ChannelSize = 128;
10
11owner_lock o;
12
13unsigned long long total_operations = 0;
14
15#define BIG 1
16
17struct bigObject {
18    size_t a;
19    size_t b;
20    size_t c;
21    size_t d;
22    size_t e;
23    size_t f;
24    size_t g;
25    size_t h;
26};
27
28void ?{}( bigObject & this, size_t i ) with(this) { a = i; b = i; c = i; d = i; e = i; f = i; g = i; h = i; }
29void ?{}( bigObject & this ) { this{0}; }
30
31#ifdef BIG
32typedef channel( bigObject ) Channel;
33#else
34typedef channel Channel;
35#endif
36
37Channel * channels;
38
39volatile bool cons_done = false, prod_done = false;
40size_t cons_check = 0, prod_check = 0;
41
42thread Consumer {
43    size_t i;
44};
45static inline void ?{}( Consumer & c, size_t i, cluster & clu ) {
46    ((thread &)c){ clu };
47    c.i = i;
48}
49void main(Consumer & this) {
50    unsigned long long runs = 0;
51    size_t my_check = 0;
52    for ( ;; ) {
53        if ( cons_done ) break;
54        #ifdef BIG
55        bigObject j = remove( channels[ this.i ] );
56        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
57        #else
58        size_t j = remove( channels[ this.i ] );
59        my_check = my_check ^ j;
60        #endif
61       
62        if ( !prod_done ) runs++;
63    }
64    lock(o);
65    total_operations += runs;
66    cons_check = cons_check ^ my_check;
67    // sout | "C: " | runs;
68    unlock(o);
69}
70
71thread Producer {
72    size_t i;
73};
74static inline void ?{}( Producer & p, size_t i, cluster & clu ) {
75    ((thread &)p){ clu };
76    p.i = i;
77}
78void main(Producer & this) {
79    unsigned long long runs = 0;
80    size_t my_check = 0;
81    for ( ;; ) {
82        if ( prod_done ) break;
83        #ifdef BIG
84        bigObject j{(size_t)runs};
85        insert( channels[ this.i ], j );
86        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
87        #else
88        insert( channels[ this.i ], (size_t)runs );
89        my_check = my_check ^ ((size_t)runs);
90        #endif
91        runs++;
92    }
93    lock(o);
94    total_operations += runs;
95    prod_check = prod_check ^ my_check;
96    // sout | "P: " | runs;
97    unlock(o);
98}
99
100
101int main( int argc, char *argv[] ) {
102    switch( argc ) {
103      case 6:
104                if ( strcmp( argv[5], "d" ) != 0 ) {                    // default ?
105                        if ( atoi( argv[5] ) < 1) goto Usage;
106                        ChannelSize = atoi( argv[5] );
107                } // if
108      case 5:
109                if ( strcmp( argv[4], "d" ) != 0 ) {                    // default ?
110                        if ( atoi( argv[4] ) < 1 ) goto Usage;
111                        Channels = atoi( argv[4] );
112                } // if
113      case 4:
114                if ( strcmp( argv[3], "d" ) != 0 ) {                    // default ?
115                        if ( atoi( argv[3] ) < 1 ) goto Usage;
116                        Consumers = atoi( argv[3] );
117                } // if
118      case 3:
119                if ( strcmp( argv[2], "d" ) != 0 ) {                    // default ?
120                        if ( atoi( argv[2] ) < 1 ) goto Usage;
121                        Producers = atoi( argv[2] );
122                } // if
123      case 2:
124                if ( strcmp( argv[1], "d" ) != 0 ) {                    // default ?
125                        if ( atoi( argv[1] ) < 1 ) goto Usage;
126                        Processors = atoi( argv[1] );
127                } // if
128          case 1:                                                                                       // use defaults
129                break;
130          default:
131          Usage:
132                sout | "Usage: " | argv[0]
133             | " [ processors > 0 | d ]"
134             | " [ ProdsPerChan > 0 | d ]"
135             | " [ ConsPerChan > 0 | d ]"
136             | " [ channels > 0 | d ]";
137                exit( EXIT_FAILURE );
138    }
139
140    size_t Clusters = 1;
141    // create a cluster
142    cluster clus[Clusters];
143    processor * proc[Processors];
144    for ( i; Processors ) {
145        (*(proc[i] = alloc())){clus[i % Clusters]};
146    }
147
148    channels = anew( Channels );
149
150    // sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
151   
152    for ( i; Channels ) {
153        channels[i]{ ChannelSize };
154    }
155
156    sout | "start";
157    Consumer * c[Consumers * Channels];
158    Producer * p[Producers * Channels];
159
160    for ( i; Consumers * Channels ) {
161        (*(c[i] = alloc())){ i % Channels, clus[i % Clusters] };
162    }
163
164    for ( i; Producers * Channels ) {
165        (*(p[i] = alloc())){ i % Channels, clus[i % Clusters] };
166    }
167
168    sleep(10`s);
169    prod_done = true;
170
171    for ( i; Producers * Channels ) {
172        delete(p[i]);
173    }
174
175    sout | "prods";
176    cons_done = true;
177    for ( i; Channels ) {
178        // sout | get_count( channels[i] );
179        if ( get_count( channels[i] ) < Consumers ){
180            #ifdef BIG
181            bigObject b{0};
182            #endif
183            for ( j; Consumers ) {
184                #ifdef BIG
185                insert( channels[i], b );
186                #else
187                insert( channels[i], 0 );
188                #endif
189            }
190        }
191    }
192    sout | "cons";
193    for ( i; Consumers * Channels ) {
194        delete(c[i]);
195    }
196
197    sout | "flush";
198    for ( i; Channels ) {
199        for ( ;; ) {
200            if ( get_count( channels[i] ) > 0 ) {
201                #ifdef BIG
202                bigObject j = remove( channels[ i ] );
203                cons_check = cons_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
204                #else
205                size_t j = remove( channels[ i ] );
206                cons_check = cons_check ^ j;
207                #endif
208            } else break;
209        }
210    }
211
212    adelete( channels );
213    // sout | "total channel ops: " | total_operations;
214    if ( cons_check != prod_check )
215        sout | "CHECKSUM MISMATCH !!!";
216    // print_stats_now( *active_cluster(), CFA_STATS_READY_Q);
217
218    for ( i; Processors ) {
219        delete(proc[i]);
220    }
221    sout | "done";
222    return 0;
223}
Note: See TracBrowser for help on using the repository browser.