source: tests/concurrency/channels/contend.cfa @ ded6c2a6

ast-experimental
Last change on this file since ded6c2a6 was c26bea2a, checked in by Peter A. Buhr <pabuhr@…>, 18 months ago

first attempt at renaming directory tests/concurrent to tests/concurrency to harmonize with other concurrency directory names

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include <locks.hfa>
2#include <fstream.hfa>
3#include <stdio.h>
4#include <channel.hfa>
5#include <thread.hfa>
6#include <time.hfa>
7#include <string.h>
8
9owner_lock o;
10
11size_t total_operations = 0;
12
13typedef channel( size_t ) Channel;
14
15Channel * channels;
16
17size_t cons_check = 0, prod_check = 0;
18
19thread Consumer {
20    size_t i;
21};
22static inline void ?{}( Consumer & c, size_t i, cluster & clu ) {
23    ((thread &)c){ clu };
24    c.i = i;
25}
26void main(Consumer & this) {
27    size_t runs = 0;
28    size_t my_check = 0;
29    try {
30        for ( ;; ) {
31            size_t j = remove( channels[ this.i ] );
32            my_check = my_check ^ j;
33            runs++;
34        }
35    } catchResume ( channel_closed * e ) {} // continue to remove until would block
36    catch ( channel_closed * e ) {}
37    lock(o);
38    total_operations += runs;
39    cons_check = cons_check ^ my_check;
40    // sout | "C: " | runs;
41    unlock(o);
42}
43
44thread Producer {
45    size_t i;
46};
47static inline void ?{}( Producer & p, size_t i, cluster & clu ) {
48    ((thread &)p){ clu };
49    p.i = i;
50}
51void main(Producer & this) {
52    size_t runs = 0;
53    size_t my_check = 0;
54    try {
55        for ( ;; ) {
56            insert( channels[ this.i  ], (size_t)runs );
57            my_check = my_check ^ ((size_t)runs);
58            runs++;
59        }
60    } catch ( channel_closed * e ) {}
61    lock(o);
62    total_operations += runs;
63    prod_check = prod_check ^ my_check;
64    // sout | "P: " | runs;
65    unlock(o);
66}
67
68static inline int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
69    size_t Clusters = Channels;
70    // create a cluster
71    cluster clus[Clusters];
72    processor * proc[Processors];
73    for ( i; Processors ) {
74        (*(proc[i] = malloc())){clus[i % Clusters]};
75    }
76
77    channels = aalloc( Channels );
78
79    // sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;
80   
81    for ( i; Channels ) {
82        channels[i]{ ChannelSize };
83    }
84
85    sout | "start";
86    Consumer * c[Consumers * Channels];
87    Producer * p[Producers * Channels];
88
89    for ( j; Channels ) {
90        for ( i; Producers ) {
91            (*(p[i] = malloc())){ j, clus[j % Clusters] };
92        }
93
94        for ( i; Consumers ) {
95            (*(c[i] = malloc())){ j, clus[j % Clusters] };
96        }
97    }
98
99    sleep(10`s);
100
101    for ( i; Channels )
102        close( channels[i] );
103
104    for ( i; Producers * Channels ) {
105        delete(p[i]);
106    }
107    for ( i; Consumers * Channels ) {
108        delete(c[i]);
109    }
110
111    adelete( channels );
112    // sout | total_operations;
113    if ( cons_check != prod_check )
114        sout | "CHECKSUM MISMATCH !!!";
115    // print_stats_now( *active_cluster(), CFA_STATS_READY_Q);
116
117    for ( i; Processors ) {
118        delete(proc[i]);
119    }
120    sout | "done";
121    return 0;
122}
123
124int main( int argc, char * argv[] ) {
125    size_t Processors = 1, Channels = 1, Producers = 4, Consumers = 4, ChannelSize = 128;
126    switch ( argc ) {
127          case 3:
128                if ( strcmp( argv[2], "d" ) != 0 ) {                    // default ?
129                        ChannelSize = atoi( argv[2] );
130                } // if
131          case 2:
132                if ( strcmp( argv[1], "d" ) != 0 ) {                    // default ?
133                        Processors = atoi( argv[1] );
134                        if ( Processors < 1 ) goto Usage;
135                } // if
136          case 1:                                                                                       // use defaults
137                break;
138          default:
139          Usage:
140                sout | "Usage: " | argv[0]
141             | " [ processors (> 0) | 'd' (default " | Processors
142                         | ") ] [ channel size (>= 0) | 'd' (default " | ChannelSize
143                         | ") ]" ;
144                exit( EXIT_FAILURE );
145        } // switch
146    test(Processors, Channels, Producers, Consumers, ChannelSize);
147}
Note: See TracBrowser for help on using the repository browser.