// Channel churn test.
//
// Producer threads insert values into, and Consumer threads remove values from,
// a shared array of channels; every thread walks the channels in its own randomly
// permuted order.  After a fixed run time main() closes every channel and the
// threads stop once channel_closed is raised.  Each side XORs the values it handled
// into a checksum, and main() reports a mismatch between the two checksums.

// NOTE(review): the header names were missing from the text this file was recovered
// from (only "channel.hfa" survived).  The names below are reconstructed from the
// identifiers the file uses -- confirm them against the build.  prng, aalloc,
// adelete, atoi and exit are presumably reached through one of these headers
// (stdlib.hfa) -- confirm.
#include <locks.hfa>									// owner_lock
#include <fstream.hfa>									// sout
#include <stdio.h>
#include <string.h>										// strcmp
#include "channel.hfa"									// channel, insert, remove, close, channel_closed
#include <thread.hfa>									// thread, processor
#include <time.hfa>										// `s duration literal
#include <stats.hfa>									// print_stats_now (disabled debug line only)

// Run parameters.  Processors, Channels and ChannelSize can be overridden on the
// command line; Producers and Consumers are fixed.
size_t Processors = 1, Channels = 4, Producers = 2, Consumers = 2, ChannelSize = 128;

owner_lock o;											// guards total_operations, cons_check, prod_check
size_t total_operations = 0;							// inserts + removes over all threads
size_t cons_check = 0, prod_check = 0;					// XOR of every value removed / inserted

channel( size_t ) * channels;							// array of Channels channels, allocated in main

thread Consumer {};

// Fill chanIndices[0 .. Channels-1] with a random permutation of 0 .. Channels-1.
// The caller must supply an array of at least Channels elements.
void getRandArray( int * chanIndices ) {
	for ( size_t i = 0; i < Channels; i++ ) {
		chanIndices[i] = i;
	}
	// Channels random pairwise swaps of the identity permutation
	for ( size_t i = 0; i < Channels; i++ ) {
		int loc_1 = prng() % Channels;
		int loc_2 = prng() % Channels;
		int temp = chanIndices[ loc_1 ];
		chanIndices[ loc_1 ] = chanIndices[ loc_2 ];
		chanIndices[ loc_2 ] = temp;
	}
}

// Consumer thread body: remove from the channels round-robin (in this thread's
// permuted order) until channel_closed is raised, drain whatever is left, then
// fold this thread's operation count and checksum into the global totals.
void main( Consumer & this ) {
	size_t i = 0;
	size_t runs = 0;
	size_t my_check = 0;
	int chanIndices[Channels];
	getRandArray( chanIndices );

	try {
		for ( ;; i++ ) {
			size_t j = remove( channels[ chanIndices[ i % Channels ] ] );
			my_check = my_check ^ j;
			runs++;
		}
	} catch ( channel_closed * e ) {}

	// flush out rest of channels
	for ( c; Channels ) {
		try {
			for ( ;; ) {
				size_t j = remove( channels[ c ] );
				my_check = my_check ^ j;
				runs++;
			}
		} catchResume ( channel_closed * e ) {}			// continue to remove until would block
		catch ( channel_closed * e ) {}
	}

	lock(o);
	total_operations += runs;
	cons_check = cons_check ^ my_check;
	// sout | "Cons: " | runs;
	unlock(o);
}

thread Producer {};

// Producer thread body: insert the loop counter into the channels round-robin (in
// this thread's permuted order) until channel_closed is raised, then fold this
// thread's operation count and checksum into the global totals.
void main( Producer & this ) {
	size_t i = 0;
	size_t runs = 0;
	size_t my_check = 0;
	int chanIndices[Channels];
	getRandArray( chanIndices );

	try {
		for ( ;; i++ ) {
			insert( channels[ chanIndices[ i % Channels ] ], i );
			my_check = my_check ^ i;
			runs++;
		}
	} catch ( channel_closed * e ) {}

	lock(o);
	total_operations += runs;
	prod_check = prod_check ^ my_check;
	// sout | "Prods: " | runs;
	unlock(o);
}

// Usage: prog [ processors > 0 | d ] [ channels > 0 | d ] [ channel size > 0 | d ]
// where "d" keeps the default for that position.  Prints "start", runs the
// producers and consumers for 10 seconds, closes the channels, prints
// "CHECKSUM MISMATCH !!!" if the two checksums differ, then prints "done".
int main( int argc, char * argv[] ) {
	switch ( argc ) {
	  case 4:
		if ( strcmp( argv[3], "d" ) != 0 ) {			// default ?
			int size = atoi( argv[3] );
			if ( size < 1 ) goto Usage;
			ChannelSize = size;
		} // if
		// fall through
	  case 3:
		if ( strcmp( argv[2], "d" ) != 0 ) {			// default ?
			int chans = atoi( argv[2] );
			if ( chans < 1 ) goto Usage;
			Channels = chans;
		} // if
		// fall through
	  case 2:
		if ( strcmp( argv[1], "d" ) != 0 ) {			// default ?
			int procs = atoi( argv[1] );
			if ( procs < 1 ) goto Usage;
			Processors = procs;
		} // if
		// fall through
	  case 1:											// use defaults
		break;
	  default:
	  Usage:
		// The message lists the arguments in the order they are actually parsed above.
		sout | "Usage: " | argv[0]
			 | " [ processors > 0 | d ]"
			 | " [ channels > 0 | d ]"
			 | " [ channel size > 0 | d ]";
		exit( EXIT_FAILURE );
	} // switch

	// NOTE(review): Processors - 1 extra processors, presumably because the program
	// already runs on one -- confirm.
	processor p[Processors - 1];
	channels = aalloc( Channels );

	// sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize;

	for ( i; Channels )
		channels[i]{ ChannelSize };						// construct each channel in place

	sout | "start";
	{
		// NOTE(review): presumably each block waits for the threads declared in it to
		// finish when it ends, so producers are done before consumers -- confirm.
		Consumer c[Consumers];
		{
			Producer p[Producers];
			sleep(10`s);
			for ( i; Channels )
				close( channels[i] );
		}
	}
	adelete( channels );

	if ( cons_check != prod_check )
		sout | "CHECKSUM MISMATCH !!!";

	// sout | total_operations;
	// print_stats_now( *active_cluster(), CFA_STATS_READY_Q );

	sout | "done";
	return 0;
}