// Stress test for the waituntil statement over mixed resource kinds: two channels (A, C),
// a lock (B), a future (F), timeouts, and an else clause, with when-guards that change on
// every iteration.
//
// The program's main thread produces values and sums what it produced into a local total.
// Server1, Drainer and Churner threads consume values and add their local sums to
// globalTotal. At the end the two sums must match; otherwise a checksum error is printed.

// NOTE(review): the header names of the six original #include directives were lost before
// this file reached review. The names below are reconstructed from the features the code
// uses (waituntil, thread, channel, multiple_acquisition_lock, future, mutex statement).
// Confirm against the upstream file.
#include <select.hfa>
#include <thread.hfa>
#include <channel.hfa>
#include <locks.hfa>
#include <future.hfa>
#include <mutex_stmt.hfa>
#include <stdlib.h>										// strtol
#include <errno.h>										// errno, ERANGE

// Future fulfilled once by the main thread with val_to_deliver; consumed and reset by Drainer.
future( long long int ) F;
const long long int val_to_deliver = 42;

// Channels written by the main thread (and with zeros by Churner) and read by the other threads.
// Both are re-constructed with a size argument of 5 at the start of main.
channel(long long int) A, C;

// Lock guarding the b_val / old_b_val hand-off below.
multiple_acquisition_lock B;
volatile long long int b_val = 0;						// next value to be produced
volatile long long int old_b_val = -1;					// last value that was consumed

volatile bool done = false;								// set by the main thread to stop Churner
long long int globalTotal = 0;							// sum of everything the helper threads consumed

// Consumer half of the b_val hand-off: if a value is pending (b_val differs from old_b_val),
// add it to myTotal and advance old_b_val. Every call site in this file runs while holding B.
void consume_b_val( long long int & myTotal ) {
	if ( b_val != old_b_val ) {
		myTotal += b_val;
		old_b_val++;
	}
}

// Producer half of the b_val hand-off: if the previous value has been consumed (b_val equals
// old_b_val), add it to myTotal and advance b_val. The only call site runs while holding B.
void produce_b_val( long long int & myTotal ) {
	if ( b_val == old_b_val ) {
		myTotal += b_val;
		b_val++;
	}
}

// Consumer whose when-guards depend on the iteration counter, so the set of enabled
// waituntil clauses changes on every pass through the loop.
thread Server1 {};
void main( Server1 & this ) {
	long long int a, c, i = 0, myTotal = 0;
	for ( ;; i++ ) {
		when( i % 2 == 0 ) waituntil( a << A ) { myTotal += a; }
		or when( i % 4 < 2 ) waituntil( B ) { consume_b_val( myTotal ); }
		or waituntil( c << C ) { if ( c == -1 ) break; myTotal += c; }	// -1 is the shutdown sentinel
		or when( i % 3 ) waituntil( timeout( 1`ms ) ) {}
		or when( i % 8 < 4 ) else {}
	}
	__atomic_fetch_add( &globalTotal, myTotal, __ATOMIC_SEQ_CST );
}

// Consumer with every clause always enabled.
// Ensures that the changing when states of Server1 don't result in a deadlock.
thread Drainer {};
void main( Drainer & this ) {
	long long int a, c, myTotal = 0;
	for ( ;; ) {
		waituntil( F ) { myTotal += get(F); reset( F ); }
		or waituntil( a << A ) { myTotal += a; }
		or waituntil( c << C ) { if ( c == -1 ) break; myTotal += c; }	// -1 is the shutdown sentinel
		or waituntil( B ) { consume_b_val( myTotal ); }
		or waituntil( timeout( 100`ns ) ) { }
	}
	__atomic_fetch_add( &globalTotal, myTotal, __ATOMIC_SEQ_CST );
}

// Performs non-waituntil try insert/remove operations to add churn/interference.
// The values it inserts are 0, so they contribute nothing to either side of the checksum.
thread Churner {};
void main( Churner & this ) {
	long long int out, myTotal = 0;
	bool success;
	while ( !done ) {
		try_insert( A, 0 );
		if ( try_lock( B ) ) {
			consume_b_val( myTotal );
			unlock( B );
		}
		mutex( B ) { consume_b_val( myTotal ); }
		try_insert( C, 0 );
		[out, success] = try_remove( A );
		if ( success ) myTotal += out;
		[out, success] = try_remove( C );
		if ( success ) myTotal += out;
	}
	__atomic_fetch_add( &globalTotal, myTotal, __ATOMIC_SEQ_CST );
}

size_t numtimes = 5000;									// producer iterations; overridable by argv[1]
size_t numServers = 3;									// number of Server1 threads

// Producer and checker.
//   argv[1] (optional): non-negative iteration count replacing the default numtimes.
// Returns 1 if argv[1] is not a valid non-negative integer, otherwise 0.
int main( int argc, char * argv[] ) {
	if ( argc == 2 ) {
		// strtol instead of atoi: reject garbage and negative counts, which would otherwise
		// wrap to an enormous size_t.
		char * end;
		errno = 0;
		long int requested = strtol( argv[1], &end, 10 );
		if ( end == argv[1] || *end != '\0' || errno == ERANGE || requested < 0 ) {
			printf( "usage: %s [non-negative iteration count]\n", argv[0] );
			return 1;
		}
		numtimes = requested;
	}

	processor p[numServers + 2];
	A{5};
	C{5};

	long long int total = 0;
	bool fulfilled = false;								// whether F was ever fulfilled (never when numtimes == 0)
	printf("start\n");
	{
		Server1 s[numServers];
		Drainer d;
		{
			Churner c;
			for ( long long int j = 0; j < numtimes; j++ ) {
				when( j % 2 == 0 ) waituntil( A << j ) { total += j; }
				or when( j % 4 < 2 ) waituntil( B ) { produce_b_val( total ); }
				and when( j % 8 < 4 ) waituntil( C << j ) { total += j; }
				and waituntil( timeout( 1`ns ) ) {}
				if ( j == numtimes / 2 ) {
					fulfil( F, val_to_deliver );
					fulfilled = true;
				}
			}
			done = true;
			printf("terminating churner\n");
		}

		printf("waiting for empty channels\n");
		// Spin until the consumers have emptied both channels.
		while ( get_count( A ) > 0 || get_count( C ) > 0 ) { }

		printf("sending sentinels\n");
		// One sentinel for each Server1 plus one for the Drainer.
		for ( i; numServers + 1 ) insert( C, -1 );
		printf("joining servers\n");
	}

	// If b_val equals old_b_val, the consumers counted the current value but the producer
	// never counted it; add it here so both sides agree.
	if ( b_val == old_b_val ) total += b_val;
	// If the future was fulfilled and is no longer available, the Drainer consumed it and
	// added val_to_deliver to its sum. Without the fulfilled check, a run with
	// numtimes == 0 would report a false mismatch.
	if ( fulfilled && !available( F ) ) total += val_to_deliver;
	if ( total != globalTotal )
		printf("CHECKSUM MISMATCH!! Main thread got %lld, server sum is %lld\n", total, globalTotal);
	printf("done\n");
}