source: tests/concurrency/channels/churn.cfa @ 70670e7

Last change on this file since 70670e7 was 50be8af5, checked in by Peter A. Buhr <pabuhr@…>, 16 months ago

clean up command-line handling and I/O

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8
9ssize_t Processors = 1, Channels = 4, Producers = 2, Consumers = 2, ChannelSize = 128;
10
11owner_lock o;
12
13size_t total_operations = 0;
14size_t cons_check = 0, prod_check = 0;
15
16channel( size_t ) * channels;
17thread Consumer {};
18
19void getRandArray( int * chanIndices ) {
20    for ( int i = 0; i < Channels; i++ ) {
21        chanIndices[i] = i;
22    }
23    for ( int i = 0; i < Channels; i++ ) {
24        int loc_1 = prng() % Channels;
25        int loc_2 = prng() % Channels;
26        int temp = chanIndices[ loc_1 ];
27        chanIndices[ loc_1 ] = chanIndices[ loc_2 ];
28        chanIndices[ loc_2 ] = temp;
29    }
30}
31
32void main(Consumer & this) {
33    size_t i = 0;
34    size_t runs = 0;
35    size_t my_check = 0;
36    int chanIndices[Channels];
37    getRandArray( chanIndices );
38    try {
39        for ( ;;i++ ) {
40            size_t j = remove( channels[ chanIndices[ i % Channels ] ] );
41            my_check = my_check ^ j;
42            runs++;
43        }
44    } catch ( channel_closed * e ) {}
45
46    // flush out rest of channels
47    for ( i; Channels ) {
48        try {
49            for ( ;; ) {
50                size_t j = remove( channels[ i ] );
51                my_check = my_check ^ j;
52                runs++;
53            }
54        } catchResume ( channel_closed * e ) {} // continue to remove until would block
55        catch ( channel_closed * e ) {}
56    }
57
58    lock(o);
59    total_operations += runs;
60    cons_check = cons_check ^ my_check;
61    // sout | "Cons: " | runs;
62    unlock(o);
63}
64
65thread Producer {};
66
67void main(Producer & this) {
68    size_t i = 0;
69    size_t runs = 0;
70    size_t my_check = 0;
71    int chanIndices[Channels];
72    getRandArray( chanIndices );
73    try {
74        for ( ;;i++ ) {
75            insert( channels[ chanIndices[ i % Channels ] ], i );
76            my_check = my_check ^ i;
77            runs++;
78        }
79    } catch ( channel_closed * e ) {}
80    lock(o);
81    total_operations += runs;
82    prod_check = prod_check ^ my_check;
83    // sout | "Prods: " | runs;
84    unlock(o);
85}
86
87
88int main( int argc, char *argv[] ) {
89    switch( argc ) {
90      case 4:
91                if ( strcmp( argv[3], "d" ) != 0 ) {                    // default ?
92                        ChannelSize = ato( argv[3] );
93                        if ( ChannelSize < 1 ) fallthru default;
94                } // if
95      case 3:
96                if ( strcmp( argv[2], "d" ) != 0 ) {                    // default ?
97                        Channels = ato( argv[2] );
98                        if ( Channels < 1 ) fallthru default;
99                } // if
100      case 2:
101                if ( strcmp( argv[1], "d" ) != 0 ) {                    // default ?
102                        Processors = ato( argv[1] );
103                        if ( Processors < 1 ) fallthru default;
104                } // if
105          case 1:                                                                                       // use defaults
106                break;
107          default:
108                exit | "Usage: " | argv[0]
109             | " [ processors > 0 | d ]"
110             | " [ producers > 0 | d ]"
111             | " [ consumers > 0 | d ]"
112             | " [ channels > 0 | d ]";
113    }
114    processor p[Processors - 1];
115    channels = aalloc( Channels );
116
117    // sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
118
119    for ( i; Channels )
120        channels[i]{ ChannelSize };
121
122    sout | "start";
123    {   
124        Consumer c[Consumers];
125        {
126            Producer p[Producers];
127            sleep(10`s);
128            for ( i; Channels )
129                close( channels[i] );
130        }
131    }
132
133    adelete( channels );
134
135    if ( cons_check != prod_check )
136        sout | "CHECKSUM MISMATCH !!!";
137
138    // sout | total_operations;
139    // print_stats_now( *active_cluster(), CFA_STATS_READY_Q );
140
141    sout | "done";
142    return 0;
143}
Note: See TracBrowser for help on using the repository browser.