Changes in libcfa/src/executor.cfa [a51c0c0:cca568e]
- File:
-
- 1 edited
-
libcfa/src/executor.cfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/executor.cfa
ra51c0c0 rcca568e 4 4 // buffer. 5 5 6 #include <bits/containers.hfa> 6 7 #include <thread.hfa> 7 #include < containers/list.hfa>8 #include <stdio.h> 8 9 9 forall( dtype T | $dlistable(T, T) ) { 10 monitor Buffer { // unbounded buffer 11 dlist( T, T ) queue; // unbounded list of work requests 12 condition delay; 13 }; // Buffer 10 forall( dtype T ) 11 monitor Buffer { // unbounded buffer 12 __queue_t( T ) queue; // unbounded list of work requests 13 condition delay; 14 }; // Buffer 15 forall( dtype T | is_node(T) ) { 16 void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) { 17 append( queue, elem ); // insert element into buffer 18 signal( delay ); // restart 19 } // insert 14 20 15 void insert( Buffer(T) & mutex buf, T * elem) with(buf) {16 dlist( T, T ) * qptr = &queue; // workaround https://cforall.uwaterloo.ca/trac/ticket/16617 insert_last( *qptr, *elem ); // insert element into buffer 18 signal( delay ); // restart 19 } // insert 21 T * remove( Buffer( T ) & mutex buf ) with(buf) { 22 if ( queue.head != 0 ) wait( delay ); // no request to process ? => wait 23 // return pop_head( queue ); 24 } // remove 25 } // distribution 20 26 21 T * remove( Buffer(T) & mutex buf ) with(buf) { 22 dlist( T, T ) * qptr = &queue; // workaround https://cforall.uwaterloo.ca/trac/ticket/166 23 // if ( (*qptr)`is_empty ) wait( delay ); // no request to process ? => wait 24 if ( (*qptr)`is_empty ) return 0p; // no request to process ? => wait 25 return &pop_first( *qptr ); 26 } // remove 27 } // forall 27 struct WRequest { // client request, no return 28 void (* action)( void ); 29 WRequest * next; // intrusive queue field 30 }; // WRequest 28 31 29 struct WRequest { // client request, no return 30 void (* action)( void ); 31 DLISTED_MGD_IMPL_IN(WRequest) 32 }; // WRequest 33 DLISTED_MGD_IMPL_OUT(WRequest) 34 35 void ?{}( WRequest & req ) with(req) { action = 0; } 36 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; } 32 WRequest *& get_next( WRequest & this ) { return this.next; } 33 void ?{}( WRequest & req ) with(req) { action = 0; next = 0; } 34 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; } 37 35 bool stop( WRequest & req ) { return req.action == 0; } 38 36 void doit( WRequest & req ) { req.action(); } 39 37 40 // Each worker has its own set (when requests buffers > workers) of work buffers to reduce contention between client41 // a nd server, where work requests arrive and are distributed into buffers in a roughly round-robin order.38 // Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and 39 // are distributed into buffers in a roughly round-robin order. 42 40 43 41 thread Worker { 44 Buffer(WRequest) * requests; 45 WRequest * request; 46 unsigned int start, range; 42 Buffer( WRequest ) * requests; 43 unsigned int start, range; 47 44 }; // Worker 48 45 49 46 void main( Worker & w ) with(w) { 50 for ( int i = 0;; i = (i + 1) % range ) {51 request = remove( requests[i + start] );52 if ( ! request ) { yield(); continue; }53 if ( stop( *request ) ) break;54 doit( *request );55 delete( request );56 } // for47 for ( int i = 0;; i = (i + 1) % range ) { 48 WRequest * request = remove( requests[i + start] ); 49 if ( ! request ) { yield(); continue; } 50 if ( stop( *request ) ) break; 51 doit( *request ); 52 delete( request ); 53 } // for 57 54 } // Worker::main 58 55 59 void ?{}( Worker & worker, cluster * wc, Buffer( WRequest) * requests, unsigned int start, unsigned int range ) {60 ((thread &)worker){ *wc }; 61 worker.[requests, request, start, range] = [requests, 0p, start, range];56 void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) { 57 (*get_thread(worker)){ *wc }; // create on given cluster 58 worker.[requests, start, range] = [requests, start, range]; 62 59 } // ?{} 63 60 64 WRequest * current_request( Worker & worker ) { return worker.request; }65 66 61 struct Executor { 67 cluster * cluster; // if workers execute on separate cluster 68 processor ** processors; // array of virtual processors adding parallelism for workers 69 Buffer(WRequest) * requests; // list of work requests 70 Worker ** workers; // array of workers executing work requests 71 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 72 bool sepClus; // use same or separate cluster for executor 73 unsigned int next; // demultiplexed across worker buffers 62 cluster * cluster; // if workers execute on separate cluster 63 processor ** processors; // array of virtual processors adding parallelism for workers 64 Buffer( WRequest ) * requests; // list of work requests 65 Worker ** workers; // array of workers executing work requests 66 unsigned int nprocessors, nworkers, nmailboxes; // number of mailboxes/workers/processor tasks 67 bool sepClus; // use same or separate cluster for executor 74 68 }; // Executor 75 69 70 static thread_local unsigned int next; // demultiplexed across worker buffers 76 71 unsigned int tickets( Executor & ex ) with(ex) { 77 //return uFetchAdd( next, 1 ) % nrqueues;78 return next++ % nrqueues;// no locking, interference randomizes72 //return uFetchAdd( next, 1 ) % nmailboxes; 73 return next++ % nmailboxes; // no locking, interference randomizes 79 74 } // tickets 80 75 81 void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int n r, bool sc = false ) with(ex) {82 [nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc];83 assert( nrqueues >= nworkers );84 cluster = sepClus ? new( "Executor" ) : active_cluster();85 processors = aalloc( nprocessors );86 requests = anew( nrqueues );87 workers = aalloc( nworkers );76 void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) { 77 [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc]; 78 assert( nmailboxes >= nworkers ); 79 cluster = sepClus ? new( "Executor" ) : active_cluster(); 80 processors = (processor **)anew( nprocessors ); 81 requests = anew( nmailboxes ); 82 workers = (Worker **)anew( nworkers ); 88 83 89 for ( i; nprocessors ) {90 processors[i] = new( *cluster );91 } // for84 for ( i; nprocessors ) { 85 processors[ i ] = new( *cluster ); 86 } // for 92 87 93 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 94 // for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 95 for ( i; nworkers : start; 0u ~ @ ~ range : range; ) { 96 range = reqPerWorker + ( i < extras ? 1 : 0 ); 97 workers[i] = new( cluster, requests, start, range ); 98 } // for 88 unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers; 89 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) { 90 workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) ); 91 } // for 99 92 } // ?{} 100 93 101 94 void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) { 102 ex{ nprocessors, nworkers, nworkers, sepClus };95 ex{ nprocessors, nworkers, nworkers, sepClus }; 103 96 } 104 97 void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) { 105 ex{ nprocessors, nprocessors, nprocessors, sepClus };98 ex{ nprocessors, nprocessors, nprocessors, sepClus }; 106 99 } 107 void ?{}( Executor & ex ) { // special for current cluster, no processors added108 ex{ 0, active_cluster()->nprocessors, false };100 void ?{}( Executor & ex ) { // special for current cluster 101 ex{ 0, active_cluster()->nprocessors, false }; 109 102 } 110 103 void ^?{}( Executor & ex ) with(ex) { 111 // Add one sentinel per worker to stop them. Since in destructor, no new external work should be queued. Cannot112 // combine next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may 113 // take thesingle sentinel while waiting for worker 0 to end.104 // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued. Cannot combine next 105 // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the 106 // single sentinel while waiting for worker 0 to end. 114 107 115 WRequest sentinel[nworkers];116 unsigned int reqPerWorker = nrqueues / nworkers;117 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {118 insert( requests[step], &sentinel[i] );// force eventually termination119 } // for120 for ( i; nworkers ) {121 delete( workers[i] );122 } // for123 for ( i; nprocessors ) {124 delete( processors[i] );125 } // for108 WRequest sentinel[nworkers]; 109 unsigned int reqPerWorker = nmailboxes / nworkers; 110 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) { 111 insert( requests[step], &sentinel[i] ); // force eventually termination 112 } // for 113 for ( i; nworkers ) { 114 delete( workers[ i ] ); 115 } // for 116 for ( i; nprocessors ) { 117 delete( processors[ i ] ); 118 } // for 126 119 127 free( workers ); 128 // adelete( nrqueues, requests ); 129 for ( i; nrqueues ) ^?{}( requests[i] ); // FIX ME: problem with resolver 130 free( requests ); 131 free( processors ); 132 if ( sepClus ) { delete( cluster ); } 120 delete( workers ); 121 delete( requests ); 122 delete( processors ); 123 if ( sepClus ) { delete( cluster ); } 133 124 } // ^?{} 134 125 135 126 void send( Executor & ex, void (* action)( void ) ) { // asynchronous call, no return value 136 WRequest * node = new( action );137 insert( ex.requests[tickets( ex )], node );127 WRequest * node = new( action ); 128 insert( ex.requests[tickets( ex )], node ); 138 129 } // send 139 140 130 141 131 int counter = 0; 142 132 143 void work ( void ) {144 __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );145 //fprintf( stderr, "workie\n" );133 void workie( void ) { 134 __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST ); 135 // fprintf( stderr, "workie\n" ); 146 136 } 147 137 148 int main( int argc, char * argv[] ) { 149 int times = 1_000_000; 150 if ( argc == 2 ) times = atoi( argv[1] ); 151 processor p[7]; 152 { 153 Executor exector; 154 for ( i; times ) { 155 send( exector, work ); 156 if ( i % 100 == 0 ) yield(); 157 } // for 158 } 159 printf( "%d\n", counter ); 138 int main() { 139 { 140 Executor exector; 141 for ( i; 3000 ) { 142 send( exector, workie ); 143 if ( i % 100 ) yield(); 144 } // for 145 } 146 printf( "%d\n", counter ); 160 147 } 161 148 162 149 // Local Variables: // 163 // tab-width: 4" //164 150 // compile-command: "cfa executor.cfa" // 165 151 // End: //
Note:
See TracChangeset
for help on using the changeset viewer.