Changeset b9c06b98
- Timestamp: Jun 26, 2023, 11:01:39 PM (22 months ago)
- Branches: master
- Children: 2dfdae3
- Parents: c4497e3
- File: 1 edited
libcfa/src/concurrency/actor.hfa
typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
struct request {
	actor * receiver;
	message * msg;
	__receive_fn fn;
};

struct a_msg {
	int m;
};
static inline void ?{}( request & this ) {}
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
	this.receiver = receiver;
	this.msg = msg;
	this.fn = fn;
}
static inline void ?{}( request & this, request & copy ) {
	this.receiver = copy.receiver;
	this.msg = copy.msg;
	this.fn = copy.fn;
}

…

// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
struct copy_queue {
	request * buffer;
	size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
	buffer_size = buf_size;
	buffer = aalloc( buffer_size );
	count = 0;
	utilized = 0;
	index = 0;
	last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) {
	DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
	adelete(buffer);
}

static inline void insert( copy_queue & this, request & elem ) with(this) {
	if ( count >= buffer_size ) { // increase arr size
		last_size = buffer_size;
		buffer_size = 2 * buffer_size;
		buffer = realloc( buffer, sizeof( request ) * buffer_size );
		/* paranoid */ verify( buffer );
	}
	memcpy( &buffer[count], &elem, sizeof(request) );
	count++;
}

…

// it is not supported to call insert() before the array is fully empty
static inline request & remove( copy_queue & this ) with(this) {
	if ( count > 0 ) {
		count--;
		size_t old_idx = index;
		index = count == 0 ? 0 : index + 1;
		return buffer[old_idx];
	}
	request * ret = 0p;
	return *0p;
}
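The gulping contract is easiest to see in isolation: inserts and removes never interleave, and a consumer drains the queue completely before any new insert. A minimal CFA sketch of that phase discipline, assuming only the declarations above (copy_queue_demo and the empty placeholder request are illustrative, and the request is never dispatched):

	// sketch, not part of the header: phase-separated use of copy_queue
	static void copy_queue_demo() {
		copy_queue q = { 8 };		// room for 8 requests before the buffer doubles
		request r;					// placeholder request; its fields are never used
		insert( q, r );				// produce phase: inserts only
		insert( q, r );
		while ( q.count > 0 ) {		// gulp phase: drain completely before inserting again
			request & next = remove( q );
			// the worker loop would call deliver_request( next ) here instead
		}
	}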
// try to reclaim some memory if less than half of buffer is utilized
static inline void reclaim( copy_queue & this ) with(this) {
	if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
	utilized = 0;
	buffer_size--;
	buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
}

…

struct work_queue {
	__spinlock_t mutex_lock;
	copy_queue * owned_queue;			// copy queue allocated and cleaned up by this work_queue
	copy_queue * c_queue;				// current queue
	volatile bool being_processed;		// flag to prevent concurrent processing
	#ifdef ACTOR_STATS
	unsigned int id;
	size_t missed;						// transfers skipped due to being_processed flag being up
	#endif
}; // work_queue
static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
	owned_queue = alloc();				// allocated separately to avoid false sharing
	(*owned_queue){ buf_size };
	c_queue = owned_queue;
	being_processed = false;
	#ifdef ACTOR_STATS
	id = i;
	missed = 0;
	#endif
}

…

static inline void insert( work_queue & this, request & elem ) with(this) {
	lock( mutex_lock __cfaabi_dbg_ctx2 );
	insert( *c_queue, elem );
	unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
	lock( mutex_lock __cfaabi_dbg_ctx2 );
	#ifdef __STEAL

	// check if queue is being processed elsewhere
	if ( unlikely( being_processed ) ) {
		#ifdef ACTOR_STATS
		missed++;
		#endif
		unlock( mutex_lock );
		return;
	}

	being_processed = c_queue->count != 0;
	#endif // __STEAL

	c_queue->utilized = c_queue->count;

	// swap copy queue ptrs
	copy_queue * temp = *transfer_to;
	*transfer_to = c_queue;
	c_queue = temp;
	unlock( mutex_lock );
} // transfer
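transfer() is the gulp operation: rather than copying requests out, the consumer trades its empty copy_queue for the producer-facing one, so a gulp costs two pointer writes under the lock regardless of how many requests move. A sketch of one producer/consumer round, assuming the constructors above (gulp_demo and the placeholder request are illustrative):

	// sketch, not part of the header: one gulp cycle
	static void gulp_demo() {
		work_queue wq = { 16, 0 };		// producer-facing queue (buffer 16, stats id 0)
		copy_queue local = { 16 };		// worker-local queue, initially empty
		copy_queue * current = &local;
		request r;						// placeholder, never dispatched
		insert( wq, r );				// producers insert under wq.mutex_lock
		transfer( wq, &current );		// O(1) swap: current now points at the gulped queue
		while ( current->count > 0 ) {
			request & next = remove( *current );
			// deliver_request( next ) would dispatch it in the worker main loop
		}
	}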
// needed since some info needs to persist past worker lifetimes
struct worker_info {
	volatile unsigned long long stamp;
	#ifdef ACTOR_STATS
	size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
	unsigned long long processed;
	size_t gulps;
	#endif
};
static inline void ?{}( worker_info & this ) {
	#ifdef ACTOR_STATS
	this.stolen_from = 0;
	this.try_steal = 0;				// attempts to steal
	this.stolen = 0;				// successful steals
	this.processed = 0;				// requests processed
	this.gulps = 0;					// number of gulps
	this.failed_swaps = 0;			// steal swap failures
	this.empty_stolen = 0;			// queues empty after steal
	this.msgs_stolen = 0;			// number of messages stolen
	#endif
	this.stamp = rdtscl();
}

…

// #endif
thread worker {
	work_queue ** request_queues;
	copy_queue * current_queue;
	executor * executor_;
	unsigned int start, range;
	int id;
};

…

// aggregate counters for statistics
size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
	__total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
	unsigned int start, unsigned int range, int id ) {
	((thread &)this){ clu };
	this.request_queues = request_queues;	// array of all queues
	this.current_queue = current_queue;		// currently gulped queue (start with empty queue to use in swap later)
	this.executor_ = executor_;				// pointer to current executor
	this.start = start;						// start of worker's subrange of request_queues
	this.range = range;						// size of worker's subrange of request_queues
	this.id = id;							// worker's id and index in array of workers
}

static bool no_steal = false;
struct executor {
	cluster * cluster;					// if workers execute on separate cluster
	processor ** processors;			// array of virtual processors adding parallelism for workers
	work_queue * request_queues;		// master array of work request queues
	copy_queue * local_queues;			// array of all worker local queues to avoid deletion race
	work_queue ** worker_req_queues;	// secondary array of work queues to allow for swapping
	worker ** workers;					// array of workers executing work requests
	worker_info * w_infos;				// array of info about each worker
	unsigned int nprocessors, nworkers, nrqueues;	// number of processors/threads/request queues
	bool seperate_clus;					// use same or separate cluster for executor
	volatile bool is_shutdown;			// flag to communicate shutdown to worker threads
}; // executor
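The executor's parameters trade memory for contention: giving the workers more request queues than threads spreads sends across more locks and leaves room for stealing, while nworkers == nrqueues disables stealing entirely via no_steal. A hedged construction sketch using the five-argument constructor shown below (the values are illustrative):

	// sketch: 4 processors, 4 workers, 64 request queues, shared cluster, buffers of 10
	int main() {
		executor e = { 4, 4, 64, false, 10 };
		// e can then be handed to start_actor_system( e ), defined later in this header;
		// { 4, 4, 4, false, 10 } (nworkers == nrqueues) would set no_steal = true
		return 0;
	}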
…

// #endif
static inline void ^?{}( worker & mutex this ) with(this) {
	#ifdef ACTOR_STATS
	__atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
	__atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
	__atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
	__atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
	__atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
	__atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
	__atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);

	// per worker steal stats (uncomment alongside the lock above this routine to print)
	// lock( out_lock __cfaabi_dbg_ctx2 );
	// printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
	// int count = 0;
	// int count2 = 0;
	// for ( i; range ) {
	// 	if ( replaced_queue[start + i] > 0 ){
	// 		count++;
	// 		// printf("%d: %u, ",i, replaced_queue[i]);
	// 	}
	// 	if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
	// 		count2++;
	// }
	// printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
	// printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
	// unlock( out_lock );
	#endif
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
	if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
	this.nprocessors = nprocessors;
	this.nworkers = nworkers;
	this.nrqueues = nrqueues;
	this.seperate_clus = seperate_clus;
	this.is_shutdown = false;

	if ( nworkers == nrqueues )
		no_steal = true;

	#ifdef ACTOR_STATS
	// stolen_arr = aalloc( nrqueues );
	// replaced_queue = aalloc( nrqueues );
	__total_workers = nworkers;
	#endif

	if ( seperate_clus ) {
		cluster = alloc();
		(*cluster){};
	} else cluster = active_cluster();

	request_queues = aalloc( nrqueues );
	worker_req_queues = aalloc( nrqueues );
	for ( i; nrqueues ) {
		request_queues[i]{ buf_size, i };
		worker_req_queues[i] = &request_queues[i];
	}

	processors = aalloc( nprocessors );
	for ( i; nprocessors )
		(*(processors[i] = alloc())){ *cluster };

	local_queues = aalloc( nworkers );
	workers = aalloc( nworkers );
	w_infos = aalloc( nworkers );
	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;

	for ( i; nworkers ) {
		w_infos[i]{};
		local_queues[i]{ buf_size };
	}

	for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );
		(*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
	} // for
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
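The final loop of the constructor splits the nrqueues queues contiguously over the workers, giving each of the first extras = nrqueues % nworkers workers one extra queue. A standalone check of that arithmetic, mirroring the loop above with nrqueues = 10 and nworkers = 4:

	// sketch: reproduces the constructor's subrange computation for 10 queues over 4 workers
	#include <stdio.h>
	int main() {
		unsigned int nrqueues = 10, nworkers = 4;
		unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
		for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
			range = reqPerWorker + ( i < extras ? 1 : 0 );
			printf( "worker %u: start %u, range %u\n", i, start, range );
		}
		// prints ranges 3, 3, 2, 2 starting at queues 0, 3, 6, 8
		return 0;
	}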
…

static inline void ^?{}( executor & this ) with(this) {
	is_shutdown = true;

	for ( i; nworkers )
		delete( workers[i] );

	for ( i; nprocessors ) {
		delete( processors[i] );
	} // for

	#ifdef ACTOR_STATS
	size_t misses = 0;
	for ( i; nrqueues ) {
		misses += worker_req_queues[i]->missed;
	}
	// adelete( stolen_arr );
	// adelete( replaced_queue );
	#endif

	adelete( workers );
	adelete( w_infos );
	adelete( local_queues );
	adelete( request_queues );
	adelete( worker_req_queues );
	adelete( processors );
	if ( seperate_clus ) delete( cluster );

	#ifdef ACTOR_STATS // print formatted stats
	printf("Actor System Stats:\n");
	printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
	size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
	printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
	printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
		__total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
	size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
	printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
	#endif
}
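All of these counters and the report above compile to nothing unless ACTOR_STATS is defined, so enabling them is a build-time decision rather than a runtime flag. A hypothetical invocation, inferred from the #ifdef guards rather than taken from this changeset (the cfa driver passes -D through like gcc):

	// cfa -DACTOR_STATS my_program.cfa		// compile the stats counters and report in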
…

static inline size_t __get_next_ticket( executor & this ) with(this) {
	#ifdef __CFA_DEBUG__
	size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;

	// reserve MAX for dead actors
	if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
	return temp;
	#else
	return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
	#endif
} // tickets

// TODO: update globals in this file to be static fields once the static fields project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;		// was an executor passed to start_actor_system
static size_t __num_actors_ = 0;					// number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;	// used to wake executor after actors finish
struct actor {
	size_t ticket;			// executor-queue handle
	allocation alloc;		// allocation action
	inline virtual_dtor;
};

static inline void ?{}( actor & this ) with(this) {
	// Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
	// member must be called to end it
	DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
	alloc = Nodelete;
	ticket = __get_next_ticket( *__actor_executor_ );
	__atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
	#ifdef ACTOR_STATS
	__atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
	#endif
}

static inline void check_actor( actor & this ) {
	if ( this.alloc != Nodelete ) {
		switch( this.alloc ) {
			case Delete: delete( &this ); break;
			case Destroy:
				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
				^?{}(this);
				break;
			case Finished:
				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
				break;
			default: ;								// stop warning
		}

		if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
			unpark( __actor_executor_thd );
		}
	}
}
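The allocation value a receive routine returns is copied into the actor (see deliver_request below) and acted on by check_actor: Delete frees the actor, Destroy runs only its destructor, Finished just marks it terminated, and Nodelete keeps it alive for further messages. A sketch of a one-shot actor, with hypothetical derived types (the receive overloads at the end of this file follow the same shape):

	// sketch: an actor that terminates itself after one message (assumes <actor.hfa>)
	struct one_shot { inline actor; };
	struct ping { inline message; };
	allocation receive( one_shot & this, ping & msg ) {
		return Finished;	// check_actor() marks this actor terminated after dispatch
	}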
struct message {
	allocation alloc;		// allocation action
	inline virtual_dtor;
};

static inline void ?{}( message & this ) {
	this.alloc = Nodelete;
}
static inline void ?{}( message & this, allocation alloc ) {
	memcpy( &this.alloc, &alloc, sizeof(allocation) );	// optimization to elide ctor
	CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
}
static inline void ^?{}( message & this ) with(this) {
	CFA_DEBUG(
		if ( alloc == Nodelete ) {
			printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",

…

static inline void check_message( message & this ) {
	switch ( this.alloc ) {						// analyze message status
		case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
		case Delete: delete( &this ); break;
		case Destroy: ^?{}( this ); break;
		case Finished: break;
	} // switch
}
static inline allocation set_allocation( message & this, allocation state ) {
	CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
	allocation prev = this.alloc;
	this.alloc = state;
	return prev;
}
static inline allocation get_allocation( message & this ) {
	return this.alloc;
}

static inline void deliver_request( request & this ) {
	DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
	actor * base_actor;
	message * base_msg;
	allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
	memcpy( &base_actor->alloc, &temp, sizeof(allocation) );	// optimization to elide ctor
	check_message( *base_msg );
	check_actor( *base_actor );
}
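set_allocation now returns the message's previous reclamation policy, so a receive routine can change it and still know what it replaced; get_allocation reads it without changing it. A small sketch with hypothetical types (the message is assumed heap-allocated, since check_message will delete it):

	// sketch: retire a message after this delivery, remembering the old policy
	struct logger { inline actor; };
	struct log_msg { inline message; };
	allocation receive( logger & this, log_msg & msg ) {
		allocation prev = set_allocation( msg, Delete );	// msg freed by check_message()
		// get_allocation( msg ) now reports Delete; prev holds the replaced policy
		return Nodelete;									// the actor itself lives on
	}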
…

// returns ptr to newly owned queue if swap succeeds
static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
	work_queue * my_queue = request_queues[my_idx];
	work_queue * other_queue = request_queues[victim_idx];

	// if either queue is 0p then they are in the process of being stolen
	if ( other_queue == 0p ) return 0p;

	// try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
	if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
		return 0p;

	// try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
	if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
		/* paranoid */ verify( request_queues[my_idx] == 0p );
		request_queues[my_idx] = my_queue;	// reset my queue ptr back to appropriate val
		return 0p;
	}

	// we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
	request_queues[my_idx] = other_queue;	// last write does not need to be atomic
	return other_queue;
}

// once a worker to steal from has been chosen, choose queue to steal from
static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
	// have to calculate victim start and range since victim may be deleted before us in shutdown
	const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
	const unsigned int extras = executor_->nrqueues % executor_->nworkers;
	unsigned int vic_start, vic_range;
	if ( extras > victim_id ) {
		vic_range = queues_per_worker + 1;
		vic_start = vic_range * victim_id;
	} else {
		vic_start = extras + victim_id * queues_per_worker;
		vic_range = queues_per_worker;
	}
	unsigned int start_idx = prng( vic_range );

	unsigned int tries = 0;
	work_queue * curr_steal_queue;

	for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
		tries++;
		curr_steal_queue = request_queues[ i + vic_start ];
		// avoid empty queues and queues that are being operated on
		if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
			continue;

		#ifdef ACTOR_STATS
		curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
		if ( curr_steal_queue ) {
			executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
			executor_->w_infos[id].stolen++;
			if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
			// __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
			// replaced_queue[swap_idx]++;
			// __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
		} else {
			executor_->w_infos[id].failed_swaps++;
		}
		#else
		curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
		#endif // ACTOR_STATS

		return;
	}

	return;
}
// choose a worker to steal from
static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
	#if RAND
	unsigned int victim = prng( executor_->nworkers );
	if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
	choose_queue( this, victim, swap_idx );
	#elif SEARCH
	unsigned long long min = MAX;	// smaller timestamp means longer since service
	int min_id = 0;					// use ints not uints to avoid integer underflow without hacky math
	int n_workers = executor_->nworkers;
	unsigned long long curr_stamp;
	int scount = 1;
	for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
		curr_stamp = executor_->w_infos[i].stamp;
		if ( curr_stamp < min ) {
			min = curr_stamp;
			min_id = i;
		}
	}
	choose_queue( this, min_id, swap_idx );
	#endif
}
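Victim selection is fixed at compile time: RAND picks a random other worker, while SEARCH scans every other worker's stamp (refreshed in the main loop below via rdtscl()) and steals from the one idle longest. Hypothetical build lines, inferred from the preprocessor guards rather than any documented flag:

	// cfa -D__STEAL -DRAND=1   my_program.cfa	// random victim selection
	// cfa -D__STEAL -DSEARCH=1 my_program.cfa	// longest-idle victim selection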
#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
void main( worker & this ) with(this) {
	// #ifdef ACTOR_STATS
	// for ( i; executor_->nrqueues ) {
	// 	replaced_queue[i] = 0;
	// 	__atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
	// }
	// #endif

	// threshold of empty queues we see before we go stealing
	const unsigned int steal_threshold = 2 * range;

	// Store variable data here instead of worker struct to avoid any potential false sharing
	unsigned int empty_count = 0;
	request & req;
	work_queue * curr_work_queue;

	Exit:
	for ( unsigned int i = 0;; i = (i + 1) % range ) {	// cycle through set of request buffers
		curr_work_queue = request_queues[i + start];

		// check if queue is empty before trying to gulp it
		if ( is_empty( *curr_work_queue->c_queue ) ) {
			#ifdef __STEAL
			empty_count++;
			if ( empty_count < steal_threshold ) continue;
			#else
			continue;
			#endif
		}
		transfer( *curr_work_queue, &current_queue );
		#ifdef ACTOR_STATS
		executor_->w_infos[id].gulps++;
		#endif // ACTOR_STATS
		#ifdef __STEAL
		if ( is_empty( *current_queue ) ) {
			if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
			empty_count++;
			if ( empty_count < steal_threshold ) continue;
			empty_count = 0;

			CHECK_TERMINATION;	// check for termination

			__atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );

			#ifdef ACTOR_STATS
			executor_->w_infos[id].try_steal++;
			#endif // ACTOR_STATS

			steal_work( this, start + prng( range ) );
			continue;
		}
		#endif // __STEAL
		while ( ! is_empty( *current_queue ) ) {
			#ifdef ACTOR_STATS
			executor_->w_infos[id].processed++;
			#endif
			&req = &remove( *current_queue );
			if ( !&req ) continue;
			deliver_request( req );
		}
		#ifdef __STEAL
		curr_work_queue->being_processed = false;	// set done processing
		empty_count = 0;							// we found work so reset empty counter
		#endif

		CHECK_TERMINATION;

		// potentially reclaim some of the current queue's vector space if it is unused
		reclaim( *current_queue );
	} // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
	insert( request_queues[ticket], req);
}

static inline void send( actor & this, request & req ) {
	DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
	send( *__actor_executor_, req, this.ticket );
}

static inline void __reset_stats() {
	#ifdef ACTOR_STATS
	__total_tries = 0;
	__total_stolen = 0;
	__all_gulps = 0;
	__total_failed_swaps = 0;
	__total_empty_stolen = 0;
	__all_processed = 0;
	__num_actors_stats = 0;
	__all_msgs_stolen = 0;
	#endif
}

static inline void start_actor_system( size_t num_thds ) {
	__reset_stats();
	__actor_executor_thd = active_thread();
	__actor_executor_ = alloc();
	(*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

…

static inline void start_actor_system( executor & this ) {
	__reset_stats();
	__actor_executor_thd = active_thread();
	__actor_executor_ = &this;
	__actor_executor_passed = true;
}

static inline void stop_actor_system() {
	park();	// unparked when actor system is finished

	if ( !__actor_executor_passed ) delete( __actor_executor_ );
	__actor_executor_ = 0p;
	__actor_executor_thd = 0p;
	__next_ticket = 0;
	__actor_executor_passed = false;
}

…

// assigned at creation to __base_msg_finished to avoid unused message warning
message __base_msg_finished @= { .alloc : Finished };
struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;

allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
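Putting the pieces together, a minimal client looks roughly like the sketch below. The receive overload shape and the start/stop calls come from this header; the `|` send operator and the generated glue that wraps a user message into a request live in elided parts of the library, so treat those as assumptions (hello_actor and hello_msg are illustrative names):

	// sketch: minimal actor program
	#include <actor.hfa>

	struct hello_actor { inline actor; };
	struct hello_msg { inline message; };

	allocation receive( hello_actor & this, hello_msg & msg ) {
		printf( "hello\n" );
		return Finished;			// terminate the actor so stop_actor_system can return
	}

	int main() {
		start_actor_system( 1 );	// executor with one worker thread (ctor shown above)
		hello_actor a;
		hello_msg m;
		a | m;						// assumed send operator from the wider actor library
		stop_actor_system();		// parks until all actors have terminated
	}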