#include #include #include #include #include #include #include #include size_t Sets = 1, ChannelSize = 100, Channels = 2; channel(size_t) * chans; size_t globalTotal = 0; int cons_counter = 0, prod_counter = 0; thread SelectConsumer {}; void main( SelectConsumer & this ) { size_t val, i = 0; try { for(;; i++ ) { waituntil( val << chans[0] ) {} or waituntil( val << chans[1] ) {} } } catch( channel_closed * e ) {} __atomic_fetch_add( &globalTotal, i, __ATOMIC_SEQ_CST ); } thread SelectProducer {}; void main( SelectProducer & this ) { try { for( size_t i = 0;; i++ ) { waituntil( i >> chans[0] ) {} or waituntil( i >> chans[1] ) {} } } catch( channel_closed * e ) {} } thread Consumer {}; void main( Consumer & this ) { int idx = __atomic_fetch_add( &cons_counter, 1, __ATOMIC_SEQ_CST ) % Channels; size_t val, i = 0; try { for(;; i++ ) { remove( chans[idx] ); } } catch( channel_closed * e ) {} __atomic_fetch_add( &globalTotal, i, __ATOMIC_SEQ_CST ); } thread Producer {}; void main( Producer & this ) { int idx = __atomic_fetch_add( &prod_counter, 1, __ATOMIC_SEQ_CST ) % Channels; try { for( size_t i = 0;; i++ ) { insert( chans[idx], i ); } } catch( channel_closed * e ) {} } int main( int argc, char * argv[] ) { switch ( argc ) { case 3: if ( strcmp( argv[2], "d" ) != 0 ) { // default ? ChannelSize = atoi( argv[2] ); } // if case 2: if ( strcmp( argv[1], "d" ) != 0 ) { // default ? Sets = atoi( argv[1] ); if ( Sets < 1 ) goto Usage; } // if case 1: // use defaults break; default: Usage: sout | "Usage: " | argv[0] | " [ sets (> 0) | 'd' (default " | Sets | ") ] [ channel size (>= 0) | 'd' (default " | ChannelSize | ") ]" ; exit( EXIT_FAILURE ); } // switch processor p[Sets * 2 + Sets * Channels * 2 - 1]; chans = aalloc( Channels ); for ( i; Channels ) chans[i]{ ChannelSize }; { Producer p[Sets * Channels]; SelectProducer sp[Sets]; Consumer c[Sets * Channels]; SelectConsumer sc[Sets]; sleep(10`s); for ( i; Channels ) close( chans[i] ); } adelete( chans ); printf("%zu\n", globalTotal); }