source: tests/concurrent/channels/pub_sub.cfa@ 2314aac

ast-experimental
Last change on this file since 2314aac was 9319a23, checked in by caparson <caparson@…>, 2 years ago

added channel tests that use new termination feature

  • Property mode set to 100644
File size: 3.3 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <channel.hfa>
5#include <thread.hfa>
6#include <time.hfa>
7#include <string.h>
8#include <mutex_stmt.hfa>
9
10size_t total_operations = 0;
11size_t Processors = 1, Tasks = 4;
12
13typedef channel( size_t ) Channel;
14
15channel( int ) * barWait;
16channel( int ) * entryWait;
17int BarrierSize = 4;
18static inline void closeBarrier() {
19 close(*entryWait);
20 close(*barWait);
21}
22
23static inline void initBarrier() {
24 barWait = malloc();
25 entryWait = malloc();
26 (*barWait){ BarrierSize };
27 (*entryWait){ BarrierSize };
28 for ( j; BarrierSize )
29 insert( *entryWait, j );
30}
31
32static inline void deleteBarrier() {
33 delete(barWait);
34 delete(entryWait);
35}
36
37static inline void barrier() {
38 int ticket = remove( *entryWait );
39 if ( ticket == BarrierSize - 1 ) {
40 for ( j; BarrierSize - 1 )
41 insert( *barWait, j );
42 return;
43 }
44 ticket = remove( *barWait );
45
46 // last one out
47 if ( BarrierSize == 1 || ticket == BarrierSize - 2 ) {
48 for ( j; BarrierSize )
49 insert( *entryWait, j );
50 }
51}
52
53Channel * chans;
54owner_lock o;
55
56thread Task { size_t id; };
57static inline void ?{}( Task & p, size_t i, cluster & clu ) {
58 ((thread &)p){ clu };
59 p.id = i;
60}
61void main(Task & this) with(this) {
62 size_t runs = 0;
63 try {
64 for ( ;; ) {
65 // publish
66 for ( i; Tasks ) {
67 insert(chans[id], i);
68 }
69
70 // subscribe
71 for ( i; Tasks ) {
72 remove( chans[i] );
73 }
74 barrier();
75 runs++;
76 }
77 } catch ( channel_closed * e ) { }
78 lock(o);
79 total_operations += runs;
80 // sout | runs;
81 unlock(o);
82}
83
84
85int main( int argc, char * argv[] ) {
86 switch ( argc ) {
87 case 3:
88 if ( strcmp( argv[2], "d" ) != 0 ) { // default ?
89 Tasks = atoi( argv[2] );
90 if ( Tasks < 1 ) goto Usage;
91 } // if
92 case 2:
93 if ( strcmp( argv[1], "d" ) != 0 ) { // default ?
94 Processors = atoi( argv[1] );
95 if ( Processors < 1 ) goto Usage;
96 } // if
97 case 1: // use defaults
98 break;
99 default:
100 Usage:
101 sout | "Usage: " | argv[0]
102 | " [ processors (> 0) | 'd' (default " | Processors
103 | ") ] [ Tasks (> 0) | 'd' (default " | Tasks
104 | ") ]" ;
105 exit( EXIT_FAILURE );
106 } // switch
107 BarrierSize = Tasks;
108
109 size_t Clusters = 1;
110 // create a cluster
111 cluster clus[Clusters];
112 processor * proc[Processors];
113
114 // setup processors
115 for ( i; Processors )
116 (*(proc[i] = malloc())){clus[i % Clusters]};
117
118 // setup pub/sub chans
119 chans = aalloc( Tasks );
120 for ( i; Tasks )
121 chans[i]{ Tasks };
122
123 // setup barrier
124 initBarrier();
125
126 // sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
127
128 sout | "start";
129 Task * t[Tasks];
130
131 // create tasks
132 for ( i; Tasks )
133 (*(t[i] = malloc())){ i, clus[i % Clusters] };
134
135 sleep(10`s);
136
137 closeBarrier();
138 for ( i; Tasks )
139 close( chans[i] );
140
141 for ( i; Tasks ) {
142 delete(t[i]);
143 }
144
145 deleteBarrier();
146
147 // sout | total_operations;
148
149 for ( i; Processors ) {
150 delete(proc[i]);
151 }
152 adelete( chans );
153 sout | "done";
154 return 0;
155}
Note: See TracBrowser for help on using the repository browser.