source: tests/concurrency/channels/contend.cfa@ 0497b6ba

Last change on this file since 0497b6ba was 50be8af5, checked in by Peter A. Buhr <pabuhr@…>, 2 years ago

clean up command-line handling and I/O

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <channel.hfa>
5#include <thread.hfa>
6#include <time.hfa>
7#include <string.h>
8
// Lock taken by each worker thread while it folds its private results into the shared totals below.
owner_lock o;

// Sum of the per-thread operation counts (inserts by producers plus removes by consumers).
size_t total_operations = 0;

// Every channel in this test carries size_t values.
typedef channel( size_t ) Channel;

// Array of channels shared by all threads; worker threads index it with their own channel number.
Channel * channels;

// XOR of all values removed by consumers.
size_t cons_check = 0;
// XOR of all values inserted by producers.
size_t prod_check = 0;
18
// Consumer thread type: removes values from a single channel.
thread Consumer {
	size_t i;								// index into `channels` of the channel this consumer drains
};

// Constructor: build the underlying thread with cluster `clu`, then record channel index `i`.
static inline void ?{}( Consumer & c, size_t i, cluster & clu ) {
	((thread &)c){ clu };					// explicit construction of the thread part with the cluster
	c.i = i;
}
// Consumer thread body.
// Repeatedly removes values from channels[ this.i ], counting the removals and XOR-ing the removed
// values into a private checksum. When the channel-closed exception finally terminates the loop,
// the private count and checksum are merged into the shared totals while holding lock `o`.
void main(Consumer & this) {
	size_t removed = 0;						// number of successful removes by this thread
	size_t checksum = 0;					// XOR of every value this thread removed
	try {
		for ( ;; ) {
			checksum ^= remove( channels[ this.i ] );
			removed += 1;
		}
	}
	catchResume ( channel_closed * e ) {}	// continue to remove until would block
	catch ( channel_closed * e ) {}			// termination: nothing left to do, leave the loop

	lock(o);
	total_operations += removed;
	cons_check ^= checksum;
	// sout | "C: " | removed;
	unlock(o);
}
43
// Producer thread type: inserts values into a single channel.
thread Producer {
	size_t i;								// index into `channels` of the channel this producer fills
};

// Constructor: build the underlying thread with cluster `clu`, then record channel index `i`.
static inline void ?{}( Producer & p, size_t i, cluster & clu ) {
	((thread &)p){ clu };					// explicit construction of the thread part with the cluster
	p.i = i;
}
// Producer thread body.
// Inserts the sequence 0, 1, 2, ... into channels[ this.i ], XOR-ing each inserted value into a
// private checksum. When the channel-closed exception terminates the loop, the private count and
// checksum are merged into the shared totals while holding lock `o`.
void main(Producer & this) {
	size_t inserted = 0;					// successful inserts so far; also the next value to insert
	size_t checksum = 0;					// XOR of every value this thread inserted
	try {
		for ( ;; ) {
			insert( channels[ this.i ], inserted );
			checksum ^= inserted;			// only reached when the insert did not raise
			inserted += 1;
		}
	}
	catch ( channel_closed * e ) {}			// channel closed: stop producing

	lock(o);
	total_operations += inserted;
	prod_check ^= checksum;
	// sout | "P: " | inserted;
	unlock(o);
}
67
// Run the contention test.
//
// Creates one cluster per channel, distributes `Processors` processors round-robin over those
// clusters, then starts `Producers` producer threads and `Consumers` consumer threads for EACH
// channel. After 10 seconds every channel is closed, all threads are deleted, and the XOR checksum
// of inserted values is compared with the XOR checksum of removed values.
//
// Parameters:
//   Processors  - number of processors to create
//   Channels    - number of channels (also the number of clusters)
//   Producers   - producer threads per channel
//   Consumers   - consumer threads per channel
//   ChannelSize - value passed to each channel's constructor
// Returns: always 0; a checksum mismatch is only reported on sout.
//
// NOTE(review): processor i is placed on cluster i % Clusters, so with Processors < Channels the
// clusters with index >= Processors receive no processor -- confirm callers never request that.
static inline int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
	size_t Clusters = Channels;
	// create the clusters and their processors
	cluster clus[Clusters];
	processor * proc[Processors];
	for ( i; Processors ) {
		(*(proc[i] = malloc())){clus[i % Clusters]};
	}

	channels = aalloc( Channels );

	// sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;

	for ( i; Channels ) {
		channels[i]{ ChannelSize };
	}

	sout | "start";
	Consumer * c[Consumers * Channels];
	Producer * p[Producers * Channels];

	// Each channel j owns its own contiguous slice of p[] and c[]. Indexing with the per-channel
	// offset keeps later channels from overwriting the threads of earlier ones, and guarantees
	// every slot visited by the delete loops below holds a constructed thread.
	for ( j; Channels ) {
		for ( i; Producers ) {
			(*(p[j * Producers + i] = malloc())){ j, clus[j % Clusters] };
		}

		for ( i; Consumers ) {
			(*(c[j * Consumers + i] = malloc())){ j, clus[j % Clusters] };
		}
	}

	sleep(10`s);

	for ( i; Channels )
		close( channels[i] );

	for ( i; Producers * Channels ) {
		delete(p[i]);
	}
	for ( i; Consumers * Channels ) {
		delete(c[i]);
	}

	adelete( channels );
	// sout | total_operations;
	if ( cons_check != prod_check )
		sout | "CHECKSUM MISMATCH !!!";
	// print_stats_now( *active_cluster(), CFA_STATS_READY_Q);

	for ( i; Processors ) {
		delete(proc[i]);
	}
	sout | "done";
	return 0;
}
123
// Program entry point.
//
// Usage: contend [ processors | 'd' ] [ channel size | 'd' ]
// An argument of "d" keeps the built-in default for that position. Both numeric arguments must be
// at least 1; any other value, or more than two arguments, prints the usage message and exits.
// The channel count and the per-channel producer/consumer counts are fixed here.
int main( int argc, char * argv[] ) {
	size_t Processors = 1, Channels = 1, Producers = 4, Consumers = 4, ChannelSize = 128;
	switch ( argc ) {
	  case 3:
		if ( strcmp( argv[2], "d" ) != 0 ) {			// default ?
			ChannelSize = ato( argv[2] );
			if ( ChannelSize < 1 ) fallthru default;
		} // if
		// deliberate fall through: argv[1] must be processed as well
	  case 2:
		if ( strcmp( argv[1], "d" ) != 0 ) {			// default ?
			Processors = ato( argv[1] );
			if ( Processors < 1 ) fallthru default;
		} // if
		// deliberate fall through
	  case 1:											// use defaults
		break;
	  default:
		// The stated channel-size bound matches the "ChannelSize < 1" rejection above.
		exit | "Usage: " | argv[0]
			 | " [ processors (> 0) | 'd' (default " | Processors
			 | ") ] [ channel size (> 0) | 'd' (default " | ChannelSize
			 | ") ]" ;
	} // switch

	test(Processors, Channels, Producers, Consumers, ChannelSize);
}
Note: See TracBrowser for help on using the repository browser.