source: tests/concurrency/channels/pub_sub.cfa @ d923fca

Last change on this file since d923fca was d923fca, checked in by Andrew Beach <ajbeach@…>, 6 weeks ago

Clean-up the warnings of the concurrency tests. A lot of little test level fixes, the most interesting repeated one is some formally redundent fallthough statements. pthread_attr_test had to be rewritten because of library restrictions. Changed some types so they would always be pointer sized. There was a library change, there was a function that could not be implemented; I trust that it is included for a reason so I just put it in a comment. There is a change to the compiler, wait-until now uses goto. The labelled breaks were code generated as unlabelled breaks and although it worked out slipped through some checks. Finally, there is one warning that I cannot solve at this time so tests that produce it have been put in their own lax group.

  • Property mode set to 100644
File size: 3.3 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <channel.hfa>
5#include <thread.hfa>
6#include <time.hfa>
7#include <string.h>
8#include <mutex_stmt.hfa>
9
10size_t total_operations = 0;
11size_t Processors = 1, Tasks = 4;
12
13typedef channel( size_t ) Channel;
14
15channel( int ) * barWait;
16channel( int ) * entryWait;
17int BarrierSize = 4;
18static inline void closeBarrier() {
19    close(*entryWait);
20    close(*barWait);
21}
22
23static inline void initBarrier() {
24    barWait = malloc();
25    entryWait = malloc();
26    (*barWait){ BarrierSize };
27    (*entryWait){ BarrierSize };
28    for ( j; BarrierSize )
29        insert( *entryWait, j );
30}
31
32static inline void deleteBarrier() {
33    delete(barWait);
34    delete(entryWait);
35}
36
37static inline void barrier() {
38    int ticket = remove( *entryWait );
39    if ( ticket == BarrierSize - 1 ) {
40                for ( j; BarrierSize - 1 )
41            insert( *barWait, j );
42        return;
43        }
44    ticket = remove( *barWait );
45
46        // last one out
47        if ( BarrierSize == 1 || ticket == BarrierSize - 2 ) {
48                for ( j; BarrierSize )
49            insert( *entryWait, j );
50        }
51}
52
53Channel * chans;
54owner_lock o;
55
56thread Task { size_t id; };
57static inline void ?{}( Task & p, size_t i, cluster & clu ) {
58    ((thread &)p){ clu };
59    p.id = i;
60}
61void main(Task & this) with(this) {
62    size_t runs = 0;
63    try {
64        for ( ;; ) {
65            // publish
66            for ( i; Tasks ) {
67                insert(chans[id], i);
68            }
69
70            // subscribe
71            for ( i; Tasks ) {
72                remove( chans[i] );
73            }
74            barrier();
75            runs++;
76        }
77    } catch ( channel_closed * e ) { }
78    lock(o);
79    total_operations += runs;
80    // sout | runs;
81    unlock(o);
82}
83
84
85int main( int argc, char * argv[] ) {
86    switch ( argc ) {
87          case 3:
88                if ( strcmp( argv[2], "d" ) != 0 ) {                    // default ?
89                        Tasks = ato( argv[2] );
90            if ( Tasks < 1 ) fallthrough default;
91                } // if
92                fallthrough;
93          case 2:
94                if ( strcmp( argv[1], "d" ) != 0 ) {                    // default ?
95                        Processors = ato( argv[1] );
96                        if ( Processors < 1 ) fallthrough default;
97                } // if
98                fallthrough;
99          case 1:                                                                                       // use defaults
100                break;
101          default:
102                exit | "Usage: " | argv[0]
103             | " [ processors (> 0) | 'd' (default " | Processors
104                         | ") ] [ Tasks (> 0) | 'd' (default " | Tasks
105                         | ") ]" ;
106        } // switch
107    BarrierSize = Tasks;
108
109    size_t Clusters = 1;
110    // create a cluster
111    cluster clus[Clusters];
112    processor * proc[Processors];
113
114    // setup processors
115    for ( i; Processors )
116        (*(proc[i] = malloc())){clus[i % Clusters]};
117
118    // setup pub/sub chans
119    chans = aalloc( Tasks );
120    for ( i; Tasks )
121        chans[i]{ Tasks };
122
123    // setup barrier
124    initBarrier();
125
126    // sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
127
128    sout | "start";
129    Task * t[Tasks];
130
131    // create tasks
132    for ( i; Tasks )
133        (*(t[i] = malloc())){ i, clus[i % Clusters] };
134
135    sleep(10`s);
136
137    closeBarrier();
138    for ( i; Tasks )
139        close( chans[i] );
140
141    for ( i; Tasks ) {
142        delete(t[i]);
143    }
144
145    deleteBarrier();
146   
147    // sout | total_operations;
148
149    for ( i; Processors ) {
150        delete(proc[i]);
151    }
152    adelete( chans );
153    sout | "done";
154    return 0;
155}
Note: See TracBrowser for help on using the repository browser.