- Timestamp:
- May 12, 2020, 7:21:32 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 979df46
- Parents:
- f2d05e9
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/executor.cfa
rf2d05e9 r8d462e5 4 4 // buffer. 5 5 6 #include < bits/containers.hfa>6 #include <containers/list.hfa> 7 7 #include <thread.hfa> 8 8 #include <stdio.h> 9 9 10 forall( dtype T )11 monitor Buffer { // unbounded buffer12 __queue_t( T ) queue; // unbounded list of work requests13 condition delay;14 }; // Buffer15 forall( dtype T | is_node(T) ) {16 void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) {17 append( queue, elem ); // insert element into buffer18 signal( delay ); // restart19 } // insert20 21 T * remove( Buffer( T ) & mutex buf ) with(buf) {22 if ( queue.head != 0 ) wait( delay ); // no request to process ? => wait23 // return pop_head( queue );24 } // remove25 } // distribution26 27 10 struct WRequest { // client request, no return 28 11 void (* action)( void ); 29 WRequest * next; // intrusive queue field12 DLISTED_MGD_IMPL_IN(WRequest) 30 13 }; // WRequest 14 DLISTED_MGD_IMPL_OUT(WRequest) 31 15 32 WRequest *& get_next( WRequest & this ) { return this.next; } 33 void ?{}( WRequest & req ) with(req) { action = 0; next = 0; } 34 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; } 16 void ?{}( WRequest & req ) with(req) { action = 0; } 17 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; } 35 18 bool stop( WRequest & req ) { return req.action == 0; } 36 19 void doit( WRequest & req ) { req.action(); } 20 21 monitor WRBuffer { // unbounded buffer 22 dlist( WRequest, WRequest ) queue; // unbounded list of work requests 23 condition delay; 24 }; // WRBuffer 25 26 void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) { 27 insert_last( queue, *elem ); // insert element into buffer 28 signal( delay ); // restart 29 } // insert 30 31 WRequest * remove( WRBuffer & mutex buf ) with(buf) { 32 if ( queue`is_empty ) wait( delay ); // no request to process ? => wait 33 return & pop_first( queue ); 34 } // remove 37 35 38 36 // Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and … … 40 38 41 39 thread Worker { 42 Buffer( WRequest )* requests;40 WRBuffer * requests; 43 41 unsigned int start, range; 44 42 }; // Worker … … 54 52 } // Worker::main 55 53 56 void ?{}( Worker & worker, cluster * wc, Buffer( WRequest )* requests, unsigned int start, unsigned int range ) {54 void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) { 57 55 (*get_thread(worker)){ *wc }; // create on given cluster 58 56 worker.[requests, start, range] = [requests, start, range]; … … 62 60 cluster * cluster; // if workers execute on separate cluster 63 61 processor ** processors; // array of virtual processors adding parallelism for workers 64 Buffer( WRequest ) * requests;// list of work requests62 WRBuffer * requests; // list of work requests 65 63 Worker ** workers; // array of workers executing work requests 66 64 unsigned int nprocessors, nworkers, nmailboxes; // number of mailboxes/workers/processor tasks … … 79 77 cluster = sepClus ? new( "Executor" ) : active_cluster(); 80 78 processors = (processor **)anew( nprocessors ); 81 requests = anew( nmailboxes );79 requests = (WRBuffer *)anew( nmailboxes ); 82 80 workers = (Worker **)anew( nworkers ); 83 81 … … 141 139 for ( i; 3000 ) { 142 140 send( exector, workie ); 143 if ( i % 100 ) yield(); 141 if ( i % 100 == 0 ) { 142 // fprintf( stderr, "%d\n", i ); 143 yield(); 144 } 144 145 } // for 145 146 }
Note: See TracChangeset
for help on using the changeset viewer.