source: doc/theses/colby_parsons_MMAth/benchmarks/channels/cfa/pub_sub.cfa @ fa5e1aa5

Last change on this file since fa5e1aa5 was 76a8400, checked in by caparson <caparson@…>, 20 months ago

Added all CFA benchmarks; contend is the only one used in the thesis, but the others may be used later, so I'm keeping them around.

  • Property mode set to 100644
File size: 3.5 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <channel.hfa>
5#include <thread.hfa>
6#include <time.hfa>
7#include <string.h>
8#include <mutex_stmt.hfa>
9
// Sum of the per-task round counts; each Task adds its own count just before exiting.
size_t total_operations = 0;

// Run configuration; both default to 1 and may be set from the command line in main().
size_t Processors = 1;
size_t Tasks = 1;

// Channel type used for the publish/subscribe traffic.
typedef channel( size_t ) Channel;

// Barrier state: number of participants and the two ticket channels used by
// initBarrier(), barrier() and flushBarrier(). The pointers are set in main().
int BarrierSize = 1;
channel( int ) * entryWait;
channel( int ) * barWait;
// Pushes BarrierSize copies of the value -1 into each barrier channel.
// barrier() treats a -1 ticket as a signal to re-insert it and return
// immediately, so this lets every task leave the barrier at shutdown.
static inline void flushBarrier() {
    int remaining = BarrierSize;
    while ( remaining > 0 ) {
        // Same order as always: entry channel first, then the wait channel.
        insert( *entryWait, -1 );
        insert( *barWait, -1 );
        remaining -= 1;
    }
}
24
// Arms the barrier by placing the tickets 0 .. BarrierSize-1 into the entry channel.
static inline void initBarrier() {
    for ( int ticket = 0; ticket < BarrierSize; ticket += 1 ) {
        insert( *entryWait, ticket );
    }
}
29
// Reusable barrier for BarrierSize tasks, built from two ticket channels.
//
// Each caller takes an entry ticket. The caller that draws the highest ticket
// (BarrierSize - 1) releases the others by inserting BarrierSize - 1 tickets
// into barWait. Every other caller blocks on barWait; the one that draws the
// highest release ticket (BarrierSize - 2) re-arms the barrier by refilling
// entryWait with a fresh set of tickets.
//
// A ticket of -1 (inserted by flushBarrier()) means "shut down": it is put
// back so the next caller also sees it, and the call returns at once.
static inline void barrier() {
    int ticket = remove( *entryWait );
    if ( ticket == -1 ) {                       // shutdown: pass the marker on
        insert( *entryWait, -1 );
        return;
    }

    if ( ticket == BarrierSize - 1 ) {          // last arrival
        if ( BarrierSize == 1 ) {
            // Sole participant: nobody is waiting on barWait, so nobody would
            // ever refill entryWait. Re-arm here, otherwise the next call
            // blocks until flushBarrier().
            insert( *entryWait, 0 );
            return;
        }
        for ( j; BarrierSize - 1 )              // release the waiting tasks
            insert( *barWait, j );
        return;
    }

    ticket = remove( *barWait );
    if ( ticket == -1 ) {                       // shutdown: pass the marker on
        insert( *barWait, -1 );
        return;
    }

    // last one out re-arms the barrier for the next round
    if ( ticket == BarrierSize - 2 ) {
        for ( j; BarrierSize )
            insert( *entryWait, j );
    }
}
53
// One channel pointer per task; allocated and filled in by main().
Channel ** chans;

// Guards the update of total_operations at the end of each Task's main.
owner_lock o;

// Set to true by main() to ask the tasks to stop; polled at the top of each round.
bool done = false;

// Not referenced anywhere else in this file.
size_t tasks_done = 0;
59
// Benchmark thread; id is the index into chans of the channel this task publishes to.
thread Task {
    size_t id;
};

