- Timestamp:
- Feb 1, 2023, 4:20:33 PM (3 years ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- a4ab235
- Parents:
- dab2b6a
- File:
- 
      - 1 edited
 
 - 
          
  libcfa/src/concurrency/actor.hfa (modified) (13 diffs)
 
Legend:
- Unmodified
- Added
- Removed
- 
      libcfa/src/concurrency/actor.hfardab2b6a recfe574 41 41 P9_EMBEDDED( request, dlink(request) ) 42 42 43 void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel44 void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {43 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel 44 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { 45 45 this.receiver = receiver; 46 46 this.msg = msg; … … 48 48 this.stop = false; 49 49 } 50 50 static inline void ?{}( request & this, request & copy ) { 51 this.receiver = copy.receiver; 52 this.msg = copy.msg; 53 this.fn = copy.fn; 54 this.stop = copy.stop; 55 } 56 57 // hybrid data structure. Copies until buffer is full and then allocates for intrusive list 58 struct copy_queue { 59 dlist( request ) list; 60 request * buffer; 61 size_t count, buffer_size; 62 }; 63 static inline void ?{}( copy_queue & this ) {} 64 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { 65 list{}; 66 buffer_size = buf_size; 67 buffer = aalloc( buffer_size ); 68 count = 0; 69 } 70 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 71 72 static inline void insert( copy_queue & this, request & elem ) with(this) { 73 if ( count < buffer_size ) { // fast path ( no alloc ) 74 buffer[count]{ elem }; 75 count++; 76 return; 77 } 78 request * new_elem = alloc(); 79 (*new_elem){ elem }; 80 insert_last( list, *new_elem ); 81 } 82 83 // once you start removing you need to remove all elements 84 // it is not supported to call insert() before the list is fully empty 85 // should_delete is an output param 86 static inline request & remove( copy_queue & this, bool & should_delete ) with(this) { 87 if ( count > 0 ) { 88 count--; 89 should_delete = false; 90 return buffer[count]; 91 } 92 should_delete = true; 93 return try_pop_front( list ); 94 } 95 96 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; } 97 98 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global) 51 99 struct work_queue { 52 100 futex_mutex mutex_lock; 53 dlist( request ) input; // unbounded list of work requests101 copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling 54 102 }; // work_queue 55 void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; } 56 57 void insert( work_queue & this, request & elem ) with(this) { 103 static inline void ?{}( work_queue & this ) with(this) { 104 c_queue = alloc(); 105 (*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor 106 } 107 static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); } 108 109 static inline void insert( work_queue & this, request & elem ) with(this) { 58 110 lock( mutex_lock ); 59 insert _last( input, elem );111 insert( *c_queue, elem ); 60 112 unlock( mutex_lock ); 61 113 } // insert 62 114 63 void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {115 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 64 116 lock( mutex_lock ); 65 66 //C_TODO CHANGE 67 // transferTo->transfer( input ); // transfer input to output 68 69 // this is awfully inefficient but Ill use it until transfer is implemented 70 request * r; 71 while ( ! input`isEmpty ) { 72 r = &try_pop_front( input ); 73 if ( r ) insert_last( transferTo, *r ); 74 } 75 76 // transfer( input, transferTo ); 77 117 // swap copy queue ptrs 118 copy_queue * temp = *transfer_to; 119 *transfer_to = c_queue; 120 c_queue = temp; 78 121 unlock( mutex_lock ); 79 122 } // transfer … … 81 124 thread worker { 82 125 work_queue * request_queues; 83 dlist( request )current_queue;126 copy_queue * current_queue; 84 127 request & req; 85 128 unsigned int start, range; … … 89 132 ((thread &)this){ clu }; 90 133 this.request_queues = request_queues; 91 this.current_queue{}; 134 this.current_queue = alloc(); 135 (*this.current_queue){ __buffer_size }; 92 136 this.start = start; 93 137 this.range = range; 94 138 } 139 static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); } 95 140 96 141 struct executor { … … 103 148 }; // executor 104 149 105 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) with(this) {150 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) { 106 151 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); 152 __buffer_size = buf_size; 107 153 this.nprocessors = nprocessors; 108 154 this.nworkers = nworkers; … … 130 176 } // for 131 177 } 132 178 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; } 133 179 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; } 134 180 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; } … … 150 196 } // for 151 197 152 delete( workers );153 delete( request_queues );154 delete( processors );198 adelete( workers ); 199 adelete( request_queues ); 200 adelete( processors ); 155 201 if ( seperate_clus ) delete( cluster ); 156 202 } … … 173 219 }; 174 220 175 void ?{}( actor & this ) {221 static inline void ?{}( actor & this ) { 176 222 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 177 223 // member must be called to end it … … 181 227 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST ); 182 228 } 183 void ^?{}( actor & this ) {}229 static inline void ^?{}( actor & this ) {} 184 230 185 231 static inline void check_actor( actor & this ) { … … 207 253 }; 208 254 209 void ?{}( message & this ) { this.allocation_ = Nodelete; }210 void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }211 void ^?{}( message & this ) {}255 static inline void ?{}( message & this ) { this.allocation_ = Nodelete; } 256 static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; } 257 static inline void ^?{}( message & this ) {} 212 258 213 259 static inline void check_message( message & this ) { … … 220 266 } 221 267 222 void deliver_request( request & this ) {268 static inline void deliver_request( request & this ) { 223 269 Allocation actor_allocation = this.fn( *this.receiver, *this.msg ); 224 270 this.receiver->allocation_ = actor_allocation; … … 228 274 229 275 void main( worker & this ) with(this) { 276 bool should_delete; 230 277 Exit: 231 278 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 232 transfer( request_queues[i + start], current_queue ); 233 while ( ! current_queue`isEmpty ) { 234 &req = &try_pop_front( current_queue ); 279 // C_TODO: potentially check queue count instead of immediately trying to transfer 280 transfer( request_queues[i + start], ¤t_queue ); 281 while ( ! isEmpty( *current_queue ) ) { 282 &req = &remove( *current_queue, should_delete ); 235 283 if ( !&req ) continue; // possibly add some work stealing/idle sleep here 236 284 if ( req.stop ) break Exit; 237 285 deliver_request( req ); 238 286 239 delete( &req );287 if ( should_delete ) delete( &req ); 240 288 } // while 241 289 } // for … … 253 301 __actor_executor_thd = active_thread(); 254 302 __actor_executor_ = alloc(); 255 (*__actor_executor_){ 0, num_thds, num_thds * 16 };303 (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 }; 256 304 } 257 305 
  Note:
 See   TracChangeset
 for help on using the changeset viewer.
  