Changes in libcfa/src/executor.cfa [8d462e5:a51c0c0]
- File:
-
- 1 edited
-
libcfa/src/executor.cfa (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/executor.cfa
r8d462e5 ra51c0c0 4 4 // buffer. 5 5 6 #include <thread.hfa> 6 7 #include <containers/list.hfa> 7 #include <thread.hfa>8 #include <stdio.h>9 8 10 struct WRequest { // client request, no return 11 void (* action)( void ); 12 DLISTED_MGD_IMPL_IN(WRequest) 9 forall( dtype T | $dlistable(T, T) ) { 10 monitor Buffer { // unbounded buffer 11 dlist( T, T ) queue; // unbounded list of work requests 12 condition delay; 13 }; // Buffer 14 15 void insert( Buffer(T) & mutex buf, T * elem ) with(buf) { 16 dlist( T, T ) * qptr = &queue; // workaround https://cforall.uwaterloo.ca/trac/ticket/166 17 insert_last( *qptr, *elem ); // insert element into buffer 18 signal( delay ); // restart 19 } // insert 20 21 T * remove( Buffer(T) & mutex buf ) with(buf) { 22 dlist( T, T ) * qptr = &queue; // workaround https://cforall.uwaterloo.ca/trac/ticket/166 23 // if ( (*qptr)`is_empty ) wait( delay ); // no request to process ? => wait 24 if ( (*qptr)`is_empty ) return 0p; // no request to process ? => wait 25 return &pop_first( *qptr ); 26 } // remove 27 } // forall 28 29 struct WRequest { // client request, no return 30 void (* action)( void ); 31 DLISTED_MGD_IMPL_IN(WRequest) 13 32 }; // WRequest 14 33 DLISTED_MGD_IMPL_OUT(WRequest) … … 19 38 void doit( WRequest & req ) { req.action(); } 20 39 21 monitor WRBuffer { // unbounded buffer 22 dlist( WRequest, WRequest ) queue; // unbounded list of work requests 23 condition delay; 24 }; // WRBuffer 25 26 void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) { 27 insert_last( queue, *elem ); // insert element into buffer 28 signal( delay ); // restart 29 } // insert 30 31 WRequest * remove( WRBuffer & mutex buf ) with(buf) { 32 if ( queue`is_empty ) wait( delay ); // no request to process ? => wait 33 return & pop_first( queue ); 34 } // remove 35 36 // Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and 37 // are distributed into buffers in a roughly round-robin order. 40 // Each worker has its own set (when requests buffers > workers) of work buffers to reduce contention between client 41 // and server, where work requests arrive and are distributed into buffers in a roughly round-robin order. 38 42 39 43 thread Worker { 40 WRBuffer * requests; 41 unsigned int start, range; 44 Buffer(WRequest) * requests; 45 WRequest * request; 46 unsigned int start, range; 42 47 }; // Worker 43 48 44 49 void main( Worker & w ) with(w) { 45 for ( int i = 0;; i = (i + 1) % range ) {46 WRequest *request = remove( requests[i + start] );47 if ( ! request ) { yield(); continue; }48 if ( stop( *request ) ) break;49 doit( *request );50 delete( request );51 } // for50 for ( int i = 0;; i = (i + 1) % range ) { 51 request = remove( requests[i + start] ); 52 if ( ! request ) { yield(); continue; } 53 if ( stop( *request ) ) break; 54 doit( *request ); 55 delete( request ); 56 } // for 52 57 } // Worker::main 53 58 54 void ?{}( Worker & worker, cluster * wc, WRBuffer* requests, unsigned int start, unsigned int range ) {55 (*get_thread(worker)){ *wc }; // create on given cluster 56 worker.[requests, start, range] = [requests, start, range];59 void ?{}( Worker & worker, cluster * wc, Buffer(WRequest) * requests, unsigned int start, unsigned int range ) { 60 ((thread &)worker){ *wc }; 61 worker.[requests, request, start, range] = [requests, 0p, start, range]; 57 62 } // ?{} 58 63 64 WRequest * current_request( Worker & worker ) { return worker.request; } 65 59 66 struct Executor { 60 cluster * cluster; // if workers execute on separate cluster 61 processor ** processors; // array of virtual processors adding parallelism for workers 62 WRBuffer * requests; // list of work requests 63 Worker ** workers; // array of workers executing work requests 64 unsigned int nprocessors, nworkers, nmailboxes; // number of mailboxes/workers/processor tasks 65 bool sepClus; // use same or separate cluster for executor 67 cluster * cluster; // if workers execute on separate cluster 68 processor ** processors; // array of virtual processors adding parallelism for workers 69 Buffer(WRequest) * requests; // list of work requests 70 Worker ** workers; // array of workers executing work requests 71 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 72 bool sepClus; // use same or separate cluster for executor 73 unsigned int next; // demultiplexed across worker buffers 66 74 }; // Executor 67 75 68 static thread_local unsigned int next; // demultiplexed across worker buffers69 76 unsigned int tickets( Executor & ex ) with(ex) { 70 //return uFetchAdd( next, 1 ) % nmailboxes;71 return next++ % nmailboxes;// no locking, interference randomizes77 //return uFetchAdd( next, 1 ) % nrqueues; 78 return next++ % nrqueues; // no locking, interference randomizes 72 79 } // tickets 73 80 74 void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int n m, bool sc = false ) with(ex) {75 [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];76 assert( nmailboxes >= nworkers );77 cluster = sepClus ? new( "Executor" ) : active_cluster();78 processors = (processor **)anew( nprocessors );79 requests = (WRBuffer *)anew( nmailboxes );80 workers = (Worker **)anew( nworkers );81 void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nr, bool sc = false ) with(ex) { 82 [nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc]; 83 assert( nrqueues >= nworkers ); 84 cluster = sepClus ? new( "Executor" ) : active_cluster(); 85 processors = aalloc( nprocessors ); 86 requests = anew( nrqueues ); 87 workers = aalloc( nworkers ); 81 88 82 for ( i; nprocessors ) {83 processors[ i] = new( *cluster );84 } // for89 for ( i; nprocessors ) { 90 processors[i] = new( *cluster ); 91 } // for 85 92 86 unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers; 87 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) { 88 workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) ); 89 } // for 93 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 94 // for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 95 for ( i; nworkers : start; 0u ~ @ ~ range : range; ) { 96 range = reqPerWorker + ( i < extras ? 1 : 0 ); 97 workers[i] = new( cluster, requests, start, range ); 98 } // for 90 99 } // ?{} 91 100 92 101 void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) { 93 ex{ nprocessors, nworkers, nworkers, sepClus };102 ex{ nprocessors, nworkers, nworkers, sepClus }; 94 103 } 95 104 void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) { 96 ex{ nprocessors, nprocessors, nprocessors, sepClus };105 ex{ nprocessors, nprocessors, nprocessors, sepClus }; 97 106 } 98 void ?{}( Executor & ex ) { // special for current cluster99 ex{ 0, active_cluster()->nprocessors, false };107 void ?{}( Executor & ex ) { // special for current cluster, no processors added 108 ex{ 0, active_cluster()->nprocessors, false }; 100 109 } 101 110 void ^?{}( Executor & ex ) with(ex) { 102 // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued. Cannot combine next103 // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the 104 //single sentinel while waiting for worker 0 to end.111 // Add one sentinel per worker to stop them. Since in destructor, no new external work should be queued. Cannot 112 // combine next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may 113 // take the single sentinel while waiting for worker 0 to end. 105 114 106 WRequest sentinel[nworkers];107 unsigned int reqPerWorker = nmailboxes / nworkers;108 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {109 insert( requests[step], &sentinel[i] );// force eventually termination110 } // for111 for ( i; nworkers ) {112 delete( workers[ i] );113 } // for114 for ( i; nprocessors ) {115 delete( processors[ i] );116 } // for115 WRequest sentinel[nworkers]; 116 unsigned int reqPerWorker = nrqueues / nworkers; 117 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) { 118 insert( requests[step], &sentinel[i] ); // force eventually termination 119 } // for 120 for ( i; nworkers ) { 121 delete( workers[i] ); 122 } // for 123 for ( i; nprocessors ) { 124 delete( processors[i] ); 125 } // for 117 126 118 delete( workers ); 119 delete( requests ); 120 delete( processors ); 121 if ( sepClus ) { delete( cluster ); } 127 free( workers ); 128 // adelete( nrqueues, requests ); 129 for ( i; nrqueues ) ^?{}( requests[i] ); // FIX ME: problem with resolver 130 free( requests ); 131 free( processors ); 132 if ( sepClus ) { delete( cluster ); } 122 133 } // ^?{} 123 134 124 135 void send( Executor & ex, void (* action)( void ) ) { // asynchronous call, no return value 125 WRequest * node = new( action );126 insert( ex.requests[tickets( ex )], node );136 WRequest * node = new( action ); 137 insert( ex.requests[tickets( ex )], node ); 127 138 } // send 139 128 140 129 141 int counter = 0; 130 142 131 void work ie( void ) {132 __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );133 //fprintf( stderr, "workie\n" );143 void work( void ) { 144 __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST ); 145 // fprintf( stderr, "workie\n" ); 134 146 } 135 147 136 int main( ) {137 { 138 Executor exector;139 for ( i; 3000 ) {140 send( exector, workie );141 if ( i % 100 == 0 ) {142 // fprintf( stderr, "%d\n", i ); 143 yield();144 } 145 } // for146 }147 printf( "%d\n", counter );148 int main( int argc, char * argv[] ) { 149 int times = 1_000_000; 150 if ( argc == 2 ) times = atoi( argv[1] ); 151 processor p[7]; 152 { 153 Executor exector; 154 for ( i; times ) { 155 send( exector, work ); 156 if ( i % 100 == 0 ) yield(); 157 } // for 158 } 159 printf( "%d\n", counter ); 148 160 } 149 161 150 162 // Local Variables: // 163 // tab-width: 4" // 151 164 // compile-command: "cfa executor.cfa" // 152 165 // End: //
Note:
See TracChangeset
for help on using the changeset viewer.