source: tests/concurrency/channels/pub_sub.cfa@ b28ce93

Last change on this file since b28ce93 was d923fca, checked in by Andrew Beach <ajbeach@…>, 7 months ago

Clean-up the warnings of the concurrency tests. A lot of little test level fixes, the most interesting repeated one is some formally redundant fallthrough statements. pthread_attr_test had to be rewritten because of library restrictions. Changed some types so they would always be pointer sized. There was a library change, there was a function that could not be implemented; I trust that it is included for a reason so I just put it in a comment. There is a change to the compiler, wait-until now uses goto. The labelled breaks were code generated as unlabelled breaks and although it worked out slipped through some checks. Finally, there is one warning that I cannot solve at this time so tests that produce it have been put in their own lax group.

  • Property mode set to 100644
File size: 3.3 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <channel.hfa>
5#include <thread.hfa>
6#include <time.hfa>
7#include <string.h>
8#include <mutex_stmt.hfa>
9
// Sum of the rounds completed by all tasks; each task adds its own count
// (under lock o) just before its thread main returns.
size_t total_operations = 0;

// Defaults for the run; main() may override both from the command line.
size_t Processors = 1;
size_t Tasks = 4;

// Channel type used for the per-task publish/subscribe queues.
typedef channel( size_t ) Channel;
14
// State of the channel-based barrier used by barrier().
// entryWait hands out arrival tickets; barWait holds arrived tasks until the
// task with the final entry ticket releases them.
channel( int ) * entryWait;
channel( int ) * barWait;

// Number of participants per barrier round; main() sets this to Tasks.
int BarrierSize = 4;
// Close both barrier channels: the entry channel first, then the release
// channel. Task main catches channel_closed to leave its work loop.
static inline void closeBarrier() {
	close( *entryWait );
	close( *barWait );
}
22
// Allocate and construct both barrier channels, each sized by BarrierSize,
// then preload entryWait with the tickets 0 .. BarrierSize - 1.
// BarrierSize must already hold its final value when this is called.
static inline void initBarrier() {
	entryWait = malloc();
	(*entryWait){ BarrierSize };

	barWait = malloc();
	(*barWait){ BarrierSize };

	for ( ticket; BarrierSize ) {
		insert( *entryWait, ticket );
	}
}
31
// Destroy and free both barrier channels allocated by initBarrier().
// main() calls this only after every task has been deleted.
static inline void deleteBarrier() {
	delete( barWait );
	delete( entryWait );
}
36
// Reusable channel-based barrier for BarrierSize tasks.
//
// Each arriving task draws a ticket from entryWait. The task that draws the
// final entry ticket (BarrierSize - 1) releases the others by placing
// BarrierSize - 1 tickets in barWait and returns. Every other task waits on
// barWait; the one that draws the final release ticket (BarrierSize - 2)
// refills entryWait so the barrier can be used for the next round.
//
// Operations on a closed channel are expected to raise channel_closed, which
// the calling task handles (TODO confirm against channel.hfa).
static inline void barrier() {
	int ticket = remove( *entryWait );

	// last one in
	if ( ticket == BarrierSize - 1 ) {
		if ( BarrierSize == 1 ) {
			// Sole participant: nobody is waiting on barWait, so this task is
			// also the last one out and must re-arm the entry channel itself.
			// Without this the next barrier() call would block forever.
			insert( *entryWait, 0 );
			return;
		}
		for ( j; BarrierSize - 1 )
			insert( *barWait, j );
		return;
	}

	ticket = remove( *barWait );

	// last one out
	if ( ticket == BarrierSize - 2 ) {
		for ( j; BarrierSize )
			insert( *entryWait, j );
	}
}
52
// One publish channel per task, indexed by task id; allocated in main().
Channel * chans;

// Guards total_operations while tasks report their round counts.
owner_lock o;
55
// Worker thread; id selects the channel in chans this task publishes to.
thread Task {
	size_t id;
};

// Construct a Task with identifier i that runs on cluster clu.
static inline void ?{}( Task & p, size_t i, cluster & clu ) {
	((thread &)p){ clu };				// construct the underlying thread on clu
	p.id = i;
}
// Thread body of a Task.
//
// Repeats rounds until a channel operation raises channel_closed. In each
// round the task inserts Tasks values into its own channel, removes one value
// from every task's channel, and then waits at the barrier. The number of
// completed rounds is added to total_operations under lock o.
void main( Task & this ) {
	size_t completed = 0;
	try {
		for ( ;; ) {
			// publish: fill this task's own channel
			for ( msg; Tasks ) {
				insert( chans[this.id], msg );
			}

			// subscribe: take one value from each task's channel
			for ( src; Tasks ) {
				remove( chans[src] );
			}

			barrier();
			completed += 1;
		}
	} catch ( channel_closed * e ) {
		// a channel was closed: stop working and report
	}

	lock( o );
	total_operations += completed;
	unlock( o );
}
83
84
// Test driver.
//
// Usage: prog [ processors | 'd' ] [ tasks | 'd' ]
// Creates the processors, the per-task channels and the barrier, starts the
// tasks, lets them run for 10 seconds, then closes every channel and tears
// everything down. Prints "start" and "done"; total_operations is accumulated
// by the tasks but not printed here.
int main( int argc, char * argv[] ) {
	switch ( argc ) {
	  case 3:
		if ( strcmp( argv[2], "d" ) != 0 ) {			// not the default marker ?
			Tasks = ato( argv[2] );
			if ( Tasks < 1 ) fallthrough default;
		}
		fallthrough;
	  case 2:
		if ( strcmp( argv[1], "d" ) != 0 ) {			// not the default marker ?
			Processors = ato( argv[1] );
			if ( Processors < 1 ) fallthrough default;
		}
		fallthrough;
	  case 1:											// use defaults
		break;
	  default:
		exit | "Usage: " | argv[0]
			 | " [ processors (> 0) | 'd' (default " | Processors
			 | ") ] [ Tasks (> 0) | 'd' (default " | Tasks
			 | ") ]" ;
	}

	// every task takes part in each barrier round
	BarrierSize = Tasks;

	size_t Clusters = 1;
	cluster clusterPool[Clusters];
	processor * procs[Processors];

	// set up processors, spread round-robin over the clusters
	for ( p; Processors ) {
		procs[p] = malloc();
		(*procs[p]){ clusterPool[p % Clusters] };
	}

	// set up one pub/sub channel per task, each constructed with Tasks
	chans = aalloc( Tasks );
	for ( c; Tasks ) {
		chans[c]{ Tasks };
	}

	// set up barrier
	initBarrier();

	sout | "start";
	Task * workers[Tasks];

	// create tasks
	for ( w; Tasks ) {
		workers[w] = malloc();
		(*workers[w]){ w, clusterPool[w % Clusters] };
	}

	sleep(10`s);

	// close the barrier channels first, then the pub/sub channels
	closeBarrier();
	for ( c; Tasks ) {
		close( chans[c] );
	}

	// tear down in the same order as before: tasks, barrier, processors, channels
	for ( w; Tasks ) {
		delete( workers[w] );
	}

	deleteBarrier();

	for ( p; Processors ) {
		delete( procs[p] );
	}
	adelete( chans );

	sout | "done";
	return 0;
}
Note: See TracBrowser for help on using the repository browser.