Changes in libcfa/src/executor.cfa [8d462e5:cca568e]
- File:
-
- 1 edited
-
libcfa/src/executor.cfa (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/executor.cfa
r8d462e5 rcca568e 4 4 // buffer. 5 5 6 #include < containers/list.hfa>6 #include <bits/containers.hfa> 7 7 #include <thread.hfa> 8 8 #include <stdio.h> 9 9 10 forall( dtype T ) 11 monitor Buffer { // unbounded buffer 12 __queue_t( T ) queue; // unbounded list of work requests 13 condition delay; 14 }; // Buffer 15 forall( dtype T | is_node(T) ) { 16 void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) { 17 append( queue, elem ); // insert element into buffer 18 signal( delay ); // restart 19 } // insert 20 21 T * remove( Buffer( T ) & mutex buf ) with(buf) { 22 if ( queue.head != 0 ) wait( delay ); // no request to process ? => wait 23 // return pop_head( queue ); 24 } // remove 25 } // distribution 26 10 27 struct WRequest { // client request, no return 11 28 void (* action)( void ); 12 DLISTED_MGD_IMPL_IN(WRequest)29 WRequest * next; // intrusive queue field 13 30 }; // WRequest 14 DLISTED_MGD_IMPL_OUT(WRequest)15 31 16 void ?{}( WRequest & req ) with(req) { action = 0; } 17 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; } 32 WRequest *& get_next( WRequest & this ) { return this.next; } 33 void ?{}( WRequest & req ) with(req) { action = 0; next = 0; } 34 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; } 18 35 bool stop( WRequest & req ) { return req.action == 0; } 19 36 void doit( WRequest & req ) { req.action(); } 20 21 monitor WRBuffer { // unbounded buffer22 dlist( WRequest, WRequest ) queue; // unbounded list of work requests23 condition delay;24 }; // WRBuffer25 26 void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) {27 insert_last( queue, *elem ); // insert element into buffer28 signal( delay ); // restart29 } // insert30 31 WRequest * remove( WRBuffer & mutex buf ) with(buf) {32 if ( queue`is_empty ) wait( delay ); // no request to process ? => wait33 return & pop_first( queue );34 } // remove35 37 36 38 // Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and … … 38 40 39 41 thread Worker { 40 WRBuffer* requests;42 Buffer( WRequest ) * requests; 41 43 unsigned int start, range; 42 44 }; // Worker … … 52 54 } // Worker::main 53 55 54 void ?{}( Worker & worker, cluster * wc, WRBuffer* requests, unsigned int start, unsigned int range ) {56 void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) { 55 57 (*get_thread(worker)){ *wc }; // create on given cluster 56 58 worker.[requests, start, range] = [requests, start, range]; … … 60 62 cluster * cluster; // if workers execute on separate cluster 61 63 processor ** processors; // array of virtual processors adding parallelism for workers 62 WRBuffer * requests;// list of work requests64 Buffer( WRequest ) * requests; // list of work requests 63 65 Worker ** workers; // array of workers executing work requests 64 66 unsigned int nprocessors, nworkers, nmailboxes; // number of mailboxes/workers/processor tasks … … 77 79 cluster = sepClus ? new( "Executor" ) : active_cluster(); 78 80 processors = (processor **)anew( nprocessors ); 79 requests = (WRBuffer *)anew( nmailboxes );81 requests = anew( nmailboxes ); 80 82 workers = (Worker **)anew( nworkers ); 81 83 … … 139 141 for ( i; 3000 ) { 140 142 send( exector, workie ); 141 if ( i % 100 == 0 ) { 142 // fprintf( stderr, "%d\n", i ); 143 yield(); 144 } 143 if ( i % 100 ) yield(); 145 144 } // for 146 145 }
Note:
See TracChangeset
for help on using the changeset viewer.