Changeset 1e38178 for libcfa/src
- Timestamp:
- Mar 4, 2023, 1:35:11 PM (22 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 13f066d
- Parents:
- 601bd9e
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
r601bd9e r1e38178 3 3 #include <locks.hfa> 4 4 #include <limits.hfa> 5 #include <list.hfa>6 5 #include <kernel.hfa> 7 #include <vector2.hfa> 6 #include <time_t.hfa> 7 #include <time.hfa> 8 #include <iofwd.hfa> 8 9 9 10 #ifdef __CFA_DEBUG__ … … 21 22 // Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the 22 23 // actor-executor threads. Must be greater than 0. 23 #define __DEFAULT_EXECUTOR_RQUEUES__ 224 #define __DEFAULT_EXECUTOR_RQUEUES__ 4 24 25 25 26 // Define if executor is created in a separate cluster 26 27 #define __DEFAULT_EXECUTOR_SEPCLUS__ false 27 28 28 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above 29 30 // whether to steal work or to steal a queue Only applicable if __STEAL == 1 31 #define __STEAL_WORK 0 32 33 // heuristic selection (only set one to be 1) 34 #define __RAND_QUEUE 1 35 #define __RAND_WORKER 0 36 37 // show stealing stats 38 // #define __STEAL_STATS 29 #define __DEFAULT_EXECUTOR_BUFSIZE__ 10 30 31 // #define __STEAL 0 // workstealing toggle. Disjoint from toggles above 32 33 // workstealing heuristic selection (only set one to be 1) 34 // #define RAND 0 35 // #define SEARCH 0 36 37 // show stats 38 // #define STATS 39 39 40 40 // forward decls 41 41 struct actor; 42 42 struct message; 43 struct executor; 43 44 44 45 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status … … 66 67 } 67 68 68 // hybrid data structure. Copies until buffer is full and then allocates for intrusive list 69 // Vector-like data structure that supports O(1) queue operations with no bound on size 70 // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert) 69 71 struct copy_queue { 70 72 request * buffer; … … 89 91 /* paranoid */ verify( buffer ); 90 92 } 91 buffer[count]{ elem }; // C_TODO: change to memcpy 92 // memcpy( &buffer[count], &elem, sizeof(request) ); 93 memcpy( &buffer[count], &elem, sizeof(request) ); 93 94 count++; 94 95 } 95 96 96 97 // once you start removing you need to remove all elements 97 // it is not supported to call insert() before the listis fully empty98 // it is not supported to call insert() before the array is fully empty 98 99 static inline request & remove( copy_queue & this ) with(this) { 99 100 if ( count > 0 ) { … … 107 108 } 108 109 109 // try to reclaim some memory 110 // try to reclaim some memory if less than half of buffer is utilized 110 111 static inline void reclaim( copy_queue & this ) with(this) { 111 112 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; } … … 117 118 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; } 118 119 119 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)120 120 struct work_queue { 121 121 __spinlock_t mutex_lock; 122 copy_queue owned_queue; 123 copy_queue * c_queue; 124 volatile bool being_processed; 122 copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue 123 copy_queue * c_queue; // current queue 124 volatile bool being_processed; // flag to prevent concurrent processing 125 #ifdef STATS 126 unsigned int id; 127 size_t missed; // transfers skipped due to being_processed flag being up 128 #endif 125 129 }; // work_queue 126 static inline void ?{}( work_queue & this ) with(this) { 127 owned_queue{ __buffer_size }; 128 c_queue = &owned_queue; 130 static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) { 131 owned_queue = alloc(); // allocated separately to avoid false sharing 132 (*owned_queue){ buf_size }; 133 c_queue = owned_queue; 129 134 being_processed = false; 130 } 135 #ifdef STATS 136 id = i; 137 missed = 0; 138 #endif 139 } 140 141 // clean up copy_queue owned by this work_queue 142 static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); } 131 143 132 144 static inline void insert( work_queue & this, request & elem ) with(this) { … … 136 148 } // insert 137 149 138 static inline void transfer( work_queue & this, copy_queue ** transfer_to , work_queue ** queue_arr, unsigned int idx) with(this) {150 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 139 151 lock( mutex_lock __cfaabi_dbg_ctx2 ); 140 #if __STEAL 141 142 #if __STEAL_WORK 143 if ( unlikely( being_processed ) ) 144 #else 145 // check if queue has been stolen out from under us between 146 // transfer() call and lock acquire C_TODO: maybe just use new queue! 147 if ( unlikely( being_processed || queue_arr[idx] != &this ) ) 148 #endif // __STEAL_WORK 149 { 152 #ifdef __STEAL 153 154 // check if queue is being processed elsewhere 155 if ( unlikely( being_processed ) ) { 156 #ifdef STATS 157 missed++; 158 #endif 150 159 unlock( mutex_lock ); 151 160 return; … … 164 173 } // transfer 165 174 175 // needed since some info needs to persist past worker lifetimes 176 struct worker_info { 177 volatile unsigned long long stamp; 178 #ifdef STATS 179 size_t stolen_from; 180 #endif 181 }; 182 static inline void ?{}( worker_info & this ) { 183 #ifdef STATS 184 this.stolen_from = 0; 185 #endif 186 this.stamp = rdtscl(); 187 } 188 189 #ifdef STATS 190 unsigned int * stolen_arr; 191 unsigned int * replaced_queue; 192 #endif 166 193 thread worker { 167 194 work_queue ** request_queues; 168 195 copy_queue * current_queue; 169 worker ** worker_arr; // C_TODO: change n_workers, n_queues,worker_arr to just be pulled from ptr to executor 170 request & req; 171 unsigned int start, range, empty_count, n_workers, n_queues, id; 172 #ifdef __STEAL_STATS 173 unsigned int try_steal, stolen; 196 executor * executor_; 197 unsigned int start, range; 198 int id; 199 #ifdef STATS 200 size_t try_steal, stolen, failed_swaps, msgs_stolen; 201 unsigned long long processed; 202 size_t gulps; 174 203 #endif 175 204 }; 176 205 177 #ifdef __STEAL_STATS 178 unsigned int total_tries = 0, total_stolen = 0, total_workers; 206 #ifdef STATS 207 // aggregate counters for statistics 208 size_t total_tries = 0, total_stolen = 0, total_workers, all_gulps = 0, 209 total_failed_swaps = 0, all_processed = 0, __num_actors_stats = 0, all_msgs_stolen = 0; 179 210 #endif 180 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,181 unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsignedint id ) {211 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_, 212 unsigned int start, unsigned int range, int id ) { 182 213 ((thread &)this){ clu }; 183 this.request_queues = request_queues; 184 this.current_queue = current_queue; 185 this.start = start; 186 this.range = range; 187 this.empty_count = 0; 188 this.n_workers = n_workers; 189 this.worker_arr = worker_arr; 190 this.n_queues = n_queues; 191 this.id = id; 192 #ifdef __STEAL_STATS 193 this.try_steal = 0; 194 this.stolen = 0; 195 total_workers = n_workers; 196 #endif 197 } 198 static inline void ^?{}( worker & mutex this ) with(this) { 199 // delete( current_queue ); 200 #ifdef __STEAL_STATS 201 __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST); 202 __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST); 203 if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0) 204 printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen); 205 #endif 206 } 207 214 this.request_queues = request_queues; // array of all queues 215 this.current_queue = current_queue; // currently gulped queue (start with empty queue to use in swap later) 216 this.executor_ = executor_; // pointer to current executor 217 this.start = start; // start of worker's subrange of request_queues 218 this.range = range; // size of worker's subrange of request_queues 219 this.id = id; // worker's id and index in array of workers 220 #ifdef STATS 221 this.try_steal = 0; // attempts to steal 222 this.stolen = 0; // successful steals 223 this.processed = 0; // requests processed 224 this.gulps = 0; // number of gulps 225 this.failed_swaps = 0; // steal swap failures 226 this.msgs_stolen = 0; // number of messages stolen 227 #endif 228 } 229 230 static bool no_steal = false; 208 231 struct executor { 209 232 cluster * cluster; // if workers execute on separate cluster … … 213 236 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping 214 237 worker ** workers; // array of workers executing work requests 238 worker_info * w_infos; // array of info about each worker 215 239 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 216 240 bool seperate_clus; // use same or separate cluster for executor 217 241 }; // executor 218 242 243 // #ifdef STATS 244 // __spinlock_t out_lock; 245 // #endif 246 static inline void ^?{}( worker & mutex this ) with(this) { 247 #ifdef STATS 248 __atomic_add_fetch(&all_gulps, gulps,__ATOMIC_SEQ_CST); 249 __atomic_add_fetch(&all_processed, processed,__ATOMIC_SEQ_CST); 250 __atomic_add_fetch(&all_msgs_stolen, msgs_stolen,__ATOMIC_SEQ_CST); 251 __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST); 252 __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST); 253 __atomic_add_fetch(&total_failed_swaps, failed_swaps, __ATOMIC_SEQ_CST); 254 255 // per worker steal stats (uncomment alongside the lock above this routine to print) 256 // lock( out_lock __cfaabi_dbg_ctx2 ); 257 // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) ); 258 // int count = 0; 259 // int count2 = 0; 260 // for ( i; range ) { 261 // if ( replaced_queue[start + i] > 0 ){ 262 // count++; 263 // // printf("%d: %u, ",i, replaced_queue[i]); 264 // } 265 // if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0) 266 // count2++; 267 // } 268 // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers ); 269 // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers ); 270 // unlock( out_lock ); 271 #endif 272 } 273 219 274 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) { 220 275 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); 221 __buffer_size = buf_size;222 276 this.nprocessors = nprocessors; 223 277 this.nworkers = nworkers; … … 225 279 this.seperate_clus = seperate_clus; 226 280 281 if ( nworkers == nrqueues ) 282 no_steal = true; 283 284 #ifdef STATS 285 stolen_arr = aalloc( nrqueues ); 286 replaced_queue = aalloc( nrqueues ); 287 total_workers = nworkers; 288 #endif 289 227 290 if ( seperate_clus ) { 228 291 cluster = alloc(); … … 233 296 worker_req_queues = aalloc( nrqueues ); 234 297 for ( i; nrqueues ) { 235 request_queues[i]{ };298 request_queues[i]{ buf_size, i }; 236 299 worker_req_queues[i] = &request_queues[i]; 237 300 } … … 242 305 243 306 local_queues = aalloc( nworkers ); 244 workers = alloc( nworkers ); 307 workers = aalloc( nworkers ); 308 w_infos = aalloc( nworkers ); 245 309 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 310 311 for ( i; nworkers ) { 312 w_infos[i]{}; 313 local_queues[i]{ buf_size }; 314 } 315 246 316 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 247 local_queues[i]{ buf_size };248 317 range = reqPerWorker + ( i < extras ? 1 : 0 ); 249 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };318 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i }; 250 319 } // for 251 320 } 252 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __ buffer_size}; }321 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; } 253 322 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; } 254 323 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; } … … 256 325 static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; } 257 326 258 // C_TODO: once stealing is implemented make sure shutdown still works259 327 static inline void ^?{}( executor & this ) with(this) { 260 #if __STEAL328 #ifdef __STEAL 261 329 request sentinels[nrqueues]; 262 330 for ( unsigned int i = 0; i < nrqueues; i++ ) { … … 265 333 #else 266 334 request sentinels[nworkers]; 267 unsigned int reqPerWorker = nrqueues / nworkers; 268 for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) { 335 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 336 for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) { 337 range = reqPerWorker + ( i < extras ? 1 : 0 ); 269 338 insert( request_queues[step], sentinels[i] ); // force eventually termination 270 339 } // for … … 278 347 } // for 279 348 349 #ifdef STATS 350 size_t misses = 0; 351 for ( i; nrqueues ) { 352 misses += worker_req_queues[i]->missed; 353 } 354 adelete( stolen_arr ); 355 adelete( replaced_queue ); 356 #endif 357 280 358 adelete( workers ); 359 adelete( w_infos ); 281 360 adelete( local_queues ); 282 361 adelete( request_queues ); … … 284 363 adelete( processors ); 285 364 if ( seperate_clus ) delete( cluster ); 365 366 #ifdef STATS 367 printf(" Actor System Stats:\n"); 368 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, all_processed); 369 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", all_gulps, all_processed / all_gulps, misses); 370 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n", 371 total_tries, total_stolen, total_tries - total_stolen - total_failed_swaps, total_failed_swaps); 372 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", all_msgs_stolen, all_msgs_stolen/total_stolen); 373 #endif 374 286 375 } 287 376 288 377 // this is a static field of executor but have to forward decl for get_next_ticket 289 static unsigned int __next_ticket = 0; 290 291 static inline unsigned int get_next_ticket( executor & this ) with(this) { 292 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 378 static unsigned long int __next_ticket = 0; 379 380 static inline unsigned long int __get_next_ticket( executor & this ) with(this) { 381 unsigned long int temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 382 383 // reserve MAX for dead actors 384 if ( temp == MAX ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 385 return temp; 293 386 } // tickets 294 387 295 // C_TODO: update globals in this file to be static fields once theproject is done388 // TODO: update globals in this file to be static fields once the static fields project is done 296 389 static executor * __actor_executor_ = 0p; 297 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system298 static unsigned long int __num_actors_ ; // number of actor objects in system390 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 391 static unsigned long int __num_actors_ = 0; // number of actor objects in system 299 392 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 300 393 struct actor { 301 unsigned long int ticket; // executor-queue handle to provide FIFO message execution302 Allocation allocation_; // allocation action394 unsigned long int ticket; // executor-queue handle 395 Allocation allocation_; // allocation action 303 396 }; 304 397 … … 306 399 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 307 400 // member must be called to end it 308 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() ." );401 verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 309 402 this.allocation_ = Nodelete; 310 this.ticket = get_next_ticket( *__actor_executor_ );403 this.ticket = __get_next_ticket( *__actor_executor_ ); 311 404 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST ); 405 #ifdef STATS 406 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); 407 #endif 312 408 } 313 409 static inline void ^?{}( actor & this ) {} … … 338 434 339 435 static inline void ?{}( message & this ) { this.allocation_ = Nodelete; } 340 static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; } 341 static inline void ^?{}( message & this ) {} 436 static inline void ?{}( message & this, Allocation allocation ) { 437 this.allocation_ = allocation; 438 verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n"); 439 } 440 static inline void ^?{}( message & this ) { 441 CFA_DEBUG( if ( this.allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); ) 442 } 342 443 343 444 static inline void check_message( message & this ) { 344 445 switch ( this.allocation_ ) { // analyze message status 345 case Nodelete: break;446 case Nodelete: CFA_DEBUG( this.allocation_ = Finished; ) break; 346 447 case Delete: delete( &this ); break; 347 448 case Destroy: ^?{}(this); break; … … 349 450 } // switch 350 451 } 452 static inline void set_allocation( message & this, Allocation state ) { this.allocation_ = state; } 351 453 352 454 static inline void deliver_request( request & this ) { … … 357 459 } 358 460 359 // Couple of ways to approach work stealing 360 // 1: completely worker agnostic, just find a big queue and steal it 361 // 2: track some heuristic of worker's load and focus on that and then pick a queue from that worker 362 // worker heuristics: 363 // - how many queues have work? 364 // - size of largest queue 365 // - total # of messages 366 // - messages currently servicing 367 // - pick randomly 368 // - pick from closer threads/workers (this can be combined with others) 369 370 // lock free or global lock for queue stealing 371 #define __LOCK_SWP 0 372 373 __spinlock_t swp_lock; 374 375 // tries to atomically swap two queues and returns a bool indicating if the swap failed 376 static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) { 377 #if __LOCK_SWP 378 379 lock( swp_lock __cfaabi_dbg_ctx2 ); 380 work_queue * temp = request_queues[my_idx]; 381 request_queues[my_idx] = request_queues[victim_idx]; 382 request_queues[victim_idx] = temp; 383 unlock( swp_lock ); 384 385 return true; 386 387 #else // __LOCK_SWP else 461 // tries to atomically swap two queues and returns 0p if the swap failed 462 // returns ptr to newly owned queue if swap succeeds 463 static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) { 388 464 work_queue * my_queue = request_queues[my_idx]; 389 465 work_queue * other_queue = request_queues[victim_idx]; 390 if ( other_queue == 0p || my_queue == 0p ) return false; 466 467 // if either queue is 0p then they are in the process of being stolen 468 if ( other_queue == 0p ) return 0p; 391 469 392 470 // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false 393 471 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) 394 return false;472 return 0p; 395 473 396 474 // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false … … 398 476 /* paranoid */ verify( request_queues[my_idx] == 0p ); 399 477 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val 400 return false;478 return 0p; 401 479 } 402 480 403 481 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically 404 482 request_queues[my_idx] = other_queue; // last write does not need to be atomic 405 return true; 406 407 #endif // __LOCK_SWP 483 return other_queue; 408 484 } 409 485 410 486 // once a worker to steal from has been chosen, choose queue to steal from 411 static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) { 412 #if __RAND_QUEUE 413 unsigned int tries = 0; 414 const unsigned int start_idx = prng( n_queues ); 415 work_queue * curr_steal_queue; 416 417 for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) { 418 tries++; 419 curr_steal_queue = request_queues[i]; 420 #if __STEAL_WORK 421 422 // avoid empty queues and queues that are being operated on 423 if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) ) 424 continue; 425 426 // in this case we just return from transfer if this doesn't work 427 transfer( *curr_steal_queue, ¤t_queue, request_queues, i ); 428 if ( isEmpty( *current_queue ) ) continue; 429 last_idx = i; 430 431 #ifdef __STEAL_STATS 432 stolen++; 433 #endif // __STEAL_STATS 434 435 #else // __STEAL_WORK else 436 437 // avoid empty queues and queues that are being operated on 438 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) ) 439 continue; 440 441 #ifdef __STEAL_STATS 442 bool success = try_swap_queues( this, i, last_idx ); 443 if ( success ) stolen++; 444 #else 445 try_swap_queues( this, i, last_idx ); 446 #endif // __STEAL_STATS 447 448 // C_TODO: try transfer immediately 449 // transfer( *request_queues[last_idx], ¤t_queue, request_queues, last_idx ); 450 // if ( isEmpty( *current_queue ) ) return false; 451 return false; 452 453 #endif // __STEAL_WORK 454 455 return true; 456 } // for 457 return false; 458 459 #elif __RAND_WORKER 460 487 static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) { 461 488 // have to calculate victim start and range since victim may be deleted before us in shutdown 462 const unsigned int queues_per_worker = n_queues / n_workers;463 const unsigned int extras = n_queues % n_workers;489 const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers; 490 const unsigned int extras = executor_->nrqueues % executor_->nworkers; 464 491 unsigned int vic_start, vic_range; 465 492 if ( extras > victim_id ) { … … 471 498 } 472 499 unsigned int start_idx = prng( vic_range ); 500 473 501 unsigned int tries = 0; 474 502 work_queue * curr_steal_queue; … … 481 509 continue; 482 510 483 try_swap_queues( this, i, last_idx ); 484 485 #ifdef __STEAL_STATS 486 bool success = try_swap_queues( this, i, last_idx ); 487 if ( success ) stolen++; 511 #ifdef STATS 512 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 513 if ( curr_steal_queue ) { 514 msgs_stolen += curr_steal_queue->c_queue->count; 515 stolen++; 516 __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 517 replaced_queue[swap_idx]++; 518 __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED); 519 } else { 520 failed_swaps++; 521 } 488 522 #else 489 try_swap_queues( this, i, last_idx ); 490 #endif // __STEAL_STATS 491 492 // C_TODO: try transfer immediately 493 // transfer( *request_queues[last_idx], ¤t_queue, request_queues, last_idx ); 494 // if ( isEmpty( *current_queue ) ) return false; 495 return false; 496 } 497 #endif 523 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 524 #endif // STATS 525 526 return; 527 } 528 529 return; 498 530 } 499 531 500 532 // choose a worker to steal from 501 static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) { 502 #if __RAND_WORKER 503 unsigned int victim = prng( n_workers ); 504 if ( victim == id ) victim = ( victim + 1 ) % n_workers; 505 return choose_queue( this, victim, last_idx ); 506 #else 507 return choose_queue( this, 0, last_idx ); 508 #endif 509 } 510 511 // look for work to steal 512 // returns a bool: true => a queue was stolen, false => no work was stolen 513 static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur 514 // to steal queue acquire both queue's locks in address ordering (maybe can do atomic swap) 515 // maybe a flag to hint which queue is being processed 516 // look at count to see if queue is worth stealing (dont steal empty queues) 517 // if steal and then flag is up then dont process and just continue looking at own queues 518 // (best effort approach) its ok if stealing isn't fruitful 519 // -> more important to not delay busy threads 520 521 return choose_victim( this, last_idx ); 533 static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) { 534 #if RAND 535 unsigned int victim = prng( executor_->nworkers ); 536 if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers; 537 choose_queue( this, victim, swap_idx ); 538 #elif SEARCH 539 unsigned long long min = MAX; // smaller timestamp means longer since service 540 int min_id = 0; // use ints not uints to avoid integer underflow without hacky math 541 int n_workers = executor_->nworkers; 542 unsigned long long curr_stamp; 543 int scount = 1; 544 for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) { 545 curr_stamp = executor_->w_infos[i].stamp; 546 if ( curr_stamp < min ) { 547 min = curr_stamp; 548 min_id = i; 549 } 550 } 551 choose_queue( this, min_id, swap_idx ); 552 #endif 522 553 } 523 554 524 555 void main( worker & this ) with(this) { 556 #ifdef STATS 557 for ( i; executor_->nrqueues ) { 558 replaced_queue[i] = 0; 559 __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); 560 } 561 #endif 562 525 563 // threshold of empty queues we see before we go stealing 526 const unsigned int steal_threshold = 2 * n_queues; 564 const unsigned int steal_threshold = 2 * range; 565 566 // Store variable data here instead of worker struct to avoid any potential false sharing 567 unsigned int empty_count = 0; 568 request & req; 527 569 unsigned int curr_idx; 528 570 work_queue * curr_work_queue; 571 529 572 Exit: 530 573 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 531 // C_TODO: potentially check queue count instead of immediately trying to transfer532 574 curr_idx = i + start; 533 575 curr_work_queue = request_queues[curr_idx]; 534 transfer( *curr_work_queue, ¤t_queue, request_queues, curr_idx ); 535 if ( isEmpty( *current_queue ) ) { 536 #if __STEAL 576 577 // check if queue is empty before trying to gulp it 578 if ( isEmpty( *curr_work_queue->c_queue ) ) { 579 #ifdef __STEAL 537 580 empty_count++; 538 581 if ( empty_count < steal_threshold ) continue; 539 empty_count = 0; // C_TODO: look into stealing backoff schemes 540 #ifdef __STEAL_STATS 582 #else 583 continue; 584 #endif 585 } 586 transfer( *curr_work_queue, ¤t_queue ); 587 #ifdef STATS 588 gulps++; 589 #endif // STATS 590 #ifdef __STEAL 591 if ( isEmpty( *current_queue ) ) { 592 if ( unlikely( no_steal ) ) continue; 593 empty_count++; 594 if ( empty_count < steal_threshold ) continue; 595 empty_count = 0; 596 597 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); 598 599 #ifdef STATS 541 600 try_steal++; 542 #endif // __STEAL_STATS 543 544 if ( ! steal_work( this, curr_idx ) ) continue; 545 546 #else // __STEAL else 547 601 #endif // STATS 602 603 steal_work( this, start + prng( range ) ); 548 604 continue; 549 550 #endif // __STEAL551 605 } 606 #endif // __STEAL 552 607 while ( ! isEmpty( *current_queue ) ) { 608 #ifdef STATS 609 processed++; 610 #endif 553 611 &req = &remove( *current_queue ); 554 if ( !&req ) continue; // possibly add some work stealing/idle sleep here612 if ( !&req ) continue; 555 613 if ( req.stop ) break Exit; 556 614 deliver_request( req ); 557 615 } 558 #if __STEAL616 #ifdef __STEAL 559 617 curr_work_queue->being_processed = false; // set done processing 618 empty_count = 0; // we found work so reset empty counter 560 619 #endif 561 empty_count = 0; // we found work so reset empty counter 620 621 // potentially reclaim some of the current queue's vector space if it is unused 562 622 reclaim( *current_queue ); 563 623 } // for … … 569 629 570 630 static inline void send( actor & this, request & req ) { 631 verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 571 632 send( *__actor_executor_, req, this.ticket ); 572 633 } … … 578 639 } 579 640 580 static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); } 641 // TODO: potentially getting revisit number of processors 642 // ( currently the value stored in active_cluster()->procs.total is often stale 643 // and doesn't reflect how many procs are allocated ) 644 // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); } 645 static inline void start_actor_system() { start_actor_system( 1 ); } 581 646 582 647 static inline void start_actor_system( executor & this ) { … … 595 660 __actor_executor_passed = false; 596 661 } 662 663 // Default messages to send to any actor to change status 664 // struct __DeleteMsg { inline message; } DeleteMsg; 665 // void ?{}( __DeleteMsg & this ) { ((message &) this){ Finished }; } 666 // struct __DestroyMsg { inline message; } DestroyMsg; 667 // void ?{}( __DestroyMsg & this ) { ((message &) this){ Finished }; } 668 // struct __FinishedMsg { inline message; } FinishedMsg; 669 // void ?{}( __FinishedMsg & this ) { ((message &) this){ Finished }; } 670 671 // Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; } 672 // Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; } 673 // Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }
Note: See TracChangeset
for help on using the changeset viewer.