source: tests/concurrency/waituntil/channel_close.cfa @ ec22220

Last change on this file since ec22220 was ec22220, checked in by Peter A. Buhr <pabuhr@…>, 8 days ago

formatting

  • Property mode set to 100644
File size: 2.8 KB
Line 
1#include <fstream.hfa>
2#include <thread.hfa>
3#include <channel.hfa>
4#include <time.hfa>
5
// Channels carrying ssize_t values; the Producer thread inserts into both and
// the Consumer thread removes from both.
channel(ssize_t) A;
channel(ssize_t) B;

// Running totals of completed channel operations: `inserts` is bumped by the
// Producer, `removes` by the Consumer, and the program main compares the two
// once a phase has finished.
volatile size_t inserts = 0;
volatile size_t removes = 0;
9
// Producer thread: pushes an ever-increasing sequence number (0, 1, 2, ...)
// into BOTH channels on every iteration, counting each completed insert.
thread Producer {};
void main( Producer & this ) {
    try {
        // Unbounded counter; the loop only ends when an insert raises channel_closed.
        for ( size_t seq = 0;; seq += 1 ) {
            // The `and` combinator requires both inserts to complete before
            // the next sequence number is produced.
            waituntil( A << seq ) { inserts += 1; }
            and waituntil( B << seq ) { inserts += 1; }
        }
    } catch ( channel_closed * e ) {
        // A closed channel is the normal shutdown signal: stop producing.
    }
}
19
// Selects which waituntil combinator the Consumer exercises:
// false => `or` (take from whichever channel is ready), true => `and` (take from both).
bool useAnd = false;

// Consumer thread: removes values from channels A and B and checks that each
// channel delivers the sequence 0, 1, 2, ... in order. A_removes / B_removes
// hold the next value expected from the respective channel; the global
// `removes` counts every successful removal.
//
// After the main loop is terminated by a channel_closed exception, each
// channel is drained individually so that values still buffered at close time
// are counted as well.
thread Consumer {};
void main( Consumer & this ) {
    ssize_t in, in2, A_removes = 0, B_removes = 0;

    // Phase 1: consume from both channels using the combinator chosen by useAnd.
    // NOTE(review): the fence before each check presumably makes the removed
    // value visible before it is compared -- confirm the exact requirement.
    try {
        for () {
            if ( useAnd ) {
                // AND mode: one value must be removed from each channel per iteration.
                waituntil( (in << A) ) { __atomic_thread_fence( __ATOMIC_SEQ_CST ); assert( A_removes == in ); A_removes++; removes++; }
                and waituntil( (in2 << B) ) { __atomic_thread_fence( __ATOMIC_SEQ_CST ); assert( B_removes == in2 ); B_removes++; removes++; }
                continue;
            }
            // OR mode: remove from whichever channel has a value available.
            waituntil( (in << A) ) { __atomic_thread_fence( __ATOMIC_SEQ_CST ); assert( A_removes == in ); A_removes++; removes++; }
            // The ordering check on B must be a real assert, matching every other clause.
            or waituntil( (in << B) ) { __atomic_thread_fence( __ATOMIC_SEQ_CST ); assert( B_removes == in ); B_removes++; removes++; }
        }
    } catchResume ( channel_closed * e ) {                              // continue to remove until would block
    } catch ( channel_closed * e ) {}

    // Phase 2: drain whatever is left in A.
    try {
        for ()
            waituntil( (in << A) ) { __atomic_thread_fence( __ATOMIC_SEQ_CST ); assert( A_removes == in ); A_removes++; removes++; }
    } catchResume ( channel_closed * e ) {                              // continue to remove until would block
    } catch ( channel_closed * e ) {}

    // Phase 3: drain whatever is left in B.
    try {
        for ()
            waituntil( (in << B) ) { __atomic_thread_fence( __ATOMIC_SEQ_CST ); assert( B_removes == in ); B_removes++; removes++; }
    } catchResume ( channel_closed * e ) {                              // continue to remove until would block
    } catch ( channel_closed * e ) {}
}
50
// Runs one timed producer/consumer phase on the already-constructed channels
// A and B: start both threads, let them run for `secs` seconds, close the
// channels, then report whether the insert and remove totals agree.
static void run_phase( size_t secs ) {
    {
        Producer prod;
        Consumer cons;
        sleep( secs`s );
        sout | "done sleep";
        sout | "closing A";
        close( A );
        sout | "closing B";
        close( B );
    } // both thread objects leave scope here, before the totals are compared
    if ( inserts != removes )
        sout | "CHECKSUM MISMATCH!! Producer got:" | inserts | ", Consumer got:" | removes;
    sout | "done";
}

// Test driver: runs the producer/consumer pair twice, first with the Consumer
// in OR mode and then in AND mode. An optional single command-line argument
// gives the run time of each phase in seconds (default 5).
int main( int argc, char * argv[] ) {
    size_t secs = 5;
    if ( argc == 2 ) {
        secs = atoi( argv[1] );
    }

    processor extra[2];                                 // two additional processors for the test threads

    // ---- OR phase ----
    A{5};                                               // constructed with argument 5 (presumably the buffer capacity)
    B{5};
    sout | "start OR";
    run_phase( secs );
    ^A{};                                               // destroy explicitly so the channels can be re-constructed below
    ^B{};

    // ---- AND phase ----
    useAnd = true;
    removes = 0;
    inserts = 0;
    A{5};
    B{5};
    sout | "start AND";
    run_phase( secs );
}
Note: See TracBrowser for help on using the repository browser.