source: tests/concurrent/channels/churn.cfa@ bb7422a

ADT ast-experimental
Last change on this file since bb7422a was 3d08cea, checked in by caparsons <caparson@…>, 2 years ago

Removed unneeded include. Should fix failing channels/churn test.

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <string.h>
5#include <channel.hfa>
6#include <thread.hfa>
7#include <time.hfa>
8
// Run configuration. Processors, Channels and ChannelSize can be overridden from
// the command line in main; Producers and Consumers keep the values given here.
size_t Processors = 1;
size_t Channels = 4;
size_t Producers = 2;
size_t Consumers = 2;
size_t ChannelSize = 128;

// Guards the shared totals below while worker threads fold in their results.
owner_lock o;

// Shared results, updated by each worker under lock o just before it finishes.
size_t total_operations = 0;		// inserts plus removes that completed
size_t cons_check = 0;				// XOR of the values removed by consumers
size_t prod_check = 0;				// XOR of the values inserted by producers

// Array of Channels channels, allocated and released in main.
channel( size_t ) * channels;

thread Consumer {};
18
// Fills chanIndices[0 .. Channels-1] with the values 0 .. Channels-1 and then
// scrambles them using Channels swaps of two prng()-chosen positions.
// chanIndices must have room for at least Channels elements.
void getRandArray( int * chanIndices ) {
	// start from the identity ordering
	for ( int slot = 0; slot < Channels; slot += 1 )
		chanIndices[slot] = slot;

	// one swap per channel; both positions are drawn independently, so a swap
	// may pick the same position twice and leave the array unchanged
	for ( int swaps = 0; swaps < Channels; swaps += 1 ) {
		int first = prng() % Channels;
		int second = prng() % Channels;
		int held = chanIndices[ first ];
		chanIndices[ first ] = chanIndices[ second ];
		chanIndices[ second ] = held;
	}
}
31
// Consumer thread body. Removes values from the channels, cycling through them
// in a per-thread scrambled order, until a remove raises channel_closed. It then
// drains whatever is left in every channel and folds its operation count and
// XOR checksum into the shared totals under lock o.
void main(Consumer & this) {
	size_t step = 0;					// iteration counter, selects the next channel
	size_t removed = 0;					// successful removes by this thread
	size_t checksum = 0;				// XOR of every value this thread removed
	int order[Channels];
	getRandArray( order );

	try {
		for ( ;; step += 1 ) {
			size_t value = remove( channels[ order[ step % Channels ] ] );
			checksum ^= value;
			removed += 1;
		}
	} catch ( channel_closed * e ) {}

	// flush out rest of channels
	for ( c; Channels ) {
		try {
			for ( ;; ) {
				size_t value = remove( channels[ c ] );
				checksum ^= value;
				removed += 1;
			}
		} catchResume ( channel_closed * e ) {} // continue to remove until would block
		catch ( channel_closed * e ) {}
	}

	lock(o);
	cons_check ^= checksum;
	total_operations += removed;
	// sout | "Cons: " | removed;
	unlock(o);
}
64
thread Producer {};

// Producer thread body. Inserts an increasing counter value into the channels,
// cycling through them in a per-thread scrambled order, until an insert raises
// channel_closed. It then folds its operation count and XOR checksum into the
// shared totals under lock o.
void main(Producer & this) {
	size_t next = 0;					// value to insert; also selects the channel
	size_t inserted = 0;				// successful inserts by this thread
	size_t checksum = 0;				// XOR of every value this thread inserted
	int order[Channels];
	getRandArray( order );

	try {
		for ( ;; next += 1 ) {
			insert( channels[ order[ next % Channels ] ], next );
			// only reached when the insert did not raise, so the checksum
			// covers exactly the values that were inserted
			checksum ^= next;
			inserted += 1;
		}
	} catch ( channel_closed * e ) {}

	lock(o);
	prod_check ^= checksum;
	total_operations += inserted;
	// sout | "Prods: " | inserted;
	unlock(o);
}
86
87
// Program entry point for the channel churn test.
//
// Usage: churn [ processors > 0 | d ] [ channels > 0 | d ] [ channel size > 0 | d ]
// An argument of "d" keeps the default for that position. Any numeric argument
// below 1, or more than three arguments, prints the usage line and exits with
// EXIT_FAILURE.
//
// Runs Producers producer threads and Consumers consumer threads against
// Channels channels for 10 seconds, closes the channels, and prints
// "CHECKSUM MISMATCH !!!" if the XOR of the inserted values differs from the
// XOR of the removed values. Returns 0 on a completed run.
int main( int argc, char *argv[] ) {
	switch( argc ) {
	  case 4:
		if ( strcmp( argv[3], "d" ) != 0 ) {			// default ?
			int size = atoi( argv[3] );
			if ( size < 1 ) goto Usage;
			ChannelSize = size;
		} // if
		// fall through: also process the earlier arguments
	  case 3:
		if ( strcmp( argv[2], "d" ) != 0 ) {			// default ?
			int chans = atoi( argv[2] );
			if ( chans < 1 ) goto Usage;
			Channels = chans;
		} // if
		// fall through
	  case 2:
		if ( strcmp( argv[1], "d" ) != 0 ) {			// default ?
			int procs = atoi( argv[1] );
			if ( procs < 1 ) goto Usage;
			Processors = procs;
		} // if
		// fall through
	  case 1:											// use defaults
		break;
	  default:
	  Usage:
		// the listed arguments match what the cases above actually parse
		sout | "Usage: " | argv[0]
			 | " [ processors > 0 | d ]"
			 | " [ channels > 0 | d ]"
			 | " [ channel size > 0 | d ]";
		exit( EXIT_FAILURE );
	}

	// Processors - 1 additional processors; presumably the runtime already
	// supplies one for the initial thread.
	processor extra[Processors - 1];
	channels = aalloc( Channels );

	// sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;

	// aalloc is expected to return unconstructed storage (per the CFA allocation
	// docs), so construct each channel in place
	for ( i; Channels )
		channels[i]{ ChannelSize };

	sout | "start";
	{
		// Nested scopes: the producer array goes out of scope first, the consumer
		// array afterwards, so consumers outlive producers and can drain what is
		// left in the channels.
		Consumer cons[Consumers];
		{
			Producer prods[Producers];
			sleep(10`s);
			// closing the channels is what ends the worker loops (they stop on
			// channel_closed)
			for ( i; Channels )
				close( channels[i] );
		}
	}

	adelete( channels );

	if ( cons_check != prod_check )
		sout | "CHECKSUM MISMATCH !!!";

	// sout | total_operations;
	// print_stats_now( *active_cluster(), CFA_STATS_READY_Q );

	sout | "done";
	return 0;
}
Note: See TracBrowser for help on using the repository browser.