source: tests/concurrency/channels/churn.cfa@ e6128959

Last change on this file since e6128959 was 50be8af5, checked in by Peter A. Buhr <pabuhr@…>, 2 years ago

clean up command-line handling and I/O

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8
9ssize_t Processors = 1, Channels = 4, Producers = 2, Consumers = 2, ChannelSize = 128;
10
11owner_lock o;
12
13size_t total_operations = 0;
14size_t cons_check = 0, prod_check = 0;
15
16channel( size_t ) * channels;
17thread Consumer {};
18
19void getRandArray( int * chanIndices ) {
20 for ( int i = 0; i < Channels; i++ ) {
21 chanIndices[i] = i;
22 }
23 for ( int i = 0; i < Channels; i++ ) {
24 int loc_1 = prng() % Channels;
25 int loc_2 = prng() % Channels;
26 int temp = chanIndices[ loc_1 ];
27 chanIndices[ loc_1 ] = chanIndices[ loc_2 ];
28 chanIndices[ loc_2 ] = temp;
29 }
30}
31
32void main(Consumer & this) {
33 size_t i = 0;
34 size_t runs = 0;
35 size_t my_check = 0;
36 int chanIndices[Channels];
37 getRandArray( chanIndices );
38 try {
39 for ( ;;i++ ) {
40 size_t j = remove( channels[ chanIndices[ i % Channels ] ] );
41 my_check = my_check ^ j;
42 runs++;
43 }
44 } catch ( channel_closed * e ) {}
45
46 // flush out rest of channels
47 for ( i; Channels ) {
48 try {
49 for ( ;; ) {
50 size_t j = remove( channels[ i ] );
51 my_check = my_check ^ j;
52 runs++;
53 }
54 } catchResume ( channel_closed * e ) {} // continue to remove until would block
55 catch ( channel_closed * e ) {}
56 }
57
58 lock(o);
59 total_operations += runs;
60 cons_check = cons_check ^ my_check;
61 // sout | "Cons: " | runs;
62 unlock(o);
63}
64
65thread Producer {};
66
67void main(Producer & this) {
68 size_t i = 0;
69 size_t runs = 0;
70 size_t my_check = 0;
71 int chanIndices[Channels];
72 getRandArray( chanIndices );
73 try {
74 for ( ;;i++ ) {
75 insert( channels[ chanIndices[ i % Channels ] ], i );
76 my_check = my_check ^ i;
77 runs++;
78 }
79 } catch ( channel_closed * e ) {}
80 lock(o);
81 total_operations += runs;
82 prod_check = prod_check ^ my_check;
83 // sout | "Prods: " | runs;
84 unlock(o);
85}
86
87
88int main( int argc, char *argv[] ) {
89 switch( argc ) {
90 case 4:
91 if ( strcmp( argv[3], "d" ) != 0 ) { // default ?
92 ChannelSize = ato( argv[3] );
93 if ( ChannelSize < 1 ) fallthru default;
94 } // if
95 case 3:
96 if ( strcmp( argv[2], "d" ) != 0 ) { // default ?
97 Channels = ato( argv[2] );
98 if ( Channels < 1 ) fallthru default;
99 } // if
100 case 2:
101 if ( strcmp( argv[1], "d" ) != 0 ) { // default ?
102 Processors = ato( argv[1] );
103 if ( Processors < 1 ) fallthru default;
104 } // if
105 case 1: // use defaults
106 break;
107 default:
108 exit | "Usage: " | argv[0]
109 | " [ processors > 0 | d ]"
110 | " [ producers > 0 | d ]"
111 | " [ consumers > 0 | d ]"
112 | " [ channels > 0 | d ]";
113 }
114 processor p[Processors - 1];
115 channels = aalloc( Channels );
116
117 // sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
118
119 for ( i; Channels )
120 channels[i]{ ChannelSize };
121
122 sout | "start";
123 {
124 Consumer c[Consumers];
125 {
126 Producer p[Producers];
127 sleep(10`s);
128 for ( i; Channels )
129 close( channels[i] );
130 }
131 }
132
133 adelete( channels );
134
135 if ( cons_check != prod_check )
136 sout | "CHECKSUM MISMATCH !!!";
137
138 // sout | total_operations;
139 // print_stats_now( *active_cluster(), CFA_STATS_READY_Q );
140
141 sout | "done";
142 return 0;
143}
Note: See TracBrowser for help on using the repository browser.