source: tests/concurrency/channels/churn.cfa @ 697c957

Last change on this file since 697c957 was c26bea2a, checked in by Peter A. Buhr <pabuhr@…>, 13 months ago

first attempt at renaming directory tests/concurrent to tests/concurrency to harmonize with other concurrency directory names

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8
9size_t Processors = 1, Channels = 4, Producers = 2, Consumers = 2, ChannelSize = 128;
10
11owner_lock o;
12
13size_t total_operations = 0;
14size_t cons_check = 0, prod_check = 0;
15
16channel( size_t ) * channels;
17thread Consumer {};
18
19void getRandArray( int * chanIndices ) {
20    for ( int i = 0; i < Channels; i++ ) {
21        chanIndices[i] = i;
22    }
23    for ( int i = 0; i < Channels; i++ ) {
24        int loc_1 = prng() % Channels;
25        int loc_2 = prng() % Channels;
26        int temp = chanIndices[ loc_1 ];
27        chanIndices[ loc_1 ] = chanIndices[ loc_2 ];
28        chanIndices[ loc_2 ] = temp;
29    }
30}
31
32void main(Consumer & this) {
33    size_t i = 0;
34    size_t runs = 0;
35    size_t my_check = 0;
36    int chanIndices[Channels];
37    getRandArray( chanIndices );
38    try {
39        for ( ;;i++ ) {
40            size_t j = remove( channels[ chanIndices[ i % Channels ] ] );
41            my_check = my_check ^ j;
42            runs++;
43        }
44    } catch ( channel_closed * e ) {}
45
46    // flush out rest of channels
47    for ( i; Channels ) {
48        try {
49            for ( ;; ) {
50                size_t j = remove( channels[ i ] );
51                my_check = my_check ^ j;
52                runs++;
53            }
54        } catchResume ( channel_closed * e ) {} // continue to remove until would block
55        catch ( channel_closed * e ) {}
56    }
57
58    lock(o);
59    total_operations += runs;
60    cons_check = cons_check ^ my_check;
61    // sout | "Cons: " | runs;
62    unlock(o);
63}
64
65thread Producer {};
66
67void main(Producer & this) {
68    size_t i = 0;
69    size_t runs = 0;
70    size_t my_check = 0;
71    int chanIndices[Channels];
72    getRandArray( chanIndices );
73    try {
74        for ( ;;i++ ) {
75            insert( channels[ chanIndices[ i % Channels ] ], i );
76            my_check = my_check ^ i;
77            runs++;
78        }
79    } catch ( channel_closed * e ) {}
80    lock(o);
81    total_operations += runs;
82    prod_check = prod_check ^ my_check;
83    // sout | "Prods: " | runs;
84    unlock(o);
85}
86
87
88int main( int argc, char *argv[] ) {
89    switch( argc ) {
90      case 4:
91                if ( strcmp( argv[3], "d" ) != 0 ) {                    // default ?
92                        if ( atoi( argv[3] ) < 1 ) goto Usage;
93                        ChannelSize = atoi( argv[3] );
94                } // if
95      case 3:
96                if ( strcmp( argv[2], "d" ) != 0 ) {                    // default ?
97                        if ( atoi( argv[2] ) < 1 ) goto Usage;
98                        Channels = atoi( argv[2] );
99                } // if
100      case 2:
101                if ( strcmp( argv[1], "d" ) != 0 ) {                    // default ?
102                        if ( atoi( argv[1] ) < 1 ) goto Usage;
103                        Processors = atoi( argv[1] );
104                } // if
105          case 1:                                                                                       // use defaults
106                break;
107          default:
108          Usage:
109                sout | "Usage: " | argv[0]
110             | " [ processors > 0 | d ]"
111             | " [ producers > 0 | d ]"
112             | " [ consumers > 0 | d ]"
113             | " [ channels > 0 | d ]";
114                exit( EXIT_FAILURE );
115    }
116    processor p[Processors - 1];
117    channels = aalloc( Channels );
118
119    // sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
120
121    for ( i; Channels )
122        channels[i]{ ChannelSize };
123
124    sout | "start";
125    {   
126        Consumer c[Consumers];
127        {
128            Producer p[Producers];
129            sleep(10`s);
130            for ( i; Channels )
131                close( channels[i] );
132        }
133    }
134
135    adelete( channels );
136
137    if ( cons_check != prod_check )
138        sout | "CHECKSUM MISMATCH !!!";
139
140    // sout | total_operations;
141    // print_stats_now( *active_cluster(), CFA_STATS_READY_Q );
142
143    sout | "done";
144    return 0;
145}
Note: See TracBrowser for help on using the repository browser.