#include #include #include #include channel(long long int) A, B, C, D, E, F; volatile long long int inserts = 0; volatile long long int removes = 0; thread Producer {}; void main( Producer & this ) { long long int my_inserts = 0; long long int A_i = 0, B_i = 0, C_i = 0, D_i = 0, E_i = 0, F_i = 0; try { for( long long int i = 0;;i++ ) { waituntil( A << i ) { A_i++; } and waituntil( B << i ) { B_i++; } and waituntil( C << i ) { C_i++; } and waituntil( D << i ) { D_i++; } and waituntil( E << i ) { E_i++; } and waituntil( F << i ) { F_i++; } } } catch ( channel_closed * e ) {} __atomic_fetch_add( &inserts, A_i + B_i + C_i + D_i + E_i + F_i, __ATOMIC_SEQ_CST ); } thread Consumer {}; void main( Consumer & this ) { long long int in, A_removes = 0, B_removes = 0, C_removes = 0, D_removes = 0, E_removes = 0, F_removes = 0; try { for( ;; ) { waituntil( remove(F) ) { F_removes++; } or waituntil( remove(E) ) { E_removes++; } or waituntil( remove(D) ) { D_removes++; } or waituntil( remove(C) ) { C_removes++; } or waituntil( remove(B) ) { B_removes++; } or waituntil( remove(A) ) { A_removes++; } } } catchResume ( channel_closed * e ) { } // continue to remove until would block catch ( channel_closed * e ) {} try { for( ;; ) waituntil( (in << A) ) { A_removes++; } } catchResume ( channel_closed * e ) {} // continue to remove until would block catch ( channel_closed * e ) {} try { for( ;; ) waituntil( (in << B) ) { B_removes++; } } catchResume ( channel_closed * e ) {} // continue to remove until would block catch ( channel_closed * e ) {} try { for( ;; ) waituntil( (in << C) ) { C_removes++; } } catchResume ( channel_closed * e ) {} // continue to remove until would block catch ( channel_closed * e ) {} try { for( ;; ) waituntil( (in << D) ) { D_removes++; } } catchResume ( channel_closed * e ) {} // continue to remove until would block catch ( channel_closed * e ) {} try { for( ;; ) waituntil( (in << E) ) { E_removes++; } } catchResume ( channel_closed * e ) {} // continue to remove until would block catch ( channel_closed * e ) {} try { for( ;; ) waituntil( (in << F) ) { F_removes++; } } catchResume ( channel_closed * e ) {} // continue to remove until would block catch ( channel_closed * e ) {} __atomic_fetch_add( &removes, A_removes + B_removes + C_removes + D_removes + E_removes + F_removes, __ATOMIC_SEQ_CST ); } size_t time = 3, num_times = 10, chan_size = 0, num_thds = 2; int main( int argc, char * argv[] ) { if ( argc == 2 ) time = atoi( argv[1] ); processor p[ num_thds - 1 ]; printf("Start\n"); for ( i; num_times ) { printf("%lu\n", i); A{chan_size}; B{chan_size}; C{chan_size}; D{chan_size}; E{chan_size}; F{chan_size}; { Producer p[ num_thds / 2 ]; Consumer c[ num_thds / 2 ]; sleep(time`s); close(A); close(B); close(C); close(D); close(E); close(F); } if ( inserts != removes ) { printf("\n"); printf("CHECKSUM MISMATCH!! Producer got: %lld, Consumer got: %lld\n", inserts, removes); assert(false); } ^A{}; ^B{}; ^C{}; ^D{}; ^E{}; ^F{}; inserts = 0; removes = 0; } A{5}; B{5}; C{5}; D{5}; E{5}; F{5}; printf("Done\n"); }