Changeset 2dfdae3
- Timestamp: Jun 27, 2023, 4:48:44 PM
- Branches: master
- Children: 70e47fec
- Parents: 508671e (diff), b9c06b98 (diff)
- Files: 2 edited

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
libcfa/src/concurrency/actor.hfa (r508671e → r2dfdae3; merged result shown)
typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
};

struct a_msg {
    int m;
};
static inline void ?{}( request & this ) {}
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
}

…

// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
struct copy_queue {
    request * buffer;
    size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    utilized = 0;
    index = 0;
    last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) {
    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    adelete(buffer);
}

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count >= buffer_size ) { // increase arr size
        last_size = buffer_size;
        buffer_size = 2 * buffer_size;
        buffer = realloc( buffer, sizeof( request ) * buffer_size );
        /* paranoid */ verify( buffer );
    }
    memcpy( &buffer[count], &elem, sizeof(request) );
    count++;
}
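insert() doubles the buffer when full, so growth is amortized O(1), and the copy_queue dtor enforces that every queued request is eventually received. A throwaway sketch of both behaviours (not part of the changeset; assumes actor.hfa and fstream.hfa are on the include path):

    #include <actor.hfa>
    #include <fstream.hfa>
    int main() {
        copy_queue q;
        q{ 4 };                             // seed capacity: room for 4 requests
        request r;
        for ( i; 5 ) insert( q, r );        // 5th insert doubles buffer_size: 4 -> 8
        sout | q.count | q.buffer_size;     // prints: 5 8
        while ( q.count > 0 ) remove( q );  // drain fully; ^?{} aborts if requests remain
    }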
…

// it is not supported to call insert() before the array is fully empty
static inline request & remove( copy_queue & this ) with(this) {
    if ( count > 0 ) {
        count--;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    request * ret = 0p;
    return *0p;
}

// try to reclaim some memory if less than half of buffer is utilized
static inline void reclaim( copy_queue & this ) with(this) {
    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    utilized = 0;
    buffer_size--;
    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
}

…

struct work_queue {
    __spinlock_t mutex_lock;
    copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
    copy_queue * c_queue;           // current queue
    volatile bool being_processed;  // flag to prevent concurrent processing
    #ifdef ACTOR_STATS
    unsigned int id;
    size_t missed;                  // transfers skipped due to being_processed flag being up
    #endif
}; // work_queue
static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    owned_queue = alloc();          // allocated separately to avoid false sharing
    (*owned_queue){ buf_size };
    c_queue = owned_queue;
    being_processed = false;
    #ifdef ACTOR_STATS
    id = i;
    missed = 0;
    #endif
}

…

static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    #ifdef __STEAL

    // check if queue is being processed elsewhere
    if ( unlikely( being_processed ) ) {
        #ifdef ACTOR_STATS
        missed++;
        #endif
        unlock( mutex_lock );
        return;
    }

    being_processed = c_queue->count != 0;
    #endif // __STEAL

    c_queue->utilized = c_queue->count;

    // swap copy queue ptrs
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer

// needed since some info needs to persist past worker lifetimes
struct worker_info {
    volatile unsigned long long stamp;
    #ifdef ACTOR_STATS
    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    unsigned long long processed;
    size_t gulps;
    #endif
};
static inline void ?{}( worker_info & this ) {
    #ifdef ACTOR_STATS
    this.stolen_from = 0;
    this.try_steal = 0;         // attempts to steal
    this.stolen = 0;            // successful steals
    this.processed = 0;         // requests processed
    this.gulps = 0;             // number of gulps
    this.failed_swaps = 0;      // steal swap failures
    this.empty_stolen = 0;      // queues empty after steal
    this.msgs_stolen = 0;       // number of messages stolen
    #endif
    this.stamp = rdtscl();
}
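transfer() is the gulp: a worker trades its drained local queue for a producer-side queue by swapping pointers under the lock, so consuming N requests costs one swap rather than N dequeues. A hedged sketch of the handoff (hypothetical driver, not part of the changeset):

    #include <actor.hfa>
    int main() {
        work_queue * wq = alloc();          // construct on heap storage, as the executor does
        (*wq){ 16, 0 };                     // buf_size 16, queue id 0
        copy_queue spare;
        spare{ 16 };                        // worker-local queue, starts empty
        copy_queue * current = &spare;

        request r;
        insert( *wq, r );                   // producer: lock, append to c_queue, unlock
        transfer( *wq, &current );          // gulp: one pointer swap, not a per-request copy
        while ( ! is_empty( *current ) ) remove( *current );   // drain the traded queue
        delete( wq );                       // its dtor (elided from this hunk) frees the owned queue
    }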
…

// #endif
thread worker {
    work_queue ** request_queues;
    copy_queue * current_queue;
    executor * executor_;
    unsigned int start, range;
    int id;
};

…

// aggregate counters for statistics
size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    unsigned int start, unsigned int range, int id ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;   // array of all queues
    this.current_queue = current_queue;     // currently gulped queue (start with empty queue to use in swap later)
    this.executor_ = executor_;             // pointer to current executor
    this.start = start;                     // start of worker's subrange of request_queues
    this.range = range;                     // size of worker's subrange of request_queues
    this.id = id;                           // worker's id and index in array of workers
}

static bool no_steal = false;
struct executor {
    cluster * cluster;                      // if workers execute on separate cluster
    processor ** processors;                // array of virtual processors adding parallelism for workers
    work_queue * request_queues;            // master array of work request queues
    copy_queue * local_queues;              // array of all worker local queues to avoid deletion race
    work_queue ** worker_req_queues;        // secondary array of work queues to allow for swapping
    worker ** workers;                      // array of workers executing work requests
    worker_info * w_infos;                  // array of info about each worker
    unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    bool seperate_clus;                     // use same or separate cluster for executor
    volatile bool is_shutdown;              // flag to communicate shutdown to worker threads
}; // executor
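A worked sketch of wiring a caller-owned executor into the actor system (hypothetical program, not part of the changeset; assumes the ?|? send operator that actor.hfa defines for actor/message pairs, which is outside this hunk):

    #include <actor.hfa>

    struct dummy { inline actor; };                 // invented single-purpose actor
    struct poke { inline message; };
    allocation receive( dummy & this, poke & msg ) { return Finished; }

    int main() {
        executor * e = alloc();
        (*e){ 0, 4, 64, false };        // 0 extra processors, 4 workers, 64 queues (64 > 4 keeps stealing on)
        start_actor_system( *e );       // caller-owned: stop_actor_system() won't delete it
        dummy d;
        poke p;
        d | p;                          // the actor finishes itself, so stop can unpark
        stop_actor_system();
        delete( e );
    }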
…

// #endif
static inline void ^?{}( worker & mutex this ) with(this) {
    #ifdef ACTOR_STATS
    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);

    // per worker steal stats (uncomment alongside the lock above this routine to print)
    // lock( out_lock __cfaabi_dbg_ctx2 );
    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    // int count = 0;
    // int count2 = 0;
    // for ( i; range ) {
    //     if ( replaced_queue[start + i] > 0 ){
    //         count++;
    //         // printf("%d: %u, ",i, replaced_queue[i]);
    //     }
    //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    //         count2++;
    // }
    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    // unlock( out_lock );
    #endif
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;
    this.is_shutdown = false;

    if ( nworkers == nrqueues )
        no_steal = true;

    #ifdef ACTOR_STATS
    // stolen_arr = aalloc( nrqueues );
    // replaced_queue = aalloc( nrqueues );
    __total_workers = nworkers;
    #endif

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    worker_req_queues = aalloc( nrqueues );
    for ( i; nrqueues ) {
        request_queues[i]{ buf_size, i };
        worker_req_queues[i] = &request_queues[i];
    }

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    local_queues = aalloc( nworkers );
    workers = aalloc( nworkers );
    w_infos = aalloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;

    for ( i; nworkers ) {
        w_infos[i]{};
        local_queues[i]{ buf_size };
    }

    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    } // for
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
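The final loop splits nrqueues over nworkers, giving the first nrqueues % nworkers workers one extra queue. The arithmetic can be checked standalone (sketch, not part of the changeset):

    #include <fstream.hfa>
    int main() {
        // mirror of the ctor's partitioning: 10 queues over 4 workers
        unsigned int nrqueues = 10, nworkers = 4;
        unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
        for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
            range = reqPerWorker + ( i < extras ? 1 : 0 );
            sout | "worker" | i | ": queues" | start | "through" | start + range - 1;
        }
        // worker 0: 0-2, worker 1: 3-5, worker 2: 6-7, worker 3: 8-9 — every queue covered once
    }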
…

static inline void ^?{}( executor & this ) with(this) {
    is_shutdown = true;

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    #ifdef ACTOR_STATS
    size_t misses = 0;
    for ( i; nrqueues ) {
        misses += worker_req_queues[i]->missed;
    }
    // adelete( stolen_arr );
    // adelete( replaced_queue );
    #endif

    adelete( workers );
    adelete( w_infos );
    adelete( local_queues );
    adelete( request_queues );
    adelete( worker_req_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );

    #ifdef ACTOR_STATS // print formatted stats
    printf("    Actor System Stats:\n");
    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    #endif
}
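Construction and destruction bracket the worker lifetimes, so an executor can be exercised without any actors at all; a minimal sketch (assumes delete on a CFA thread joins it, per the worker dtor above):

    #include <actor.hfa>
    int main() {
        executor * e = alloc();          // same heap pattern start_actor_system() uses
        (*e){ 0, 2, 32, false };         // ctor spins up 2 worker threads over 32 queues
        delete( e );                     // ^?{}: set is_shutdown, join workers, free arrays;
                                         // the stats block prints only when built with ACTOR_STATS
    }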
…

static inline size_t __get_next_ticket( executor & this ) with(this) {
    #ifdef __CFA_DEBUG__
    size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;

    // reserve MAX for dead actors
    if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    return temp;
    #else
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    #endif
} // tickets

// TODO: update globals in this file to be static fields once the static fields project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static size_t __num_actors_ = 0;                    // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    size_t ticket;          // executor-queue handle
    allocation alloc;       // allocation action
    inline virtual_dtor;
};

static inline void ?{}( actor & this ) with(this) {
    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    // member must be called to end it
    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    alloc = Nodelete;
    ticket = __get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    #ifdef ACTOR_STATS
    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    #endif
}

static inline void check_actor( actor & this ) {
    if ( this.alloc != Nodelete ) {
        switch( this.alloc ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                break;
            default: ;                              // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    allocation alloc;       // allocation action
    inline virtual_dtor;
};

static inline void ?{}( message & this ) {
    this.alloc = Nodelete;
}
static inline void ?{}( message & this, allocation alloc ) {
    memcpy( &this.alloc, &alloc, sizeof(allocation) );  // optimization to elide ctor
    CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
}
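check_actor() turns the allocation value returned from receive into a lifetime action. A hypothetical receive showing the choices (fragment only; session, logout, and refs are invented names, meant to slot into a program like the executor sketch above):

    struct session { inline actor; int refs; };     // invented actor with a use count
    struct logout { inline message; };

    allocation receive( session & this, logout & msg ) {
        this.refs--;
        if ( this.refs > 0 ) return Nodelete;       // keep: check_actor() takes no action
        return Delete;                              // heap actor: check_actor() runs delete( &this )
        // Destroy would run ^?{} but keep storage; Finished only marks it terminated
    }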
static inline void ^?{}( message & this ) with(this) {
    CFA_DEBUG(
        if ( alloc == Nodelete ) {
            printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",

…

static inline void check_message( message & this ) {
    switch ( this.alloc ) {                     // analyze message status
        case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}( this ); break;
        case Finished: break;
    } // switch
}
static inline allocation set_allocation( message & this, allocation state ) {
    CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
    allocation prev = this.alloc;
    this.alloc = state;
    return prev;
}
static inline allocation get_allocation( message & this ) {
    return this.alloc;
}

static inline void deliver_request( request & this ) {
    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    actor * base_actor;
    message * base_msg;
    allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    memcpy( &base_actor->alloc, &temp, sizeof(allocation) );    // optimization to elide ctor
    check_message( *base_msg );
    check_actor( *base_actor );
}

…

// returns ptr to newly owned queue if swap succeeds
static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    work_queue * my_queue = request_queues[my_idx];
    work_queue * other_queue = request_queues[victim_idx];

    // if either queue is 0p then they are in the process of being stolen
    if ( other_queue == 0p ) return 0p;

    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return 0p;

    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        /* paranoid */ verify( request_queues[my_idx] == 0p );
        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
        return 0p;
    }

    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    request_queues[my_idx] = other_queue; // last write does not need to be atomic
    return other_queue;
}
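try_swap_queues() exchanges two pointers with two single-word CASes instead of a double-wide CAS, using 0p as a claim marker. The same protocol distilled onto a plain pointer array (sketch, not the library API):

    #include <fstream.hfa>

    // two slots swapped with two single-word CASes; 0p marks a slot as claimed
    static inline bool swap_slots( int ** slots, unsigned int my_idx, unsigned int victim_idx ) {
        int * mine = slots[my_idx];
        int * theirs = slots[victim_idx];
        if ( theirs == 0p ) return false;           // victim already mid-swap elsewhere
        if ( ! __atomic_compare_exchange_n( &slots[my_idx], &mine, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
            return false;                           // someone moved our slot first
        if ( ! __atomic_compare_exchange_n( &slots[victim_idx], &theirs, mine, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
            slots[my_idx] = mine;                   // undo the claim; only we may write a 0p slot
            return false;
        }
        slots[my_idx] = theirs;                     // plain store is safe for the same reason
        return true;
    }

    int main() {
        int a = 1, b = 2;
        int * slots[2] = { &a, &b };
        if ( swap_slots( slots, 0, 1 ) ) sout | *slots[0] | *slots[1];   // 2 1
    }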
// once a worker to steal from has been chosen, choose queue to steal from
static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    // have to calculate victim start and range since victim may be deleted before us in shutdown
    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    unsigned int vic_start, vic_range;
    if ( extras > victim_id ) {
        vic_range = queues_per_worker + 1;
        vic_start = vic_range * victim_id;
    } else {
        vic_start = extras + victim_id * queues_per_worker;
        vic_range = queues_per_worker;
    }
    unsigned int start_idx = prng( vic_range );

    unsigned int tries = 0;
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
        tries++;
        curr_steal_queue = request_queues[ i + vic_start ];
        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef ACTOR_STATS
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        if ( curr_steal_queue ) {
            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
            executor_->w_infos[id].stolen++;
            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
            // replaced_queue[swap_idx]++;
            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
        } else {
            executor_->w_infos[id].failed_swaps++;
        }
        #else
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        #endif // ACTOR_STATS

        return;
    }

    return;
}

// choose a worker to steal from
static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    #if RAND
    unsigned int victim = prng( executor_->nworkers );
    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    choose_queue( this, victim, swap_idx );
    #elif SEARCH
    unsigned long long min = MAX; // smaller timestamp means longer since service
    int min_id = 0;               // use ints not uints to avoid integer underflow without hacky math
    int n_workers = executor_->nworkers;
    unsigned long long curr_stamp;
    int scount = 1;
    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
        curr_stamp = executor_->w_infos[i].stamp;
        if ( curr_stamp < min ) {
            min = curr_stamp;
            min_id = i;
        }
    }
    choose_queue( this, min_id, swap_idx );
    #endif
}
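Under SEARCH, the victim is the worker whose stamp is oldest, skipping the thief itself. The scan in isolation (hypothetical stamp values; sketch, not part of the changeset):

    #include <fstream.hfa>
    int main() {
        // hypothetical rdtscl() stamps; a smaller stamp means idle longer
        unsigned long long stamps[4] = { 900, 250, 600, 700 };
        int id = 3, n_workers = 4, min_id = 0;
        unsigned long long min = (unsigned long long)-1;
        for ( int i = (id + 1) % n_workers, scount = 1; scount < n_workers; i = (i + 1) % n_workers, scount++ )
            if ( stamps[i] < min ) { min = stamps[i]; min_id = i; }
        sout | "victim:" | min_id;      // 1 — the stalest worker, and never id itself
    }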
#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
void main( worker & this ) with(this) {
    // #ifdef ACTOR_STATS
    // for ( i; executor_->nrqueues ) {
    //     replaced_queue[i] = 0;
    //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    // }
    // #endif

    // threshold of empty queues we see before we go stealing
    const unsigned int steal_threshold = 2 * range;

    // Store variable data here instead of worker struct to avoid any potential false sharing
    unsigned int empty_count = 0;
    request & req;
    work_queue * curr_work_queue;

    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        curr_work_queue = request_queues[i + start];

        #ifndef __STEAL
        CHECK_TERMINATION;
        #endif

        // check if queue is empty before trying to gulp it
        if ( is_empty( *curr_work_queue->c_queue ) ) {
            #ifdef __STEAL
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            #else
            continue;
            #endif
        }
        transfer( *curr_work_queue, &current_queue );
        #ifdef ACTOR_STATS
        executor_->w_infos[id].gulps++;
        #endif // ACTOR_STATS
        #ifdef __STEAL
        if ( is_empty( *current_queue ) ) {
            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            empty_count = 0;

            CHECK_TERMINATION; // check for termination

            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );

            #ifdef ACTOR_STATS
            executor_->w_infos[id].try_steal++;
            #endif // ACTOR_STATS

            steal_work( this, start + prng( range ) );
            continue;
        }
        #endif // __STEAL
        while ( ! is_empty( *current_queue ) ) {
            #ifdef ACTOR_STATS
            executor_->w_infos[id].processed++;
            #endif
            &req = &remove( *current_queue );
            if ( !&req ) continue;
            deliver_request( req );
        }
        #ifdef __STEAL
        curr_work_queue->being_processed = false; // set done processing
        empty_count = 0;                          // we found work so reset empty counter
        #endif

        CHECK_TERMINATION;

        // potentially reclaim some of the current queue's vector space if it is unused
        reclaim( *current_queue );
    } // for
}
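The '&req = &remove(...)' line leans on CFA's rebindable references: assigning through '&' retargets the reference, and a null target is testable, which is how the loop guards against remove() handing back *0p on an empty queue. The idiom in isolation (sketch):

    #include <fstream.hfa>
    int main() {
        int x = 42;
        int & r;                        // unbound reference, like 'request & req' above
        &r = &x;                        // rebind: address-of on the left retargets the reference
        sout | r;                       // 42
        &r = 0p;                        // a reference can be made to refer to nothing ...
        if ( ! &r ) sout | "unbound";   // ... and tested, like 'if ( !&req ) continue'
    }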
static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req);
}

static inline void send( actor & this, request & req ) {
    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    send( *__actor_executor_, req, this.ticket );
}

static inline void __reset_stats() {
    #ifdef ACTOR_STATS
    __total_tries = 0;
    __total_stolen = 0;
    __all_gulps = 0;
    __total_failed_swaps = 0;
    __total_empty_stolen = 0;
    __all_processed = 0;
    __num_actors_stats = 0;
    __all_msgs_stolen = 0;
    #endif
}

static inline void start_actor_system( size_t num_thds ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

…

static inline void start_actor_system( executor & this ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park(); // unparked when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}

…

// assigned at creation to __base_msg_finished to avoid unused message warning
message __base_msg_finished @= { .alloc : Finished };
struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;

allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
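End to end, the pieces above compose as follows; a minimal sketch (not part of the changeset; assumes the ?|? send operator defined elsewhere in actor.hfa, plus fstream.hfa for output):

    #include <actor.hfa>
    #include <fstream.hfa>

    struct counter { inline actor; int hits; };     // invented example types
    struct ping { inline message; };

    allocation receive( counter & this, ping & msg ) {
        this.hits++;
        return Nodelete;                            // actor stays alive for more messages
    }

    int main() {
        start_actor_system( 2 );                    // 2 workers; 2 * 16 = 32 request queues
        counter c;                                  // ctor draws a ticket, joins the system
        c.hits = 0;
        ping p;
        allocation prev = set_allocation( p, Finished );   // revised API: returns the prior state
        if ( prev == Nodelete ) sout | "messages start as Nodelete";
        c | p;                                      // receive() runs on some worker thread
        c | finished_msg;                           // built-in sentinel: receive returns Finished
        stop_actor_system();                        // parks until every actor has terminated
        sout | c.hits;                              // 1 — safe to read once the system drains
    }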
src/Parser/parser.yy (r508671e → r2dfdae3; merged result shown)
// Created On       : Sat Sep 1 20:22:55 2001
// Last Modified By : Peter A. Buhr
// Last Modified On : Tue Jun 20 22:10:31 2023
// Update Count     : 6348
//

…

    // | RESUME '(' comma_expression ')' compound_statement
    //     { SemanticError( yylloc, "Resume expression is currently unimplemented." ); $$ = nullptr; }
    | IDENTIFIER IDENTIFIER                         // invalid syntax rule
        { IdentifierBeforeIdentifier( *$1.str, *$2.str, "n expression" ); $$ = nullptr; }
    | IDENTIFIER type_qualifier                     // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type qualifier" ); $$ = nullptr; }
    | IDENTIFIER storage_class                      // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "storage class" ); $$ = nullptr; }
    | IDENTIFIER basic_type_name                    // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    | IDENTIFIER TYPEDEFname                        // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    | IDENTIFIER TYPEGENname                        // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    ;

…

    | DEFAULT ':'    { $$ = new ClauseNode( build_default( yylloc ) ); }
        // A semantic check is required to ensure only one default clause per switch/choose statement.
    | DEFAULT error                                 // invalid syntax rule
        { SemanticError( yylloc, "syntax error, colon missing after default." ); $$ = nullptr; }
    ;

…

        else { SemanticError( yylloc, MISSING_HIGH ); $$ = nullptr; }
    }
    | comma_expression updowneq comma_expression '~' '@'   // CFA, invalid syntax rule
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    | '@' updowneq '@'                              // CFA, invalid syntax rule
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    | '@' updowneq comma_expression '~' '@'         // CFA, invalid syntax rule
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    | comma_expression updowneq '@' '~' '@'         // CFA, invalid syntax rule
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
    | '@' updowneq '@' '~' '@'                      // CFA, invalid syntax rule
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
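These productions accept token pairs the grammar would otherwise reject outright, purely to emit a pointed diagnostic. Hypothetical inputs that land in them (invented snippets, not part of the changeset):

    int main() {
        int x, y;
        x y;            // IDENTIFIER IDENTIFIER in an expression: IdentifierBeforeIdentifier fires
        x const;        // IDENTIFIER type_qualifier: IdentifierBeforeType( ..., "type qualifier" )
    }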
…

        else $$ = forCtrl( yylloc, $3, $1, $3->clone(), $4, nullptr, NEW_ONE );
    }
    | comma_expression ';' '@' updowneq '@'         // CFA, invalid syntax rule
        { SemanticError( yylloc, "syntax error, missing low/high value for up/down-to range so index is uninitialized." ); $$ = nullptr; }

    | comma_expression ';' comma_expression updowneq comma_expression '~' comma_expression // CFA
        { $$ = forCtrl( yylloc, $3, $1, UPDOWN( $4, $3->clone(), $5 ), $4, UPDOWN( $4, $5->clone(), $3->clone() ), $7 ); }
    | comma_expression ';' '@' updowneq comma_expression '~' comma_expression // CFA, invalid syntax rule
        {
            if ( $4 == OperKinds::LThan || $4 == OperKinds::LEThan ) { SemanticError( yylloc, MISSING_LOW ); $$ = nullptr; }

…

    | comma_expression ';' comma_expression updowneq comma_expression '~' '@' // CFA
        { $$ = forCtrl( yylloc, $3, $1, UPDOWN( $4, $3->clone(), $5 ), $4, UPDOWN( $4, $5->clone(), $3->clone() ), nullptr ); }
    | comma_expression ';' '@' updowneq comma_expression '~' '@' // CFA, invalid syntax rule
        {
            if ( $4 == OperKinds::LThan || $4 == OperKinds::LEThan ) { SemanticError( yylloc, MISSING_LOW ); $$ = nullptr; }

…

        else $$ = forCtrl( yylloc, $1, $2, $3, nullptr, nullptr );
    }
    | declaration '@' updowneq '@' '~' '@'          // CFA, invalid syntax rule
        { SemanticError( yylloc, "syntax error, missing low/high value for up/down-to range so index is uninitialized." ); $$ = nullptr; }

…

        { $$ = build_waitfor_timeout( yylloc, $1, $3, $4, maybe_build_compound( yylloc, $5 ) ); }
    // "else" must be conditional after timeout or timeout is never triggered (i.e., it is meaningless)
    | wor_waitfor_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rule
        { SemanticError( yylloc, "syntax error, else clause must be conditional after timeout or timeout never triggered." ); $$ = nullptr; }
    | wor_waitfor_clause wor when_clause_opt timeout statement wor when_clause ELSE statement

…

        { $$ = new ast::WaitUntilStmt::ClauseNode( ast::WaitUntilStmt::ClauseNode::Op::LEFT_OR, $1, build_waituntil_timeout( yylloc, $3, $4, maybe_build_compound( yylloc, $5 ) ) ); }
    // "else" must be conditional after timeout or timeout is never triggered (i.e., it is meaningless)
    | wor_waituntil_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rule
        { SemanticError( yylloc, "syntax error, else clause must be conditional after timeout or timeout never triggered." ); $$ = nullptr; }
    | wor_waituntil_clause wor when_clause_opt timeout statement wor when_clause ELSE statement
…

    | IDENTIFIER IDENTIFIER
        { IdentifierBeforeIdentifier( *$1.str, *$2.str, " declaration" ); $$ = nullptr; }
    | IDENTIFIER type_qualifier                     // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type qualifier" ); $$ = nullptr; }
    | IDENTIFIER storage_class                      // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "storage class" ); $$ = nullptr; }
    | IDENTIFIER basic_type_name                    // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    | IDENTIFIER TYPEDEFname                        // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    | IDENTIFIER TYPEGENname                        // invalid syntax rule
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    | external_function_definition
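Hypothetical inputs for the remaining diagnostics in this changeset (invented snippets; the quoted messages are the ones in the rules above):

    Foo bar;                  // Foo undeclared: the declaration-context IDENTIFIER IDENTIFIER rule

    int main() {
        int x = 0;
        switch ( x ) {
            default           // ':' omitted: "syntax error, colon missing after default."
                break;
        }
        for ( i; @ ~ @ ) {}   // "syntax error, missing low/high value for up/down-to range ..."
    }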