source: tests/concurrency/channels/churn.cfa@ b28ce93

Last change on this file since b28ce93 was d923fca, checked in by Andrew Beach <ajbeach@…>, 7 months ago

Clean up the warnings of the concurrency tests. A lot of little test-level fixes; the most interesting repeated one is some formally redundant fallthrough statements. pthread_attr_test had to be rewritten because of library restrictions. Changed some types so they would always be pointer sized. There was a library change: there was a function that could not be implemented; I trust that it is included for a reason, so I just put it in a comment. There is a change to the compiler: wait-until now uses goto. The labelled breaks were code generated as unlabelled breaks and, although it worked out, slipped through some checks. Finally, there is one warning that I cannot solve at this time, so tests that produce it have been put in their own lax group.

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8
// Run configuration. Processors, Channels and ChannelSize can be overridden from the
// command line in the program main; Producers and Consumers always keep these defaults.
ssize_t Processors = 1;
ssize_t Channels = 4;
ssize_t Producers = 2;
ssize_t Consumers = 2;
ssize_t ChannelSize = 128;

// Held by each worker thread while it folds its private results into the shared totals below.
owner_lock o;

// Sum of the per-thread operation counts (inserts plus removes).
size_t total_operations = 0;
// XOR of every value removed by consumers / inserted by producers; compared at the end of the run.
size_t cons_check = 0;
size_t prod_check = 0;

// Array of Channels channels, allocated and released by the program main.
channel( size_t ) * channels;

thread Consumer {};
18
// Fill chanIndices[0 .. Channels-1] with the values 0 .. Channels-1 and then scramble
// them by performing Channels exchanges between two randomly drawn slots.
// The caller must supply storage for at least Channels ints.
void getRandArray( int * chanIndices ) {
	// start from the identity ordering
	for ( int slot = 0; slot < Channels; slot += 1 ) {
		chanIndices[slot] = slot;
	}

	// each pass draws two slots (possibly the same one) and exchanges their contents
	for ( int pass = 0; pass < Channels; pass += 1 ) {
		int first = prng() % Channels;
		int second = prng() % Channels;
		int saved = chanIndices[ first ];
		chanIndices[ first ] = chanIndices[ second ];
		chanIndices[ second ] = saved;
	}
}
31
// Consumer thread body.
// Phase 1: remove values from the channels round-robin, in a per-thread scrambled order,
//          until a channel_closed exception ends the loop.
// Phase 2: visit every channel in index order and keep removing until channel_closed
//          terminates the inner loop.
// Every removed value is XORed into a private checksum; the checksum and the number of
// removals are merged into the shared totals while holding lock o.
void main(Consumer &) {
	size_t removed = 0;
	size_t checksum = 0;
	int order[Channels];
	getRandArray( order );

	try {
		for ( size_t turn = 0;; turn += 1 ) {
			size_t value = remove( channels[ order[ turn % Channels ] ] );
			checksum ^= value;
			removed += 1;
		}
	} catch ( channel_closed * e ) {}

	// flush out rest of channels
	for ( chan; Channels ) {
		try {
			for ( ;; ) {
				size_t value = remove( channels[ chan ] );
				checksum ^= value;
				removed += 1;
			}
		} catchResume ( channel_closed * e ) {}			// continue to remove until would block
		catch ( channel_closed * e ) {}
	}

	// publish this thread's results
	lock(o);
	total_operations += removed;
	cons_check ^= checksum;
	// sout | "Cons: " | removed;
	unlock(o);
}
64
thread Producer {};

// Producer thread body.
// Inserts the increasing sequence 0, 1, 2, ... into the channels round-robin, in a
// per-thread scrambled order, until a channel_closed exception ends the loop.
// Every counted value is XORed into a private checksum; the checksum and the number of
// inserts are merged into the shared totals while holding lock o.
void main(Producer &) {
	size_t inserted = 0;
	size_t checksum = 0;
	int order[Channels];
	getRandArray( order );

	try {
		for ( size_t value = 0;; value += 1 ) {
			insert( channels[ order[ value % Channels ] ], value );
			// only reached when the insert did not raise
			checksum ^= value;
			inserted += 1;
		}
	} catch ( channel_closed * e ) {}

	// publish this thread's results
	lock(o);
	total_operations += inserted;
	prod_check ^= checksum;
	// sout | "Prods: " | inserted;
	unlock(o);
}
86
87
// Program main for the channel churn test.
//
// Command line (every argument optional, "d" keeps the default):
//   argv[1]  Processors   (> 0)
//   argv[2]  Channels     (> 0)
//   argv[3]  ChannelSize  (> 0)
// Producers and Consumers have no command-line option. Any non-positive value, or more
// than three arguments, prints the usage message through the exit stream.
//
// The run creates the channels, starts the consumers and producers, sleeps for 10 seconds,
// closes every channel, and finally compares the producer and consumer checksums.
// Prints "start" and "done", plus "CHECKSUM MISMATCH !!!" if the checksums differ.
int main( int argc, char *argv[] ) {
	switch( argc ) {
	  case 4:
		if ( strcmp( argv[3], "d" ) != 0 ) {			// default ?
			ChannelSize = ato( argv[3] );
			if ( ChannelSize < 1 ) fallthrough default;
		} // if
		fallthrough;
	  case 3:
		if ( strcmp( argv[2], "d" ) != 0 ) {			// default ?
			Channels = ato( argv[2] );
			if ( Channels < 1 ) fallthrough default;
		} // if
		fallthrough;
	  case 2:
		if ( strcmp( argv[1], "d" ) != 0 ) {			// default ?
			Processors = ato( argv[1] );
			if ( Processors < 1 ) fallthrough default;
		} // if
		fallthrough;
	  case 1:											// use defaults
		break;
	  default:
		// the usage text lists the arguments in the order the cases above parse them
		exit | "Usage: " | argv[0]
			 | " [ processors > 0 | d ]"
			 | " [ channels > 0 | d ]"
			 | " [ channel size > 0 | d ]";
	}
	// one fewer than requested: presumably the program already runs on one processor -- TODO confirm
	processor p[Processors - 1];
	channels = aalloc( Channels );

	// sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;

	// construct each channel in place with the requested capacity
	for ( i; Channels )
		channels[i]{ ChannelSize };

	sout | "start";
	{
		Consumer c[Consumers];
		{
			Producer p[Producers];
			sleep(10`s);
			// closing the channels is what ends the worker loops (they stop on channel_closed)
			for ( i; Channels )
				close( channels[i] );
		}
		// NOTE(review): leaving each block presumably waits for its threads to finish,
		// so both checksums are complete before they are compared below -- confirm
	}

	adelete( channels );

	if ( cons_check != prod_check )
		sout | "CHECKSUM MISMATCH !!!";

	// sout | total_operations;
	// print_stats_now( *active_cluster(), CFA_STATS_READY_Q );

	sout | "done";
	return 0;
}
Note: See TracBrowser for help on using the repository browser.