// Publish/subscribe channel benchmark.
//
// Each Task repeatedly (1) inserts Tasks values into its own channel, (2) removes
// one value from every channel, and (3) meets the other tasks at a barrier built
// from two int channels.  The program main lets the tasks run for 10 seconds,
// shuts them down, and prints the total number of completed rounds.

// NOTE(review): the header names were missing from the copy under review; the
// list below is reconstructed from the identifiers this file uses (owner_lock,
// sout, strcmp, channel, thread, sleep/`s, print_stats_now) — confirm against
// the build before merging.
#include <locks.hfa>
#include <fstream.hfa>
#include <stdio.h>
#include <string.h>
#include <channel.hfa>
#include <thread.hfa>
#include <time.hfa>
#include <stats.hfa>

size_t total_operations = 0;					// sum of every task's completed rounds, guarded by o
size_t Processors = 1, Tasks = 1;				// command-line defaults

typedef channel( size_t ) Channel;

channel( int ) * barWait;						// tickets for tasks waiting inside the barrier
channel( int ) * entryWait;						// tickets for tasks arriving at the barrier
int BarrierSize = 1;							// number of tasks that meet at the barrier

// Releases the barrier for shutdown by inserting BarrierSize sentinels (-1) into
// both barrier channels.  barrier() re-inserts a sentinel it removes, so every
// later arrival sees one too.
static inline void flushBarrier() {
	for ( j; BarrierSize ) {
		insert( *entryWait, -1 );
		insert( *barWait, -1 );
	}
}

// Seeds the entry channel with one ticket (0 .. BarrierSize-1) per participant.
static inline void initBarrier() {
	for ( j; BarrierSize ) insert( *entryWait, j );
}

// Channel-based barrier.  Arrivals take an entry ticket; the holder of the
// highest ticket is the last to arrive and releases the others by filling
// barWait.  The waiter that removes the highest barWait ticket is the last one
// out and refills entryWait for the next round.  A -1 ticket on either channel
// means shutdown: it is put back for the next task and the call returns.
static inline void barrier() {
	int ticket = remove( *entryWait );
	if ( ticket == -1 ) {
		insert( *entryWait, -1 );
		return;
	}

	if ( ticket == BarrierSize - 1 ) {			// last to arrive
		if ( BarrierSize == 1 ) {
			// Nobody is waiting, so nobody will take the "last one out" path
			// below; reopen the entry here or the next call blocks forever.
			insert( *entryWait, 0 );
			return;
		}
		for ( j; BarrierSize - 1 ) insert( *barWait, j );
		return;
	}

	ticket = remove( *barWait );
	if ( ticket == -1 ) {
		insert( *barWait, -1 );
		return;
	}

	if ( ticket == BarrierSize - 2 ) {			// last one out
		for ( j; BarrierSize ) insert( *entryWait, j );
	}
}

Channel ** chans;								// chans[i] is the channel task i publishes into
owner_lock o;									// guards total_operations
// NOTE(review): done is a plain bool written by the program main and read by
// every Task without atomics or a lock — confirm this is acceptable here.
bool done = false;
size_t tasks_done = 0;

thread Task {
	size_t id;									// index of this task's own channel in chans
};

// Constructs task i on cluster clu.
static inline void ?{}( Task & p, size_t i, cluster & clu ) {
	((thread &)p){ clu };
	p.id = i;
}

// Task body: run publish / subscribe / barrier rounds until done is set, then
// add the number of completed rounds to total_operations.
void main( Task & this ) with(this) {
	size_t runs = 0;
	size_t my_id = id;
	for ( ;; ) {
		if ( done ) break;

		// publish
		for ( i; Tasks ) {
			insert( *chans[my_id], i );
		}

		// subscribe
		for ( i; Tasks ) {
			remove( *chans[i] );
		}

		barrier();
		runs++;
	}
	lock( o );
	total_operations += runs;
	unlock( o );
}

// Parses arg as a base-10 count.  Returns true and stores the value in count
// only when the whole string is a number >= 1; otherwise count is untouched.
// The range check is done on the signed value so negative input cannot wrap
// around into a huge size_t.
static bool parseCount( const char * arg, size_t & count ) {
	char * end;
	long value = strtol( arg, &end, 10 );
	if ( end == arg || *end != '\0' || value < 1 ) return false;
	count = value;
	return true;
}

// Usage: prog [ processors (> 0) | 'd' ] [ Tasks (> 0) | 'd' ]
// Runs the benchmark for 10 seconds and prints the total number of rounds.
int main( int argc, char * argv[] ) {
	switch ( argc ) {
	  case 3:
		if ( strcmp( argv[2], "d" ) != 0 ) {	// default ?
			if ( ! parseCount( argv[2], Tasks ) ) goto Usage;
		} // if
		// fall through
	  case 2:
		if ( strcmp( argv[1], "d" ) != 0 ) {	// default ?
			if ( ! parseCount( argv[1], Processors ) ) goto Usage;
		} // if
		// fall through
	  case 1:									// use defaults
		break;
	  default:
	  Usage:
		sout | "Usage: " | argv[0]
			 | " [ processors (> 0) | 'd' (default " | Processors
			 | ") ] [ Tasks (> 0) | 'd' (default " | Tasks
			 | ") ]" ;
		exit( EXIT_FAILURE );
	} // switch

	// NOTE(review): this overwrites whatever Tasks value was parsed above, so the
	// second argument currently has no effect — confirm this is intended.
	Tasks = Processors;
	BarrierSize = Tasks;

	size_t Clusters = 1;

	// create a cluster
	cluster clus[Clusters];
	processor * proc[Processors];
	for ( i; Processors ) {
		(*(proc[i] = malloc())){ clus[i % Clusters] };
	}

	// one channel per task, each with room for 2 * Tasks items
	chans = aalloc( Tasks );
	for ( i; Tasks ) {
		chans[i] = malloc();
		(*chans[i]){ 2 * Tasks };
	}

	// setup barrier
	channel(int) entry{ 2 * BarrierSize };
	channel(int) wait{ 2 * BarrierSize };
	entryWait = &entry;
	barWait = &wait;
	initBarrier();

	Task * t[Tasks];
	for ( i; Tasks ) {
		(*(t[i] = malloc())){ i, clus[i % Clusters] };
	}

	sleep( 10`s );
	done = true;

	// Shutdown: add Tasks extra items to every channel and flush the barrier,
	// presumably so that tasks blocked in remove() or barrier() can reach the
	// done check.
	for ( i; Tasks ) {
		for ( j; Tasks ) {
			insert( *chans[i], j );
		}
	}
	flushBarrier();

	// NOTE(review): relies on delete of a Task waiting for its main to finish,
	// so total_operations is complete before it is printed — confirm.
	for ( i; Tasks ) {
		delete( t[i] );
	}

	sout | total_operations;

	for ( i; Processors ) {
		delete( proc[i] );
	}

	// Each channel was allocated individually above; release them before the
	// pointer array, otherwise only the array itself is freed.
	for ( i; Tasks ) {
		delete( chans[i] );
	}
	adelete( chans );
	return 0;
}