#include #include #include #include #include #include #include #include size_t total_operations = 0; size_t Processors = 1, Tasks = 4; typedef channel( size_t ) Channel; channel( int ) * barWait; channel( int ) * entryWait; int BarrierSize = 4; static inline void closeBarrier() { close(*entryWait); close(*barWait); } static inline void initBarrier() { barWait = malloc(); entryWait = malloc(); (*barWait){ BarrierSize }; (*entryWait){ BarrierSize }; for ( j; BarrierSize ) insert( *entryWait, j ); } static inline void deleteBarrier() { delete(barWait); delete(entryWait); } static inline void barrier() { int ticket = remove( *entryWait ); if ( ticket == BarrierSize - 1 ) { for ( j; BarrierSize - 1 ) insert( *barWait, j ); return; } ticket = remove( *barWait ); // last one out if ( BarrierSize == 1 || ticket == BarrierSize - 2 ) { for ( j; BarrierSize ) insert( *entryWait, j ); } } Channel * chans; owner_lock o; thread Task { size_t id; }; static inline void ?{}( Task & p, size_t i, cluster & clu ) { ((thread &)p){ clu }; p.id = i; } void main(Task & this) with(this) { size_t runs = 0; try { for ( ;; ) { // publish for ( i; Tasks ) { insert(chans[id], i); } // subscribe for ( i; Tasks ) { remove( chans[i] ); } barrier(); runs++; } } catch ( channel_closed * e ) { } lock(o); total_operations += runs; // sout | runs; unlock(o); } int main( int argc, char * argv[] ) { switch ( argc ) { case 3: if ( strcmp( argv[2], "d" ) != 0 ) { // default ? Tasks = ato( argv[2] ); if ( Tasks < 1 ) fallthrough default; } // if case 2: if ( strcmp( argv[1], "d" ) != 0 ) { // default ? Processors = ato( argv[1] ); if ( Processors < 1 ) fallthrough default; } // if case 1: // use defaults break; default: exit | "Usage: " | argv[0] | " [ processors (> 0) | 'd' (default " | Processors | ") ] [ Tasks (> 0) | 'd' (default " | Tasks | ") ]" ; } // switch BarrierSize = Tasks; size_t Clusters = 1; // create a cluster cluster clus[Clusters]; processor * proc[Processors]; // setup processors for ( i; Processors ) (*(proc[i] = malloc())){clus[i % Clusters]}; // setup pub/sub chans chans = aalloc( Tasks ); for ( i; Tasks ) chans[i]{ Tasks }; // setup barrier initBarrier(); // sout | "Processors: " | Processors | " ProdsPerChan: " | Producers | " ConsPerChan: " | Consumers | "Channels: " | Channels | " Channel Size: " | ChannelSize; sout | "start"; Task * t[Tasks]; // create tasks for ( i; Tasks ) (*(t[i] = malloc())){ i, clus[i % Clusters] }; sleep(10`s); closeBarrier(); for ( i; Tasks ) close( chans[i] ); for ( i; Tasks ) { delete(t[i]); } deleteBarrier(); // sout | total_operations; for ( i; Processors ) { delete(proc[i]); } adelete( chans ); sout | "done"; return 0; }