Changeset 089ee6b
- Timestamp: May 24, 2020, 5:05:13 PM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: a51c0c0
- Parents: cbbd8fd7
- File: 1 edited
libcfa/src/executor.cfa
(rcbbd8fd7 → r089ee6b)

  #include <containers/list.hfa>
  #include <thread.hfa>
- #include <stdio.h>
+ //#include <malloc.h>                            // trace

+ forall( dtype T | $dlistable(T, T) ) {
+     monitor Buffer {                             // unbounded buffer
+         dlist( T, T ) queue;                     // unbounded list of work requests
+         condition delay;
+     }; // Buffer
+
+     void insert( Buffer(T) & mutex buf, T * elem ) with(buf) {
+         dlist( T, T ) * qptr = &queue;           // workaround https://cforall.uwaterloo.ca/trac/ticket/166
+         insert_last( *qptr, *elem );             // insert element into buffer
+         signal( delay );                         // restart
+     } // insert
+
+     T * remove( Buffer(T) & mutex buf ) with(buf) {
+         dlist( T, T ) * qptr = &queue;           // workaround https://cforall.uwaterloo.ca/trac/ticket/166
+         if ( (*qptr)`is_empty ) wait( delay );   // no request to process ? => wait
+         return &pop_first( *qptr );
+     } // remove
+
+     // void ?{}( Buffer(T) & ) {}
+     // void ^?{}( Buffer(T) & mutex ) {}
+ } // forall
+
  struct WRequest {                                // client request, no return
      void (* action)( void );
      DLISTED_MGD_IMPL_IN(WRequest)
  }; // WRequest
  DLISTED_MGD_IMPL_OUT(WRequest)
  …
  void doit( WRequest & req ) { req.action(); }

- monitor WRBuffer {                               // unbounded buffer
-     dlist( WRequest, WRequest ) queue;           // unbounded list of work requests
-     condition delay;
- }; // WRBuffer
-
- void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) {
-     insert_last( queue, *elem );                 // insert element into buffer
-     signal( delay );                             // restart
- } // insert
-
- WRequest * remove( WRBuffer & mutex buf ) with(buf) {
-     if ( queue`is_empty ) wait( delay );         // no request to process ? => wait
-     return & pop_first( queue );
- } // remove
-
- // Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
- // are distributed into buffers in a roughly round-robin order.
+ // Each worker has its own set (when requests buffers > workers) of work buffers to reduce contention between client
+ // and server, where work requests arrive and are distributed into buffers in a roughly round-robin order.

  thread Worker {
-     WRBuffer * requests;
+     Buffer(WRequest) * requests;
+     WRequest * request;
      unsigned int start, range;
  }; // Worker

  void main( Worker & w ) with(w) {
-         WRequest * request = remove( requests[i + start] );
-         doit( *request );
-         delete( request );
+     for ( int i = 0;; i = (i + 1) % range ) {
+         request = remove( requests[i + start] );
+         if ( ! request ) { yield(); continue; }
+         if ( stop( *request ) ) break;
+         doit( *request );
+         delete( request );
+     } // for
  } // Worker::main

- void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) {
-     (*get_thread(worker)){ *wc };                // create on given cluster
-     worker.[requests, start, range] = [requests, start, range];
+ void ?{}( Worker & worker, cluster * wc, Buffer(WRequest) * requests, unsigned int start, unsigned int range ) {
+     ((thread &)worker){ *wc };
+     worker.[requests, request, start, range] = [requests, 0p, start, range];
  } // ?{}

+ WRequest * current_request( Worker & worker ) { return worker.request; }
+
  struct Executor {
      cluster * cluster;                           // if workers execute on separate cluster
      processor ** processors;                     // array of virtual processors adding parallelism for workers
-     WRBuffer * requests;                         // list of work requests
+     Buffer(WRequest) * requests;                 // list of work requests
      Worker ** workers;                           // array of workers executing work requests
-     unsigned int nprocessors, nworkers, nmailboxes; // number of mailboxes/workers/processor tasks
+     unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
      bool sepClus;                                // use same or separate cluster for executor
  }; // Executor

  static thread_local unsigned int next;           // demultiplexed across worker buffers

  unsigned int tickets( Executor & ex ) with(ex) {
-     //return uFetchAdd( next, 1 ) % nmailboxes;
-     return next++ % nmailboxes;                  // no locking, interference randomizes
+     //return uFetchAdd( next, 1 ) % nrqueues;
+     return next++ % nrqueues;                    // no locking, interference randomizes
  } // tickets

- void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
-     [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
-     assert( nmailboxes >= nworkers );
-     processors = (processor **)anew( nprocessors );
-     requests = (WRBuffer *)anew( nmailboxes );
-     workers = (Worker **)anew( nworkers );
+ void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nr, bool sc = false ) with(ex) {
+     [nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc];
+     assert( nrqueues >= nworkers );
+     cluster = sepClus ? new( "Executor" ) : active_cluster();
+     processors = aalloc( nprocessors );
+     requests = anew( nrqueues );
+     workers = aalloc( nworkers );

-         processors[ i ] = new( *cluster );
+     for ( i; nprocessors ) {
+         processors[i] = new( *cluster );
+     } // for

-     unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
-     for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) {
-         workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) );
-     } // for
+     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
+     // for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
+     for ( i; nworkers : start; 0u ~ @ ~ range : range; ) {
+         range = reqPerWorker + ( i < extras ? 1 : 0 );
+         workers[i] = new( cluster, requests, start, range );
+     } // for
  } // ?{}

  void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
      ex{ nprocessors, nworkers, nworkers, sepClus };
  }
  void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
      ex{ nprocessors, nprocessors, nprocessors, sepClus };
  }
  void ?{}( Executor & ex ) {                      // special for current cluster
+     ex{ 0, active_cluster()->nprocessors, false };
  }
  void ^?{}( Executor & ex ) with(ex) {
-     // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued. Cannot combine next
-     // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the
-     // single sentinel while waiting for worker 0 to end.
+     // Add one sentinel per worker to stop them. Since in destructor, no new external work should be queued. Cannot
+     // combine next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may
+     // take the single sentinel while waiting for worker 0 to end.

-     unsigned int reqPerWorker = nmailboxes / nworkers;
-         insert( requests[step], &sentinel[i] );  // force eventually termination
-         delete( workers[ i ] );
-         delete( processors[ i ] );
+     WRequest sentinel[nworkers];
+     unsigned int reqPerWorker = nrqueues / nworkers;
+     for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
+         insert( requests[step], &sentinel[i] );  // force eventually termination
+     } // for
+     for ( i; nworkers ) {
+         delete( workers[i] );
+     } // for
+     for ( i; nprocessors ) {
+         delete( processors[i] );
+     } // for

-     delete( workers );
-     delete( requests );
-     delete( processors );
+     free( workers );
+     // adelete( nrqueues, requests );
+     for ( i; nrqueues ) ^?{}( requests[i] );
+     free( requests );
+     free( processors );
      if ( sepClus ) { delete( cluster ); }
  } // ^?{}

  void send( Executor & ex, void (* action)( void ) ) { // asynchronous call, no return value
+     WRequest * node = new( action );
+     insert( ex.requests[tickets( ex )], node );
  } // send

  int counter = 0;

  void workie( void ) {
+     __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
+     // fprintf( stderr, "workie\n" );
  }

- int main() {
-     {
-         Executor exector;
-         for ( i; 3000 ) {
-             send( exector, workie );
-             if ( i % 100 == 0 ) {
-                 // fprintf( stderr, "%d\n", i );
-                 yield();
-             }
-         } // for
-     }
-     printf( "%d\n", counter );
+ int main( int argc, char * argv[] ) {
+     int times = 1_000_000;
+     if ( argc == 2 ) times = atoi( argv[1] );
+     processor p[7];
+     {
+         printf( "%d\n", active_cluster()->nprocessors );
+         Executor exector;
+         for ( i; times ) {
+             send( exector, workie );
+             if ( i % 100 == 0 ) {
+                 // fprintf( stderr, "%d\n", i );
+                 yield();
+             }
+         } // for
+     }
+     printf( "%d\n", counter );
  }

  // Local Variables: //
+ // tab-width: 4 //
  // compile-command: "cfa executor.cfa" //
  // End: //
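For readers skimming the new interface: after this change an executor is sized by processors, worker threads, and request queues, and work is submitted with send, which wraps the function pointer in a WRequest and spreads requests across the queues via tickets. The following client sketch is illustrative only and not part of the changeset; it assumes the definitions above are compiled into the same translation unit (with the file's test main removed), that CFA's initialization syntax invokes the matching ?{} constructor, and the hello routine plus the 2-processor/4-worker/8-queue sizing are invented for the example.

    #include <stdio.h>

    void hello( void ) {
        printf( "hello from a worker\n" );       // runs on one of the executor's worker threads
    }

    int main() {
        Executor ex = { 2, 4, 8, false };        // 2 processors, 4 workers, 8 request queues, current cluster
        for ( i; 10 ) send( ex, hello );         // asynchronous calls, no return values
        // end of scope: ^?{}(ex) inserts one sentinel per worker, drains the queues, and joins the workers
    }

With these numbers, reqPerWorker = 8 / 4 = 2 and extras = 0, so worker 0 serves queues 0-1, worker 1 serves queues 2-3, and so on, while tickets() hands successive send calls tickets 0, 1, 2, ... modulo 8, giving the roughly round-robin distribution described in the comment above.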