// Builds a Task numbered i, forwarding clu to the underlying thread constructor.
static inline void ?{}( Task & p, size_t i, cluster & clu ) {
    ((thread &)p){ clu };       // construct the thread part first
    p.id = i;
}
// Thread body: repeats publish / subscribe / barrier rounds until done is set,
// then adds the number of completed rounds to total_operations.
void main( Task & this ) {
    const size_t self = this.id;
    size_t rounds = 0;

    while ( ! done ) {
        // publish: put Tasks messages into this task's own channel
        for ( size_t msg = 0; msg < Tasks; msg += 1 ) {
            insert( *chans[self], msg );
        }

        // subscribe: take one message from every task's channel
        for ( size_t src = 0; src < Tasks; src += 1 ) {
            remove( *chans[src] );
        }

        barrier();
        rounds += 1;
    }

    // total_operations is shared by all tasks, so update it under the lock.
    lock( o );
    total_operations += rounds;
    unlock( o );
}
88
89
// Benchmark driver.
//
// Usage: prog [ processors (> 0) | 'd' ] [ Tasks (> 0) | 'd' ]
//
// Creates the processors, one channel per task and the barrier channels,
// starts the tasks, lets them run for 10 seconds, then sets done, unblocks
// any task still waiting on a channel or the barrier, joins the tasks and
// prints total_operations.
int main( int argc, char * argv[] ) {
    switch ( argc ) {
      case 3:
        if ( strcmp( argv[2], "d" ) != 0 ) {                // default ?
            // Validate as a signed int first: storing a negative atoi()
            // result directly in a size_t wraps and defeats the "< 1" check.
            int requested = atoi( argv[2] );
            if ( requested < 1 ) goto Usage;
            Tasks = requested;
        } // if
        // fall through
      case 2:
        if ( strcmp( argv[1], "d" ) != 0 ) {                // default ?
            int requested = atoi( argv[1] );
            if ( requested < 1 ) goto Usage;
            Processors = requested;
        } // if
        // fall through
      case 1:                                               // use defaults
        break;
      default:
      Usage:
        sout | "Usage: " | argv[0]
             | " [ processors (> 0) | 'd' (default " | Processors
             | ") ] [ Tasks (> 0) | 'd' (default " | Tasks
             | ") ]" ;
        exit( EXIT_FAILURE );
    } // switch

    // NOTE(review): this overrides whatever Tasks value was parsed from
    // argv[2], so the benchmark always runs one task per processor.
    // Confirm that this is intended.
    Tasks = Processors;
    BarrierSize = Tasks;

    size_t Clusters = 1;
    // create a cluster
    cluster clus[Clusters];
    processor * proc[Processors];
    for ( i; Processors ) {
        (*(proc[i] = malloc())){clus[i % Clusters]};
    }

    // one channel per task, each with room for 2 * Tasks messages
    chans = aalloc( Tasks );
    for ( i; Tasks ) {
        chans[i] = malloc();
        (*chans[i]){ 2 * Tasks };
    }

    // setup barrier
    channel(int) entry{ 2 * BarrierSize };
    channel(int) wait{ 2 * BarrierSize };
    entryWait = &entry;
    barWait = &wait;
    initBarrier();

    Task * t[Tasks];

    for ( i; Tasks ) {
        (*(t[i] = malloc())){ i, clus[i % Clusters] };
    }

    sleep(10`s);
    done = true;

    // Feed every channel so tasks blocked in remove() can finish their round.
    for ( i; Tasks )
        for ( j; Tasks )
            insert(*chans[i], j);

    // Release tasks blocked inside barrier().
    flushBarrier();

    for ( i; Tasks ) {
        delete(t[i]);
    }

    sout | total_operations;
    // print_stats_now( *active_cluster(), CFA_STATS_READY_Q);

    for ( i; Processors ) {
        delete(proc[i]);
    }

    // Destroy and free each channel; adelete() below only releases the array
    // of pointers, not the channels they point to.
    for ( i; Tasks ) {
        delete( chans[i] );
    }
    adelete( chans );
    return 0;
}
Note: See TracBrowser for help on using the repository browser.