Changeset 2d028039 for libcfa/src
- Timestamp:
- Feb 8, 2023, 3:07:52 PM
- Branches:
- ADT, ast-experimental, master
- Children:
- 4616622
- Parents:
- ccf1d99
- File:
- 1 edited
Legend:
- Unmodified (no prefix)
- Added (prefixed with +)
- Removed (prefixed with -)
- … marks omitted unchanged lines
libcfa/src/concurrency/actor.hfa
--- libcfa/src/concurrency/actor.hfa (revision ccf1d99)
+++ libcfa/src/concurrency/actor.hfa (changeset 2d028039)
…
 #include <list.hfa>
 #include <kernel.hfa>
+#include <vector2.hfa>
 
 #ifdef __CFA_DEBUG__
…
 #define __DEFAULT_EXECUTOR_SEPCLUS__ false
 
-// when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
-#define __ALLOC 0
+#define __STEAL 1 // workstealing toggle. Disjoint from toggles above
+
+// whether to steal work or to steal a queue Only applicable if __STEAL == 1
+#define __STEAL_WORK 0
+
+// heuristic selection (only set one to be 1)
+#define __RAND_QUEUE 1
+#define __RAND_WORKER 0
+
+// show stealing stats
+// #define __STEAL_STATS
 
 // forward decls
…
     __receive_fn fn;
     bool stop;
-    inline dlink(request);
 };
-P9_EMBEDDED( request, dlink(request) )
 
 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
…
 // hybrid data structure. Copies until buffer is full and then allocates for intrusive list
 struct copy_queue {
-    dlist( request ) list;
-    #if ! __ALLOC
     request * buffer;
-    size_t count, buffer_size, index;
-    #endif
+    size_t count, buffer_size, index, utilized, last_size;
 };
 static inline void ?{}( copy_queue & this ) {}
 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
-    list{};
-    #if ! __ALLOC
     buffer_size = buf_size;
     buffer = aalloc( buffer_size );
     count = 0;
+    utilized = 0;
     index = 0;
-    #endif
-}
-static inline void ^?{}( copy_queue & this ) with(this) {
-    #if ! __ALLOC
-    adelete(buffer);
-    #endif
-}
+    last_size = 0;
+}
+static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
 
 static inline void insert( copy_queue & this, request & elem ) with(this) {
-    #if ! __ALLOC
-    if ( count < buffer_size ) { // fast path ( no alloc )
-        buffer[count]{ elem };
-        count++;
-        return;
-    }
-    request * new_elem = alloc();
-    (*new_elem){ elem };
-    insert_last( list, *new_elem );
-    #else
-    insert_last( list, elem );
-    #endif
+    if ( count >= buffer_size ) { // increase arr size
+        last_size = buffer_size;
+        buffer_size = 2 * buffer_size;
+        buffer = realloc( buffer, sizeof( request ) * buffer_size );
+        /* paranoid */ verify( buffer );
+    }
+    buffer[count]{ elem }; // C_TODO: change to memcpy
+    // memcpy( &buffer[count], &elem, sizeof(request) );
+    count++;
 }
 
 // once you start removing you need to remove all elements
 // it is not supported to call insert() before the list is fully empty
-// should_delete is an output param
-static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
-    #if ! __ALLOC
+static inline request & remove( copy_queue & this ) with(this) {
     if ( count > 0 ) {
         count--;
-        should_delete = false;
         size_t old_idx = index;
         index = count == 0 ? 0 : index + 1;
         return buffer[old_idx];
     }
-    #endif
-    should_delete = true;
-    return try_pop_front( list );
-}
-
-static inline bool isEmpty( copy_queue & this ) with(this) {
-    #if ! __ALLOC
-    return count == 0 && list`isEmpty;
-    #else
-    return list`isEmpty;
-    #endif
-}
+    request * ret = 0p;
+    return *0p;
+}
+
+// try to reclaim some memory
+static inline void reclaim( copy_queue & this ) with(this) {
+    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
+    utilized = 0;
+    buffer_size--;
+    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
+}
+
+static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
 
 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
…
     __spinlock_t mutex_lock;
     copy_queue owned_queue;
-    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
     copy_queue * c_queue;
+    volatile bool being_processed;
 }; // work_queue
 static inline void ?{}( work_queue & this ) with(this) {
-    // c_queue = alloc();
-    // (*c_queue){ __buffer_size };
     owned_queue{ __buffer_size };
     c_queue = &owned_queue;
-}
-// static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
+    being_processed = false;
+}
 
 static inline void insert( work_queue & this, request & elem ) with(this) {
…
 } // insert
 
-static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
+static inline void transfer( work_queue & this, copy_queue ** transfer_to, work_queue ** queue_arr, unsigned int idx ) with(this) {
     lock( mutex_lock __cfaabi_dbg_ctx2 );
+    #if __STEAL
+
+    #if __STEAL_WORK
+    if ( unlikely( being_processed ) )
+    #else
+    // check if queue has been stolen out from under us between
+    // transfer() call and lock acquire C_TODO: maybe just use new queue!
+    if ( unlikely( being_processed || queue_arr[idx] != &this ) )
+    #endif // __STEAL_WORK
+    {
+        unlock( mutex_lock );
+        return;
+    }
+
+    being_processed = c_queue->count != 0;
+    #endif // __STEAL
+
+    c_queue->utilized = c_queue->count;
+
     // swap copy queue ptrs
     copy_queue * temp = *transfer_to;
…
 
 thread worker {
-    copy_queue owned_queue;
-    work_queue * request_queues;
+    work_queue ** request_queues;
     copy_queue * current_queue;
+    worker ** worker_arr; // C_TODO: change n_workers, n_queues,worker_arr to just be pulled from ptr to executor
     request & req;
-    unsigned int start, range;
+    unsigned int start, range, empty_count, n_workers, n_queues, id;
+    #ifdef __STEAL_STATS
+    unsigned int try_steal, stolen;
+    #endif
 };
 
-static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
+#ifdef __STEAL_STATS
+unsigned int total_tries = 0, total_stolen = 0, total_workers;
+#endif
+static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,
+    unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) {
     ((thread &)this){ clu };
     this.request_queues = request_queues;
-    // this.current_queue = alloc();
-    // (*this.current_queue){ __buffer_size };
-    this.owned_queue{ __buffer_size };
-    this.current_queue = &this.owned_queue;
+    this.current_queue = current_queue;
     this.start = start;
     this.range = range;
-}
-// static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
+    this.empty_count = 0;
+    this.n_workers = n_workers;
+    this.worker_arr = worker_arr;
+    this.n_queues = n_queues;
+    this.id = id;
+    #ifdef __STEAL_STATS
+    this.try_steal = 0;
+    this.stolen = 0;
+    total_workers = n_workers;
+    #endif
+}
+static inline void ^?{}( worker & mutex this ) with(this) {
+    // delete( current_queue );
+    #ifdef __STEAL_STATS
+    __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
+    __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
+    if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0)
+        printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen);
+    #endif
+}
 
 struct executor {
     cluster * cluster; // if workers execute on separate cluster
     processor ** processors; // array of virtual processors adding parallelism for workers
-    work_queue * request_queues; // master list of work request queues
-    worker ** workers; // array of workers executing work requests
+    work_queue * request_queues; // master array of work request queues
+    copy_queue * local_queues; // array of all worker local queues to avoid deletion race
+    work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping
+    worker ** workers; // array of workers executing work requests
     unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
     bool seperate_clus; // use same or separate cluster for executor
…
 
     request_queues = aalloc( nrqueues );
-    for ( i; nrqueues )
+    worker_req_queues = aalloc( nrqueues );
+    for ( i; nrqueues ) {
         request_queues[i]{};
+        worker_req_queues[i] = &request_queues[i];
+    }
 
     processors = aalloc( nprocessors );
…
         (*(processors[i] = alloc())){ *cluster };
 
+    local_queues = aalloc( nworkers );
     workers = alloc( nworkers );
     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
     for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
+        local_queues[i]{ buf_size };
         range = reqPerWorker + ( i < extras ? 1 : 0 );
-        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
+        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };
     } // for
 }
…
 static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
 
+// C_TODO: once stealing is implemented make sure shutdown still works
 static inline void ^?{}( executor & this ) with(this) {
+    #if __STEAL
+    request sentinels[nrqueues];
+    for ( unsigned int i = 0; i < nrqueues; i++ ) {
+        insert( request_queues[i], sentinels[i] ); // force eventually termination
+    } // for
+    #else
     request sentinels[nworkers];
     unsigned int reqPerWorker = nrqueues / nworkers;
…
         insert( request_queues[step], sentinels[i] ); // force eventually termination
     } // for
+    #endif
 
     for ( i; nworkers )
…
 
     adelete( workers );
+    adelete( local_queues );
     adelete( request_queues );
+    adelete( worker_req_queues );
     adelete( processors );
     if ( seperate_clus ) delete( cluster );
…
 }
 
+// Couple of ways to approach work stealing
+// 1: completely worker agnostic, just find a big queue and steal it
+// 2: track some heuristic of worker's load and focus on that and then pick a queue from that worker
+// worker heuristics:
+//     - how many queues have work?
+//     - size of largest queue
+//     - total # of messages
+//     - messages currently servicing
+//     - pick randomly
+//     - pick from closer threads/workers (this can be combined with others)
+
+// lock free or global lock for queue stealing
+#define __LOCK_SWP 0
+
+__spinlock_t swp_lock;
+
+// tries to atomically swap two queues and returns a bool indicating if the swap failed
+static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
+    #if __LOCK_SWP
+
+    lock( swp_lock __cfaabi_dbg_ctx2 );
+    work_queue * temp = request_queues[my_idx];
+    request_queues[my_idx] = request_queues[victim_idx];
+    request_queues[victim_idx] = temp;
+    unlock( swp_lock );
+
+    return true;
+
+    #else // __LOCK_SWP else
+    work_queue * my_queue = request_queues[my_idx];
+    work_queue * other_queue = request_queues[victim_idx];
+    if ( other_queue == 0p || my_queue == 0p ) return false;
+
+    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
+    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
+        return false;
+
+    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
+    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
+        /* paranoid */ verify( request_queues[my_idx] == 0p );
+        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
+        return false;
+    }
+
+    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
+    request_queues[my_idx] = other_queue; // last write does not need to be atomic
+    return true;
+
+    #endif // __LOCK_SWP
+}
+
+// once a worker to steal from has been chosen, choose queue to steal from
+static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {
+    #if __RAND_QUEUE
+    unsigned int tries = 0;
+    const unsigned int start_idx = prng( n_queues );
+    work_queue * curr_steal_queue;
+
+    for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {
+        tries++;
+        curr_steal_queue = request_queues[i];
+        #if __STEAL_WORK
+
+        // avoid empty queues and queues that are being operated on
+        if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
+            continue;
+
+        // in this case we just return from transfer if this doesn't work
+        transfer( *curr_steal_queue, &current_queue, request_queues, i );
+        if ( isEmpty( *current_queue ) ) continue;
+        last_idx = i;
+
+        #ifdef __STEAL_STATS
+        stolen++;
+        #endif // __STEAL_STATS
+
+        #else // __STEAL_WORK else
+
+        // avoid empty queues and queues that are being operated on
+        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
+            continue;
+
+        #ifdef __STEAL_STATS
+        bool success = try_swap_queues( this, i, last_idx );
+        if ( success ) stolen++;
+        #else
+        try_swap_queues( this, i, last_idx );
+        #endif // __STEAL_STATS
+
+        // C_TODO: try transfer immediately
+        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
+        // if ( isEmpty( *current_queue ) ) return false;
+        return false;
+
+        #endif // __STEAL_WORK
+
+        return true;
+    } // for
+    return false;
+
+    #elif __RAND_WORKER
+
+    // have to calculate victim start and range since victim may be deleted before us in shutdown
+    const unsigned int queues_per_worker = n_queues / n_workers;
+    const unsigned int extras = n_queues % n_workers;
+    unsigned int vic_start, vic_range;
+    if ( extras > victim_id ) {
+        vic_range = queues_per_worker + 1;
+        vic_start = vic_range * victim_id;
+    } else {
+        vic_start = extras + victim_id * queues_per_worker;
+        vic_range = queues_per_worker;
+    }
+    unsigned int start_idx = prng( vic_range );
+    unsigned int tries = 0;
+    work_queue * curr_steal_queue;
+
+    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
+        tries++;
+        curr_steal_queue = request_queues[ i + vic_start ];
+        // avoid empty queues and queues that are being operated on
+        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
+            continue;
+
+        try_swap_queues( this, i, last_idx );
+
+        #ifdef __STEAL_STATS
+        bool success = try_swap_queues( this, i, last_idx );
+        if ( success ) stolen++;
+        #else
+        try_swap_queues( this, i, last_idx );
+        #endif // __STEAL_STATS
+
+        // C_TODO: try transfer immediately
+        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
+        // if ( isEmpty( *current_queue ) ) return false;
+        return false;
+    }
+    #endif
+}
+
+// choose a worker to steal from
+static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {
+    #if __RAND_WORKER
+    unsigned int victim = prng( n_workers );
+    if ( victim == id ) victim = ( victim + 1 ) % n_workers;
+    return choose_queue( this, victim, last_idx );
+    #else
+    return choose_queue( this, 0, last_idx );
+    #endif
+}
+
+// look for work to steal
+// returns a bool: true => a queue was stolen, false => no work was stolen
+static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur
+    // to steal queue acquire both queue's locks in address ordering (maybe can do atomic swap)
+    // maybe a flag to hint which queue is being processed
+    // look at count to see if queue is worth stealing (dont steal empty queues)
+    // if steal and then flag is up then dont process and just continue looking at own queues
+    // (best effort approach) its ok if stealing isn't fruitful
+    //     -> more important to not delay busy threads
+
+    return choose_victim( this, last_idx );
+}
+
 void main( worker & this ) with(this) {
-    bool should_delete;
+    // threshold of empty queues we see before we go stealing
+    const unsigned int steal_threshold = 2 * n_queues;
+    unsigned int curr_idx;
+    work_queue * curr_work_queue;
   Exit:
     for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
         // C_TODO: potentially check queue count instead of immediately trying to transfer
-        transfer( request_queues[i + start], &current_queue );
+        curr_idx = i + start;
+        curr_work_queue = request_queues[curr_idx];
+        transfer( *curr_work_queue, &current_queue, request_queues, curr_idx );
+        if ( isEmpty( *current_queue ) ) {
+            #if __STEAL
+            empty_count++;
+            if ( empty_count < steal_threshold ) continue;
+            empty_count = 0; // C_TODO: look into stealing backoff schemes
+            #ifdef __STEAL_STATS
+            try_steal++;
+            #endif // __STEAL_STATS
+
+            if ( ! steal_work( this, curr_idx ) ) continue;
+
+            #else // __STEAL else
+
+            continue;
+
+            #endif // __STEAL
+        }
         while ( ! isEmpty( *current_queue ) ) {
-            &req = &remove( *current_queue, should_delete );
+            &req = &remove( *current_queue );
             if ( !&req ) continue; // possibly add some work stealing/idle sleep here
             if ( req.stop ) break Exit;
             deliver_request( req );
-
-            if ( should_delete ) delete( &req );
-        } // while
+        }
+        #if __STEAL
+        curr_work_queue->being_processed = false; // set done processing
+        #endif
+        empty_count = 0; // we found work so reset empty counter
+        reclaim( *current_queue );
    } // for
 }
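The reworked copy_queue drops the intrusive overflow list: insert() now doubles the flat buffer when it fills, and reclaim() gives back one slot at a time once a batch uses fewer entries than the pre-growth size. A minimal sketch of that resizing policy in plain C; the element type (int) and the names int_copy_queue, buf_insert, buf_reclaim are illustrative only and not part of actor.hfa.

#include <assert.h>
#include <stdlib.h>

typedef struct {
    int * buffer;
    size_t count, buffer_size, utilized, last_size;
} int_copy_queue;

static void buf_insert( int_copy_queue * q, int elem ) {
    if ( q->count >= q->buffer_size ) {          // full: double the array
        q->last_size = q->buffer_size;
        q->buffer_size *= 2;
        q->buffer = realloc( q->buffer, sizeof(int) * q->buffer_size );
        assert( q->buffer );
    }
    q->buffer[q->count++] = elem;                // copy the element into the next slot
}

static void buf_reclaim( int_copy_queue * q ) {
    // keep the space if the last batch still needed the old size, or if the buffer is tiny
    if ( q->utilized >= q->last_size || q->buffer_size <= 4 ) { q->utilized = 0; return; }
    q->utilized = 0;
    q->buffer_size--;                            // shrink by one slot per reclaim call
    q->buffer = realloc( q->buffer, sizeof(int) * q->buffer_size );
}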
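The lock-free branch of try_swap_queues exchanges two slots of the shared queue-pointer array with two compare-and-swaps: the thief detaches its own slot by CAS-ing it to null, CASes the victim's slot over to its old queue, and only then publishes the stolen queue into its now-private slot with a plain store. A sketch of the same idea in plain C using the GCC __atomic builtins; slot_swap and slots are illustrative names, not part of actor.hfa.

#include <stdbool.h>
#include <stddef.h>

typedef struct work_queue work_queue;   // opaque: never dereferenced in this sketch

// slots is the shared array of queue pointers workers index into;
// my_idx is the calling worker's slot, victim_idx the slot it wants to steal.
static bool slot_swap( work_queue ** slots, size_t my_idx, size_t victim_idx ) {
    work_queue * mine  = slots[my_idx];
    work_queue * other = slots[victim_idx];
    if ( mine == NULL || other == NULL ) return false;   // someone else is mid-swap

    // Detach our own slot by CAS-ing it to NULL; failure means our queue moved.
    if ( !__atomic_compare_exchange_n( &slots[my_idx], &mine, NULL,
                                       false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return false;

    // Try to hand our old queue to the victim's slot; failure means the victim's
    // queue moved, so restore our slot and give up.
    if ( !__atomic_compare_exchange_n( &slots[victim_idx], &other, mine,
                                       false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        slots[my_idx] = mine;
        return false;
    }

    // Our slot still holds NULL, so no other thief will touch it: the final
    // publication of the stolen queue can be a plain store.
    slots[my_idx] = other;
    return true;
}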
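In the __RAND_WORKER path, choose_queue recomputes the victim's queue range from its id instead of reading it from the (possibly already destructed) victim: the first n_queues % n_workers workers each own one extra queue. A small worked example in plain C reproducing that partition; the values (10 queues, 4 workers) are illustrative, and the output matches the ranges handed out by the executor constructor.

#include <stdio.h>

int main( void ) {
    const unsigned int n_queues = 10, n_workers = 4;
    const unsigned int per_worker = n_queues / n_workers;   // 2 queues each as a base
    const unsigned int extras     = n_queues % n_workers;   // first 2 workers get one more
    for ( unsigned int id = 0; id < n_workers; id++ ) {
        unsigned int start, range;
        if ( extras > id ) {                 // one of the workers holding an extra queue
            range = per_worker + 1;
            start = range * id;
        } else {
            start = extras + id * per_worker;
            range = per_worker;
        }
        // prints: worker 0: 0..2, worker 1: 3..5, worker 2: 6..7, worker 3: 8..9
        printf( "worker %u: queues %u..%u\n", id, start, start + range - 1 );
    }
    return 0;
}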