- File:
-
- 1 edited
-
libcfa/src/concurrency/actor.hfa (modified) (17 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
refdd18c rccf1d99 3 3 #include <locks.hfa> 4 4 #include <limits.hfa> 5 #include <list.hfa> 5 6 #include <kernel.hfa> 6 #include <iofwd.hfa>7 #include <virtual_dtor.hfa>8 7 9 8 #ifdef __CFA_DEBUG__ … … 21 20 // Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the 22 21 // actor-executor threads. Must be greater than 0. 23 #define __DEFAULT_EXECUTOR_RQUEUES__ 422 #define __DEFAULT_EXECUTOR_RQUEUES__ 2 24 23 25 24 // Define if executor is created in a separate cluster 26 25 #define __DEFAULT_EXECUTOR_SEPCLUS__ false 27 26 28 #define __DEFAULT_EXECUTOR_BUFSIZE__ 10 29 30 #define __STEAL 0 // workstealing toggle. Disjoint from toggles above 31 32 // workstealing heuristic selection (only set one to be 1) 33 // #define RAND 0 34 #define SEARCH 1 35 36 // show stats 37 // #define ACTOR_STATS 27 // when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp 28 #define __ALLOC 0 38 29 39 30 // forward decls 40 31 struct actor; 41 32 struct message; 42 struct executor;43 33 44 34 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status … … 50 40 __receive_fn fn; 51 41 bool stop; 52 }; 42 inline dlink(request); 43 }; 44 P9_EMBEDDED( request, dlink(request) ) 53 45 54 46 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel … … 66 58 } 67 59 68 // Vector-like data structure that supports O(1) queue operations with no bound on size 69 // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert) 60 // hybrid data structure. Copies until buffer is full and then allocates for intrusive list 70 61 struct copy_queue { 62 dlist( request ) list; 63 #if ! 
__ALLOC 71 64 request * buffer; 72 size_t count, buffer_size, index, utilized, last_size; 65 size_t count, buffer_size, index; 66 #endif 73 67 }; 74 68 static inline void ?{}( copy_queue & this ) {} 75 69 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { 70 list{}; 71 #if ! __ALLOC 76 72 buffer_size = buf_size; 77 73 buffer = aalloc( buffer_size ); 78 74 count = 0; 79 utilized = 0;80 75 index = 0; 81 last_size = 0; 82 } 83 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 76 #endif 77 } 78 static inline void ^?{}( copy_queue & this ) with(this) { 79 #if ! __ALLOC 80 adelete(buffer); 81 #endif 82 } 84 83 85 84 static inline void insert( copy_queue & this, request & elem ) with(this) { 86 if ( count >= buffer_size ) { // increase arr size87 last_size = buffer_size;88 buffer _size = 2 * buffer_size;89 buffer = realloc( buffer, sizeof( request ) * buffer_size );90 /* paranoid */ verify( buffer );85 #if ! __ALLOC 86 if ( count < buffer_size ) { // fast path ( no alloc ) 87 buffer[count]{ elem }; 88 count++; 89 return; 91 90 } 92 memcpy( &buffer[count], &elem, sizeof(request) ); 93 count++; 91 request * new_elem = alloc(); 92 (*new_elem){ elem }; 93 insert_last( list, *new_elem ); 94 #else 95 insert_last( list, elem ); 96 #endif 94 97 } 95 98 96 99 // once you start removing you need to remove all elements 97 // it is not supported to call insert() before the array is fully empty 98 static inline request & remove( copy_queue & this ) with(this) { 100 // it is not supported to call insert() before the list is fully empty 101 // should_delete is an output param 102 static inline request & remove( copy_queue & this, bool & should_delete ) with(this) { 103 #if ! __ALLOC 99 104 if ( count > 0 ) { 100 105 count--; 106 should_delete = false; 101 107 size_t old_idx = index; 102 108 index = count == 0 ? 
0 : index + 1; 103 109 return buffer[old_idx]; 104 110 } 105 request * ret = 0p;106 return *0p;107 } 108 109 // try to reclaim some memory if less than half of buffer is utilized 110 static inline void reclaim( copy_queue & this ) with(this) {111 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }112 utilized = 0;113 buffer_size--;114 buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory115 } 116 117 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; } 118 111 #endif 112 should_delete = true; 113 return try_pop_front( list ); 114 } 115 116 static inline bool isEmpty( copy_queue & this ) with(this) { 117 #if ! __ALLOC 118 return count == 0 && list`isEmpty; 119 #else 120 return list`isEmpty; 121 #endif 122 } 123 124 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global) 119 125 struct work_queue { 120 126 __spinlock_t mutex_lock; 121 copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue 122 copy_queue * c_queue; // current queue 123 volatile bool being_processed; // flag to prevent concurrent processing 124 #ifdef ACTOR_STATS 125 unsigned int id; 126 size_t missed; // transfers skipped due to being_processed flag being up 127 #endif 127 copy_queue owned_queue; 128 copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling 129 128 130 }; // work_queue 129 static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) { 130 owned_queue = alloc(); // allocated separately to avoid false sharing 131 (*owned_queue){ buf_size }; 132 c_queue = owned_queue; 133 being_processed = false; 134 #ifdef ACTOR_STATS 135 id = i; 136 missed = 0; 137 #endif 138 } 139 140 // clean up copy_queue owned by this work_queue 141 static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); } 131 static inline void ?{}( work_queue & this ) with(this) 
{ 132 // c_queue = alloc(); 133 // (*c_queue){ __buffer_size }; 134 owned_queue{ __buffer_size }; 135 c_queue = &owned_queue; 136 } 137 // static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); } 142 138 143 139 static inline void insert( work_queue & this, request & elem ) with(this) { … … 149 145 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 150 146 lock( mutex_lock __cfaabi_dbg_ctx2 ); 151 #ifdef __STEAL152 153 // check if queue is being processed elsewhere154 if ( unlikely( being_processed ) ) {155 #ifdef ACTOR_STATS156 missed++;157 #endif158 unlock( mutex_lock );159 return;160 }161 162 being_processed = c_queue->count != 0;163 #endif // __STEAL164 165 c_queue->utilized = c_queue->count;166 167 147 // swap copy queue ptrs 168 148 copy_queue * temp = *transfer_to; … … 172 152 } // transfer 173 153 174 // needed since some info needs to persist past worker lifetimes175 struct worker_info {176 volatile unsigned long long stamp;177 #ifdef ACTOR_STATS178 size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;179 unsigned long long processed;180 size_t gulps;181 #endif182 };183 static inline void ?{}( worker_info & this ) {184 #ifdef ACTOR_STATS185 this.stolen_from = 0;186 this.try_steal = 0; // attempts to steal187 this.stolen = 0; // successful steals188 this.processed = 0; // requests processed189 this.gulps = 0; // number of gulps190 this.failed_swaps = 0; // steal swap failures191 this.msgs_stolen = 0; // number of messages stolen192 #endif193 this.stamp = rdtscl();194 }195 196 // #ifdef ACTOR_STATS197 // unsigned int * stolen_arr;198 // unsigned int * replaced_queue;199 // #endif200 154 thread worker { 201 work_queue ** request_queues; 155 copy_queue owned_queue; 156 work_queue * request_queues; 202 157 copy_queue * current_queue; 203 executor * executor_;158 request & req; 204 159 unsigned int start, range; 205 int id; 206 }; 207 208 #ifdef ACTOR_STATS 209 // aggregate counters for 
statistics 210 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, 211 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 212 #endif 213 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_, 214 unsigned int start, unsigned int range, int id ) { 160 }; 161 162 static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) { 215 163 ((thread &)this){ clu }; 216 this.request_queues = request_queues; // array of all queues 217 this.current_queue = current_queue; // currently gulped queue (start with empty queue to use in swap later) 218 this.executor_ = executor_; // pointer to current executor 219 this.start = start; // start of worker's subrange of request_queues 220 this.range = range; // size of worker's subrange of request_queues 221 this.id = id; // worker's id and index in array of workers 222 } 223 224 static bool no_steal = false; 164 this.request_queues = request_queues; 165 // this.current_queue = alloc(); 166 // (*this.current_queue){ __buffer_size }; 167 this.owned_queue{ __buffer_size }; 168 this.current_queue = &this.owned_queue; 169 this.start = start; 170 this.range = range; 171 } 172 // static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); } 173 225 174 struct executor { 226 175 cluster * cluster; // if workers execute on separate cluster 227 176 processor ** processors; // array of virtual processors adding parallelism for workers 228 work_queue * request_queues; // master array of work request queues 229 copy_queue * local_queues; // array of all worker local queues to avoid deletion race 230 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping 231 worker ** workers; // array of workers executing work requests 232 worker_info * w_infos; // array of info about each worker 177 
work_queue * request_queues; // master list of work request queues 178 worker ** workers; // array of workers executing work requests 233 179 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 234 180 bool seperate_clus; // use same or separate cluster for executor 235 181 }; // executor 236 182 237 // #ifdef ACTOR_STATS238 // __spinlock_t out_lock;239 // #endif240 static inline void ^?{}( worker & mutex this ) with(this) {241 #ifdef ACTOR_STATS242 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);243 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);244 __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);245 __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);246 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);247 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);248 249 // per worker steal stats (uncomment alongside the lock above this routine to print)250 // lock( out_lock __cfaabi_dbg_ctx2 );251 // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );252 // int count = 0;253 // int count2 = 0;254 // for ( i; range ) {255 // if ( replaced_queue[start + i] > 0 ){256 // count++;257 // // printf("%d: %u, ",i, replaced_queue[i]);258 // }259 // if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)260 // count2++;261 // }262 // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );263 // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );264 // unlock( out_lock );265 #endif266 }267 268 183 static inline void ?{}( executor & this, unsigned int 
nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) { 269 184 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); 185 __buffer_size = buf_size; 270 186 this.nprocessors = nprocessors; 271 187 this.nworkers = nworkers; … … 273 189 this.seperate_clus = seperate_clus; 274 190 275 if ( nworkers == nrqueues )276 no_steal = true;277 278 #ifdef ACTOR_STATS279 // stolen_arr = aalloc( nrqueues );280 // replaced_queue = aalloc( nrqueues );281 __total_workers = nworkers;282 #endif283 284 191 if ( seperate_clus ) { 285 192 cluster = alloc(); … … 288 195 289 196 request_queues = aalloc( nrqueues ); 290 worker_req_queues = aalloc( nrqueues ); 291 for ( i; nrqueues ) { 292 request_queues[i]{ buf_size, i }; 293 worker_req_queues[i] = &request_queues[i]; 294 } 197 for ( i; nrqueues ) 198 request_queues[i]{}; 295 199 296 200 processors = aalloc( nprocessors ); … … 298 202 (*(processors[i] = alloc())){ *cluster }; 299 203 300 local_queues = aalloc( nworkers ); 301 workers = aalloc( nworkers ); 302 w_infos = aalloc( nworkers ); 204 workers = alloc( nworkers ); 303 205 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 304 305 for ( i; nworkers ) {306 w_infos[i]{};307 local_queues[i]{ buf_size };308 }309 310 206 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 311 207 range = reqPerWorker + ( i < extras ? 
1 : 0 ); 312 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i};208 (*(workers[i] = alloc())){ *cluster, request_queues, start, range }; 313 209 } // for 314 210 } 315 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __ DEFAULT_EXECUTOR_BUFSIZE__}; }211 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; } 316 212 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; } 317 213 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; } … … 320 216 321 217 static inline void ^?{}( executor & this ) with(this) { 322 #ifdef __STEAL323 request sentinels[nrqueues];324 for ( unsigned int i = 0; i < nrqueues; i++ ) {325 insert( request_queues[i], sentinels[i] ); // force eventually termination326 } // for327 #else328 218 request sentinels[nworkers]; 329 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 330 for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) { 331 range = reqPerWorker + ( i < extras ? 
1 : 0 ); 219 unsigned int reqPerWorker = nrqueues / nworkers; 220 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) { 332 221 insert( request_queues[step], sentinels[i] ); // force eventually termination 333 222 } // for 334 #endif335 223 336 224 for ( i; nworkers ) … … 341 229 } // for 342 230 343 #ifdef ACTOR_STATS344 size_t misses = 0;345 for ( i; nrqueues ) {346 misses += worker_req_queues[i]->missed;347 }348 // adelete( stolen_arr );349 // adelete( replaced_queue );350 #endif351 352 231 adelete( workers ); 353 adelete( w_infos );354 adelete( local_queues );355 232 adelete( request_queues ); 356 adelete( worker_req_queues );357 233 adelete( processors ); 358 234 if ( seperate_clus ) delete( cluster ); 359 360 #ifdef ACTOR_STATS // print formatted stats361 printf(" Actor System Stats:\n");362 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);363 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;364 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);365 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",366 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);367 size_t avg_steal = __total_stolen == 0 ? 
0 : __all_msgs_stolen / __total_stolen;368 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);369 #endif370 371 235 } 372 236 373 237 // this is a static field of executor but have to forward decl for get_next_ticket 374 static size_t __next_ticket = 0; 375 376 static inline size_t __get_next_ticket( executor & this ) with(this) { 377 #ifdef __CFA_DEBUG__ 378 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 379 380 // reserve MAX for dead actors 381 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 382 return temp; 383 #else 384 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues; 385 #endif 238 static unsigned int __next_ticket = 0; 239 240 static inline unsigned int get_next_ticket( executor & this ) with(this) { 241 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 386 242 } // tickets 387 243 388 // TODO: update globals in this file to be static fields once the static fieldsproject is done244 // C_TODO: update globals in this file to be static fields once the project is done 389 245 static executor * __actor_executor_ = 0p; 390 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system391 static size_t __num_actors_ = 0;// number of actor objects in system246 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 247 static unsigned long int __num_actors_; // number of actor objects in system 392 248 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 393 249 struct actor { 394 size_t ticket; // executor-queue handle 395 Allocation allocation_; // allocation action 396 inline virtual_dtor; 397 }; 398 399 static inline void ?{}( actor & this ) with(this) { 250 unsigned long int ticket; // executor-queue handle to provide FIFO message execution 251 Allocation 
allocation_; // allocation action 252 }; 253 254 static inline void ?{}( actor & this ) { 400 255 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 401 256 // member must be called to end it 402 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 403 allocation_ = Nodelete; 404 ticket = __get_next_ticket( *__actor_executor_ ); 405 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); 406 #ifdef ACTOR_STATS 407 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); 408 #endif 409 } 257 verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." ); 258 this.allocation_ = Nodelete; 259 this.ticket = get_next_ticket( *__actor_executor_ ); 260 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST ); 261 } 262 static inline void ^?{}( actor & this ) {} 410 263 411 264 static inline void check_actor( actor & this ) { … … 423 276 } 424 277 425 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_ RELAXED) == 0 ) ) { // all actors have terminated278 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated 426 279 unpark( __actor_executor_thd ); 427 280 } … … 431 284 struct message { 432 285 Allocation allocation_; // allocation action 433 inline virtual_dtor; 434 }; 435 436 static inline void ?{}( message & this ) { 437 this.allocation_ = Nodelete; 438 } 439 static inline void ?{}( message & this, Allocation allocation ) { 440 memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor 441 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n"); 442 } 443 static inline void ^?{}( message & this ) with(this) { 444 CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); ) 445 } 286 }; 287 288 static 
inline void ?{}( message & this ) { this.allocation_ = Nodelete; } 289 static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; } 290 static inline void ^?{}( message & this ) {} 446 291 447 292 static inline void check_message( message & this ) { 448 293 switch ( this.allocation_ ) { // analyze message status 449 case Nodelete: CFA_DEBUG(this.allocation_ = Finished);break;294 case Nodelete: break; 450 295 case Delete: delete( &this ); break; 451 296 case Destroy: ^?{}(this); break; … … 453 298 } // switch 454 299 } 455 static inline void set_allocation( message & this, Allocation state ) {456 this.allocation_ = state;457 }458 300 459 301 static inline void deliver_request( request & this ) { 460 this.receiver->allocation_ = this.fn( *this.receiver, *this.msg ); 302 Allocation actor_allocation = this.fn( *this.receiver, *this.msg ); 303 this.receiver->allocation_ = actor_allocation; 304 check_actor( *this.receiver ); 461 305 check_message( *this.msg ); 462 check_actor( *this.receiver );463 }464 465 // tries to atomically swap two queues and returns 0p if the swap failed466 // returns ptr to newly owned queue if swap succeeds467 static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {468 work_queue * my_queue = request_queues[my_idx];469 work_queue * other_queue = request_queues[victim_idx];470 471 // if either queue is 0p then they are in the process of being stolen472 if ( other_queue == 0p ) return 0p;473 474 // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false475 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )476 return 0p;477 478 // try to set other queue ptr to be our queue ptr. 
If it fails someone moved the other queue so fix up then return false479 if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {480 /* paranoid */ verify( request_queues[my_idx] == 0p );481 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val482 return 0p;483 }484 485 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically486 request_queues[my_idx] = other_queue; // last write does not need to be atomic487 return other_queue;488 }489 490 // once a worker to steal from has been chosen, choose queue to steal from491 static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {492 // have to calculate victim start and range since victim may be deleted before us in shutdown493 const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;494 const unsigned int extras = executor_->nrqueues % executor_->nworkers;495 unsigned int vic_start, vic_range;496 if ( extras > victim_id ) {497 vic_range = queues_per_worker + 1;498 vic_start = vic_range * victim_id;499 } else {500 vic_start = extras + victim_id * queues_per_worker;501 vic_range = queues_per_worker;502 }503 unsigned int start_idx = prng( vic_range );504 505 unsigned int tries = 0;506 work_queue * curr_steal_queue;507 508 for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {509 tries++;510 curr_steal_queue = request_queues[ i + vic_start ];511 // avoid empty queues and queues that are being operated on512 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )513 continue;514 515 #ifdef ACTOR_STATS516 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );517 if ( curr_steal_queue ) {518 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;519 
executor_->w_infos[id].stolen++;520 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);521 // replaced_queue[swap_idx]++;522 // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);523 } else {524 executor_->w_infos[id].failed_swaps++;525 }526 #else527 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );528 #endif // ACTOR_STATS529 530 return;531 }532 533 return;534 }535 536 // choose a worker to steal from537 static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {538 #if RAND539 unsigned int victim = prng( executor_->nworkers );540 if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;541 choose_queue( this, victim, swap_idx );542 #elif SEARCH543 unsigned long long min = MAX; // smaller timestamp means longer since service544 int min_id = 0; // use ints not uints to avoid integer underflow without hacky math545 int n_workers = executor_->nworkers;546 unsigned long long curr_stamp;547 int scount = 1;548 for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {549 curr_stamp = executor_->w_infos[i].stamp;550 if ( curr_stamp < min ) {551 min = curr_stamp;552 min_id = i;553 }554 }555 choose_queue( this, min_id, swap_idx );556 #endif557 306 } 558 307 559 308 void main( worker & this ) with(this) { 560 // #ifdef ACTOR_STATS 561 // for ( i; executor_->nrqueues ) { 562 // replaced_queue[i] = 0; 563 // __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); 564 // } 565 // #endif 566 567 // threshold of empty queues we see before we go stealing 568 const unsigned int steal_threshold = 2 * range; 569 570 // Store variable data here instead of worker struct to avoid any potential false sharing 571 unsigned int empty_count = 0; 572 request & req; 573 work_queue * curr_work_queue; 574 309 bool should_delete; 575 310 Exit: 576 311 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 577 
curr_work_queue = request_queues[i + start]; 578 579 // check if queue is empty before trying to gulp it 580 if ( isEmpty( *curr_work_queue->c_queue ) ) { 581 #ifdef __STEAL 582 empty_count++; 583 if ( empty_count < steal_threshold ) continue; 584 #else 585 continue; 586 #endif 587 } 588 transfer( *curr_work_queue, &current_queue ); 589 #ifdef ACTOR_STATS 590 executor_->w_infos[id].gulps++; 591 #endif // ACTOR_STATS 592 #ifdef __STEAL 593 if ( isEmpty( *current_queue ) ) { 594 if ( unlikely( no_steal ) ) continue; 595 empty_count++; 596 if ( empty_count < steal_threshold ) continue; 597 empty_count = 0; 598 599 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); 600 601 #ifdef ACTOR_STATS 602 executor_->w_infos[id].try_steal++; 603 #endif // ACTOR_STATS 604 605 steal_work( this, start + prng( range ) ); 606 continue; 607 } 608 #endif // __STEAL 312 // C_TODO: potentially check queue count instead of immediately trying to transfer 313 transfer( request_queues[i + start], &current_queue ); 609 314 while ( ! 
isEmpty( *current_queue ) ) { 610 #ifdef ACTOR_STATS 611 executor_->w_infos[id].processed++; 612 #endif 613 &req = &remove( *current_queue ); 614 if ( !&req ) continue; 315 &req = &remove( *current_queue, should_delete ); 316 if ( !&req ) continue; // possibly add some work stealing/idle sleep here 615 317 if ( req.stop ) break Exit; 616 318 deliver_request( req ); 617 } 618 #ifdef __STEAL 619 curr_work_queue->being_processed = false; // set done processing 620 empty_count = 0; // we found work so reset empty counter 621 #endif 622 623 // potentially reclaim some of the current queue's vector space if it is unused 624 reclaim( *current_queue ); 319 320 if ( should_delete ) delete( &req ); 321 } // while 625 322 } // for 626 323 } … … 631 328 632 329 static inline void send( actor & this, request & req ) { 633 verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );634 330 send( *__actor_executor_, req, this.ticket ); 635 331 } 636 332 637 static inline void __reset_stats() {638 #ifdef ACTOR_STATS639 __total_tries = 0;640 __total_stolen = 0;641 __all_gulps = 0;642 __total_failed_swaps = 0;643 __all_processed = 0;644 __num_actors_stats = 0;645 __all_msgs_stolen = 0;646 #endif647 }648 649 333 static inline void start_actor_system( size_t num_thds ) { 650 __reset_stats();651 334 __actor_executor_thd = active_thread(); 652 335 __actor_executor_ = alloc(); … … 654 337 } 655 338 656 // TODO: potentially revisit getting number of processors 657 // ( currently the value stored in active_cluster()->procs.total is often stale 658 // and doesn't reflect how many procs are allocated ) 659 // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); } 660 static inline void start_actor_system() { start_actor_system( 1 ); } 339 static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); } 661 340 662 341 static inline void start_actor_system( executor & this ) { 
663 __reset_stats();664 342 __actor_executor_thd = active_thread(); 665 343 __actor_executor_ = &this; … … 676 354 __actor_executor_passed = false; 677 355 } 678 679 // Default messages to send to any actor to change status680 // assigned at creation to __base_msg_finished to avoid unused message warning681 message __base_msg_finished @= { .allocation_ : Finished };682 struct __DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;683 struct __DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;684 struct __FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;685 686 Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }687 Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }688 Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }689
Note:
See TracChangeset
for help on using the changeset viewer.