#include #include #include #include #include "channel.hfa" #include #include #include size_t Processors = 1, Channels = 4, Producers = 1, Consumers = 1, ChannelSize = 128; owner_lock o; size_t total_operations = 0; size_t cons_check = 0, prod_check = 0; channel( size_t ) * channels; thread Consumer {}; bool cons_done = false, prod_done = false; volatile int cons_done_count = 0; void getRandArray( int * chanIndices ) { for ( int i = 0; i < Channels; i++ ) { chanIndices[i] = i; } for ( int i = 0; i < Channels; i++ ) { int loc_1 = prng() % Channels; int loc_2 = prng() % Channels; int temp = chanIndices[ loc_1 ]; chanIndices[ loc_1 ] = chanIndices[ loc_2 ]; chanIndices[ loc_2 ] = temp; } } void main(Consumer & this) { size_t i = 0; size_t runs = 0; size_t my_check = 0; int chanIndices[Channels]; getRandArray( chanIndices ); for ( ;;i++ ) { if ( cons_done ) break; size_t j = remove( channels[ chanIndices[ i % Channels ] ] ); my_check = my_check ^ j; if ( !prod_done ) runs++; } lock(o); total_operations += runs; cons_done_count++; cons_check = cons_check ^ my_check; // sout | "Cons: " | runs; unlock(o); } thread Producer {}; void main(Producer & this) { size_t i = 0; size_t runs = 0; size_t my_check = 0; int chanIndices[Channels]; getRandArray( chanIndices ); for ( ;;i++ ) { if ( prod_done ) break; insert( channels[ chanIndices[ i % Channels ] ], i ); my_check = my_check ^ i; runs++; } lock(o); total_operations += runs; prod_check = prod_check ^ my_check; // sout | "Prods: " | runs; unlock(o); } int main( int argc, char *argv[] ) { switch( argc ) { case 4: if ( strcmp( argv[3], "d" ) != 0 ) { // default ? if ( atoi( argv[3] ) < 1 ) goto Usage; ChannelSize = atoi( argv[3] ); } // if case 3: if ( strcmp( argv[2], "d" ) != 0 ) { // default ? if ( atoi( argv[2] ) < 1 ) goto Usage; Channels = atoi( argv[2] ); } // if case 2: if ( strcmp( argv[1], "d" ) != 0 ) { // default ? if ( atoi( argv[1] ) < 1 ) goto Usage; Processors = atoi( argv[1] ); } // if case 1: // use defaults break; default: Usage: sout | "Usage: " | argv[0] | " [ processors > 0 | d ]" | " [ producers > 0 | d ]" | " [ consumers > 0 | d ]" | " [ channels > 0 | d ]"; exit( EXIT_FAILURE ); } processor p[Processors - 1]; channels = anew( Channels ); Producers = Processors / 2; Consumers = Processors / 2; // sout | "Processors: " | Processors | " Producers: " | Producers | " Consumers: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize; for ( i; Channels ) { channels[i]{ ChannelSize }; } // sout | "start"; { Consumer c[Consumers]; { Producer p[Producers]; sleep(10`s); prod_done = true; // sout | "sleep"; } // sout | "prods"; cons_done = true; // for ( i; Channels ) { // if ( get_count( channels[i] ) < Consumers ){ // for ( j; Consumers ) insert( channels[i], 0 ); // } // } while( cons_done_count != Consumers ) { for ( i; Channels ) { if ( has_waiters( channels[i] ) ){ insert( channels[i], 0 ); } } } } // sout | "cons"; for ( i; Channels ) { for ( ;; ) { if ( get_count( channels[i] ) > 0 ) { size_t j = remove( channels[ i ] ); cons_check = cons_check ^ j; } else break; } } adelete( channels ); if ( cons_check != prod_check ) sout | "CHECKSUM MISMATCH !!!"; // print_stats_now( *active_cluster(), CFA_STATS_READY_Q); // for ( i; Processors ) { // delete(proc[i]); // } sout | total_operations; // print_stats_now( *active_cluster(), CFA_STATS_READY_Q ); return 0; }