Changes in / [2dfdae3:508671e]
Files: 2 edited
Legend:
  (no prefix)  line as it appears in the newer revision r508671e
  -            line removed in r508671e
  +            line added in r508671e
libcfa/src/concurrency/actor.hfa (r2dfdae3 → r508671e)
typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
};

struct a_msg {
    int m;
};
static inline void ?{}( request & this ) {}
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
}

…

// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
struct copy_queue {
    request * buffer;
    size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    utilized = 0;
    index = 0;
    last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) {
    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    adelete(buffer);
}

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count >= buffer_size ) {               // increase arr size
        last_size = buffer_size;
        buffer_size = 2 * buffer_size;
        buffer = realloc( buffer, sizeof( request ) * buffer_size );
        /* paranoid */ verify( buffer );
    }
    memcpy( &buffer[count], &elem, sizeof(request) );
    count++;
}

…

// it is not supported to call insert() before the array is fully empty
static inline request & remove( copy_queue & this ) with(this) {
    if ( count > 0 ) {
        count--;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    request * ret = 0p;
    return *0p;
}

// try to reclaim some memory if less than half of buffer is utilized
static inline void reclaim( copy_queue & this ) with(this) {
    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    utilized = 0;
    buffer_size--;
    buffer = realloc( buffer, sizeof( request ) * buffer_size );    // try to reclaim some memory
}

…

struct work_queue {
    __spinlock_t mutex_lock;
    copy_queue * owned_queue;                   // copy queue allocated and cleaned up by this work_queue
    copy_queue * c_queue;                       // current queue
    volatile bool being_processed;              // flag to prevent concurrent processing
    #ifdef ACTOR_STATS
    unsigned int id;
    size_t missed;                              // transfers skipped due to being_processed flag being up
    #endif
}; // work_queue
static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    owned_queue = alloc();                      // allocated separately to avoid false sharing
    (*owned_queue){ buf_size };
    c_queue = owned_queue;
    being_processed = false;
    #ifdef ACTOR_STATS
    id = i;
    missed = 0;
    #endif
}

…

static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    #ifdef __STEAL

    // check if queue is being processed elsewhere
    if ( unlikely( being_processed ) ) {
        #ifdef ACTOR_STATS
        missed++;
        #endif
        unlock( mutex_lock );
        return;
    }

    being_processed = c_queue->count != 0;
    #endif // __STEAL

    c_queue->utilized = c_queue->count;

    // swap copy queue ptrs
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer

// needed since some info needs to persist past worker lifetimes
struct worker_info {
    volatile unsigned long long stamp;
    #ifdef ACTOR_STATS
    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    unsigned long long processed;
    size_t gulps;
    #endif
};
static inline void ?{}( worker_info & this ) {
    #ifdef ACTOR_STATS
    this.stolen_from = 0;
    this.try_steal = 0;                         // attempts to steal
    this.stolen = 0;                            // successful steals
    this.processed = 0;                         // requests processed
    this.gulps = 0;                             // number of gulps
    this.failed_swaps = 0;                      // steal swap failures
    this.empty_stolen = 0;                      // queues empty after steal
    this.msgs_stolen = 0;                       // number of messages stolen
    #endif
    this.stamp = rdtscl();
}

…

// #endif
thread worker {
    work_queue ** request_queues;
    copy_queue * current_queue;
    executor * executor_;
    unsigned int start, range;
    int id;
};

…

// aggregate counters for statistics
size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
        unsigned int start, unsigned int range, int id ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;       // array of all queues
    this.current_queue = current_queue;         // currently gulped queue (start with empty queue to use in swap later)
    this.executor_ = executor_;                 // pointer to current executor
    this.start = start;                         // start of worker's subrange of request_queues
    this.range = range;                         // size of worker's subrange of request_queues
    this.id = id;                               // worker's id and index in array of workers
}

static bool no_steal = false;
struct executor {
    cluster * cluster;                          // if workers execute on separate cluster
    processor ** processors;                    // array of virtual processors adding parallelism for workers
    work_queue * request_queues;                // master array of work request queues
    copy_queue * local_queues;                  // array of all worker local queues to avoid deletion race
    work_queue ** worker_req_queues;            // secondary array of work queues to allow for swapping
    worker ** workers;                          // array of workers executing work requests
    worker_info * w_infos;                      // array of info about each worker
    unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    bool seperate_clus;                         // use same or separate cluster for executor
    volatile bool is_shutdown;                  // flag to communicate shutdown to worker threads
}; // executor

…

// #endif
static inline void ^?{}( worker & mutex this ) with(this) {
    #ifdef ACTOR_STATS
    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);

    // per worker steal stats (uncomment alongside the lock above this routine to print)
    // lock( out_lock __cfaabi_dbg_ctx2 );
    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    // int count = 0;
    // int count2 = 0;
    // for ( i; range ) {
    //     if ( replaced_queue[start + i] > 0 ){
    //         count++;
    //         // printf("%d: %u, ",i, replaced_queue[i]);
    //     }
    //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    //         count2++;
    // }
    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    // unlock( out_lock );
    #endif
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;
    this.is_shutdown = false;

    if ( nworkers == nrqueues )
        no_steal = true;

    #ifdef ACTOR_STATS
    // stolen_arr = aalloc( nrqueues );
    // replaced_queue = aalloc( nrqueues );
    __total_workers = nworkers;
    #endif

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    worker_req_queues = aalloc( nrqueues );
    for ( i; nrqueues ) {
        request_queues[i]{ buf_size, i };
        worker_req_queues[i] = &request_queues[i];
    }

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    local_queues = aalloc( nworkers );
    workers = aalloc( nworkers );
    w_infos = aalloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;

    for ( i; nworkers ) {
        w_infos[i]{};
        local_queues[i]{ buf_size };
    }

    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    } // for
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }

…

static inline void ^?{}( executor & this ) with(this) {
    is_shutdown = true;

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    #ifdef ACTOR_STATS
    size_t misses = 0;
    for ( i; nrqueues ) {
        misses += worker_req_queues[i]->missed;
    }
    // adelete( stolen_arr );
    // adelete( replaced_queue );
    #endif

    adelete( workers );
    adelete( w_infos );
    adelete( local_queues );
    adelete( request_queues );
    adelete( worker_req_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );

    #ifdef ACTOR_STATS // print formatted stats
    printf(" Actor System Stats:\n");
    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    #endif
}

…

static inline size_t __get_next_ticket( executor & this ) with(this) {
    #ifdef __CFA_DEBUG__
    size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;

    // reserve MAX for dead actors
    if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    return temp;
    #else
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    #endif
} // tickets

// TODO: update globals in this file to be static fields once the static fields project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static size_t __num_actors_ = 0;                    // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    size_t ticket;                                  // executor-queue handle
    allocation alloc;                               // allocation action
    inline virtual_dtor;
};

static inline void ?{}( actor & this ) with(this) {
    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    // member must be called to end it
    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    alloc = Nodelete;
    ticket = __get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    #ifdef ACTOR_STATS
    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    #endif
}

static inline void check_actor( actor & this ) {
    if ( this.alloc != Nodelete ) {
        switch( this.alloc ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                break;
            default: ;                              // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    allocation alloc;                               // allocation action
    inline virtual_dtor;
};

static inline void ?{}( message & this ) {
    this.alloc = Nodelete;
}
static inline void ?{}( message & this, allocation alloc ) {
    memcpy( &this.alloc, &alloc, sizeof(allocation) );  // optimization to elide ctor
-   CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
+   CFA_DEBUG( if( this.alloc == Finished ) this.alloc = Nodelete; )
}
static inline void ^?{}( message & this ) with(this) {
    CFA_DEBUG(
        if ( alloc == Nodelete ) {
            printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",

…

static inline void check_message( message & this ) {
    switch ( this.alloc ) {                         // analyze message status
        case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}( this ); break;
        case Finished: break;
    } // switch
}
-static inline allocation set_allocation( message & this, allocation state ) {
-   CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
-   allocation prev = this.alloc;
-   this.alloc = state;
-   return prev;
-}
-static inline allocation get_allocation( message & this ) {
-   return this.alloc;
-}
+static inline void set_allocation( message & this, allocation state ) {
+   CFA_DEBUG( if ( state == Nodelete ) state = Finished; )
+   this.alloc = state;
+}

static inline void deliver_request( request & this ) {
    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    actor * base_actor;
    message * base_msg;
    allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    memcpy( &base_actor->alloc, &temp, sizeof(allocation) );    // optimization to elide ctor
    check_message( *base_msg );
    check_actor( *base_actor );
}

…

// returns ptr to newly owned queue if swap succeeds
static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    work_queue * my_queue = request_queues[my_idx];
    work_queue * other_queue = request_queues[victim_idx];

    // if either queue is 0p then they are in the process of being stolen
    if ( other_queue == 0p ) return 0p;

    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return 0p;

    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        /* paranoid */ verify( request_queues[my_idx] == 0p );
        request_queues[my_idx] = my_queue;          // reset my queue ptr back to appropriate val
        return 0p;
    }

    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    request_queues[my_idx] = other_queue;           // last write does not need to be atomic
    return other_queue;
}

// once a worker to steal from has been chosen, choose queue to steal from
static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    // have to calculate victim start and range since victim may be deleted before us in shutdown
    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    unsigned int vic_start, vic_range;
    if ( extras > victim_id ) {
        vic_range = queues_per_worker + 1;
        vic_start = vic_range * victim_id;
    } else {
        vic_start = extras + victim_id * queues_per_worker;
        vic_range = queues_per_worker;
    }
    unsigned int start_idx = prng( vic_range );

    unsigned int tries = 0;
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
        tries++;
        curr_steal_queue = request_queues[ i + vic_start ];
        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef ACTOR_STATS
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        if ( curr_steal_queue ) {
            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
            executor_->w_infos[id].stolen++;
            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
            // replaced_queue[swap_idx]++;
            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
        } else {
            executor_->w_infos[id].failed_swaps++;
        }
        #else
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        #endif // ACTOR_STATS

        return;
    }

    return;
}

// choose a worker to steal from
static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    #if RAND
    unsigned int victim = prng( executor_->nworkers );
    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    choose_queue( this, victim, swap_idx );
    #elif SEARCH
    unsigned long long min = MAX;                   // smaller timestamp means longer since service
    int min_id = 0;                                 // use ints not uints to avoid integer underflow without hacky math
    int n_workers = executor_->nworkers;
    unsigned long long curr_stamp;
    int scount = 1;
    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
        curr_stamp = executor_->w_infos[i].stamp;
        if ( curr_stamp < min ) {
            min = curr_stamp;
            min_id = i;
        }
    }
    choose_queue( this, min_id, swap_idx );
    #endif
}

#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
void main( worker & this ) with(this) {
    // #ifdef ACTOR_STATS
    // for ( i; executor_->nrqueues ) {
    //     replaced_queue[i] = 0;
    //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    // }
    // #endif

    // threshold of empty queues we see before we go stealing
    const unsigned int steal_threshold = 2 * range;

    // Store variable data here instead of worker struct to avoid any potential false sharing
    unsigned int empty_count = 0;
    request & req;
    work_queue * curr_work_queue;

    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) {  // cycle through set of request buffers
        curr_work_queue = request_queues[i + start];

        #ifndef __STEAL
        CHECK_TERMINATION;
        #endif

        // check if queue is empty before trying to gulp it
        if ( is_empty( *curr_work_queue->c_queue ) ) {
            #ifdef __STEAL
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            #else
            continue;
            #endif
        }
        transfer( *curr_work_queue, &current_queue );
        #ifdef ACTOR_STATS
        executor_->w_infos[id].gulps++;
        #endif // ACTOR_STATS
        #ifdef __STEAL
        if ( is_empty( *current_queue ) ) {
            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            empty_count = 0;

            CHECK_TERMINATION;                      // check for termination

            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );

            #ifdef ACTOR_STATS
            executor_->w_infos[id].try_steal++;
            #endif // ACTOR_STATS

            steal_work( this, start + prng( range ) );
            continue;
        }
        #endif // __STEAL
        while ( ! is_empty( *current_queue ) ) {
            #ifdef ACTOR_STATS
            executor_->w_infos[id].processed++;
            #endif
            &req = &remove( *current_queue );
            if ( !&req ) continue;
            deliver_request( req );
        }
        #ifdef __STEAL
        curr_work_queue->being_processed = false;   // set done processing
        empty_count = 0;                            // we found work so reset empty counter
        #endif

        CHECK_TERMINATION;

        // potentially reclaim some of the current queue's vector space if it is unused
        reclaim( *current_queue );
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req);
}

static inline void send( actor & this, request & req ) {
    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    send( *__actor_executor_, req, this.ticket );
}

static inline void __reset_stats() {
    #ifdef ACTOR_STATS
    __total_tries = 0;
    __total_stolen = 0;
    __all_gulps = 0;
    __total_failed_swaps = 0;
    __total_empty_stolen = 0;
    __all_processed = 0;
    __num_actors_stats = 0;
    __all_msgs_stolen = 0;
    #endif
}

static inline void start_actor_system( size_t num_thds ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

…

static inline void start_actor_system( executor & this ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park( );                                        // unparked when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}

…

// assigned at creation to __base_msg_finished to avoid unused message warning
message __base_msg_finished @= { .alloc : Finished };
-struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
+struct delete_message_t { inline message; } delete_msg = __base_msg_finished;
struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;

-allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
+allocation receive( actor & this, delete_message_t & msg ) { return Delete; }
allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
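Taken together, the pieces above form the delivery pipeline: an actor's constructor draws a ticket, send( actor &, request & ) enqueues the request on the ticketed work queue, a worker gulps that queue via transfer(), and deliver_request() calls the stored __receive_fn and applies the returned allocation action through check_message()/check_actor(). The following is a minimal sketch of that pipeline from user code. The hello_actor/hello_msg types and the hand-written __hello_thunk are hypothetical — in practice the library generates this request-building glue for each receive overload — so read it as an illustration of the protocol, not as the public API.

    #include <actor.hfa>                                // name of this header assumed
    #include <stdio.h>

    struct hello_actor { inline actor; };               // plain-old-inheritance from actor
    struct hello_msg { inline message; int value; };    // payload travels inside the message

    // user-level behaviour for this actor/message pair
    allocation receive( hello_actor &, hello_msg & msg ) {
        printf( "received %d\n", msg.value );
        return Finished;                                // allocation action applied by check_actor/check_message
    }

    // hand-written thunk matching __receive_fn (hypothetical; normally generated):
    // hands the base actor/message back so deliver_request can do its bookkeeping
    allocation __hello_thunk( actor & receiver, message & msg, actor ** base_actor, message ** base_msg ) {
        *base_actor = &receiver;
        *base_msg = &msg;
        return receive( (hello_actor &)receiver, (hello_msg &)msg );
    }

    int main() {
        start_actor_system( 2 );                        // 2 workers => num_thds * 16 = 32 request queues
        hello_actor a;                                  // ctor draws a ticket and bumps __num_actors_
        hello_msg m;
        m.value = 42;
        request req;
        req{ &a, &m, __hello_thunk };                   // run the three-argument request ctor
        send( a, req );                                 // enqueues on request_queues[a.ticket]
        stop_actor_system();                            // parks until every actor reaches a terminating allocation
    }

Returning Finished lets check_actor() mark the actor terminated and decrement __num_actors_ without freeing it, which is the right action for the stack-allocated actor above; returning Delete instead would make check_actor() call delete on it, which is only correct for heap-allocated actors.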
src/Parser/parser.yy (r2dfdae3 → r508671e)
// Created On       : Sat Sep 1 20:22:55 2001
// Last Modified By : Peter A. Buhr
-// Last Modified On : Tue Jun 20 22:10:31 2023
-// Update Count     : 6348
+// Last Modified On : Sat Jun 17 18:53:24 2023
+// Update Count     : 6347
//

…

//  | RESUME '(' comma_expression ')' compound_statement
//      { SemanticError( yylloc, "Resume expression is currently unimplemented." ); $$ = nullptr; }
-   | IDENTIFIER IDENTIFIER                             // invalid syntax rule
+   | IDENTIFIER IDENTIFIER                             // invalid syntax rules
        { IdentifierBeforeIdentifier( *$1.str, *$2.str, "n expression" ); $$ = nullptr; }
-   | IDENTIFIER type_qualifier                         // invalid syntax rule
+   | IDENTIFIER type_qualifier                         // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type qualifier" ); $$ = nullptr; }
-   | IDENTIFIER storage_class                          // invalid syntax rule
+   | IDENTIFIER storage_class                          // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "storage class" ); $$ = nullptr; }
-   | IDENTIFIER basic_type_name                        // invalid syntax rule
+   | IDENTIFIER basic_type_name                        // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
-   | IDENTIFIER TYPEDEFname                            // invalid syntax rule
+   | IDENTIFIER TYPEDEFname                            // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
-   | IDENTIFIER TYPEGENname                            // invalid syntax rule
+   | IDENTIFIER TYPEGENname                            // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    ;

…

    | DEFAULT ':'   { $$ = new ClauseNode( build_default( yylloc ) ); }
        // A semantic check is required to ensure only one default clause per switch/choose statement.
-   | DEFAULT error                                     // invalid syntax rule
+   | DEFAULT error                                     // invalid syntax rules
        { SemanticError( yylloc, "syntax error, colon missing after default." ); $$ = nullptr; }
    ;

…

        else { SemanticError( yylloc, MISSING_HIGH ); $$ = nullptr; }
    }
-   | comma_expression updowneq comma_expression '~' '@'   // CFA, invalid syntax rule
+   | comma_expression updowneq comma_expression '~' '@'   // CFA, invalid syntax rules
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
-   | '@' updowneq '@'                                  // CFA, invalid syntax rule
+   | '@' updowneq '@'                                  // CFA, invalid syntax rules
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
-   | '@' updowneq comma_expression '~' '@'             // CFA, invalid syntax rule
+   | '@' updowneq comma_expression '~' '@'             // CFA, invalid syntax rules
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
-   | comma_expression updowneq '@' '~' '@'             // CFA, invalid syntax rule
+   | comma_expression updowneq '@' '~' '@'             // CFA, invalid syntax rules
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }
-   | '@' updowneq '@' '~' '@'                          // CFA, invalid syntax rule
+   | '@' updowneq '@' '~' '@'                          // CFA, invalid syntax rules
        { SemanticError( yylloc, MISSING_ANON_FIELD ); $$ = nullptr; }

…

        else $$ = forCtrl( yylloc, $3, $1, $3->clone(), $4, nullptr, NEW_ONE );
    }
-   | comma_expression ';' '@' updowneq '@'             // CFA, invalid syntax rule
+   | comma_expression ';' '@' updowneq '@'             // CFA, invalid syntax rules
        { SemanticError( yylloc, "syntax error, missing low/high value for up/down-to range so index is uninitialized." ); $$ = nullptr; }

    | comma_expression ';' comma_expression updowneq comma_expression '~' comma_expression // CFA
        { $$ = forCtrl( yylloc, $3, $1, UPDOWN( $4, $3->clone(), $5 ), $4, UPDOWN( $4, $5->clone(), $3->clone() ), $7 ); }
-   | comma_expression ';' '@' updowneq comma_expression '~' comma_expression // CFA, invalid syntax rule
+   | comma_expression ';' '@' updowneq comma_expression '~' comma_expression // CFA, invalid syntax rules
        {
            if ( $4 == OperKinds::LThan || $4 == OperKinds::LEThan ) { SemanticError( yylloc, MISSING_LOW ); $$ = nullptr; }

…

    | comma_expression ';' comma_expression updowneq comma_expression '~' '@' // CFA
        { $$ = forCtrl( yylloc, $3, $1, UPDOWN( $4, $3->clone(), $5 ), $4, UPDOWN( $4, $5->clone(), $3->clone() ), nullptr ); }
-   | comma_expression ';' '@' updowneq comma_expression '~' '@' // CFA, invalid syntax rule
+   | comma_expression ';' '@' updowneq comma_expression '~' '@' // CFA, invalid syntax rules
        {
            if ( $4 == OperKinds::LThan || $4 == OperKinds::LEThan ) { SemanticError( yylloc, MISSING_LOW ); $$ = nullptr; }

…

        else $$ = forCtrl( yylloc, $1, $2, $3, nullptr, nullptr );
    }
-   | declaration '@' updowneq '@' '~' '@'              // CFA, invalid syntax rule
+   | declaration '@' updowneq '@' '~' '@'              // CFA, invalid syntax rules
        { SemanticError( yylloc, "syntax error, missing low/high value for up/down-to range so index is uninitialized." ); $$ = nullptr; }

…

        { $$ = build_waitfor_timeout( yylloc, $1, $3, $4, maybe_build_compound( yylloc, $5 ) ); }
    // "else" must be conditional after timeout or timeout is never triggered (i.e., it is meaningless)
-   | wor_waitfor_clause wor when_clause_opt timeout statement wor ELSE statement   // invalid syntax rule
+   | wor_waitfor_clause wor when_clause_opt timeout statement wor ELSE statement   // invalid syntax rules
        { SemanticError( yylloc, "syntax error, else clause must be conditional after timeout or timeout never triggered." ); $$ = nullptr; }
    | wor_waitfor_clause wor when_clause_opt timeout statement wor when_clause ELSE statement

…

        { $$ = new ast::WaitUntilStmt::ClauseNode( ast::WaitUntilStmt::ClauseNode::Op::LEFT_OR, $1, build_waituntil_timeout( yylloc, $3, $4, maybe_build_compound( yylloc, $5 ) ) ); }
    // "else" must be conditional after timeout or timeout is never triggered (i.e., it is meaningless)
-   | wor_waituntil_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rule
+   | wor_waituntil_clause wor when_clause_opt timeout statement wor ELSE statement // invalid syntax rules
        { SemanticError( yylloc, "syntax error, else clause must be conditional after timeout or timeout never triggered." ); $$ = nullptr; }
    | wor_waituntil_clause wor when_clause_opt timeout statement wor when_clause ELSE statement

…

    | IDENTIFIER IDENTIFIER
        { IdentifierBeforeIdentifier( *$1.str, *$2.str, " declaration" ); $$ = nullptr; }
-   | IDENTIFIER type_qualifier                         // invalid syntax rule
+   | IDENTIFIER type_qualifier                         // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type qualifier" ); $$ = nullptr; }
-   | IDENTIFIER storage_class                          // invalid syntax rule
+   | IDENTIFIER storage_class                          // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "storage class" ); $$ = nullptr; }
-   | IDENTIFIER basic_type_name                        // invalid syntax rule
+   | IDENTIFIER basic_type_name                        // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
-   | IDENTIFIER TYPEDEFname                            // invalid syntax rule
+   | IDENTIFIER TYPEDEFname                            // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
-   | IDENTIFIER TYPEGENname                            // invalid syntax rule
+   | IDENTIFIER TYPEGENname                            // invalid syntax rules
        { IdentifierBeforeType( *$1.str, "type" ); $$ = nullptr; }
    | external_function_definition
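The recurring "invalid syntax rules" productions above are deliberate error rules: they match token shapes that can never begin a valid expression or declaration — most often an undeclared identifier being used as a type name — and route them to a pointed diagnostic via IdentifierBeforeIdentifier/IdentifierBeforeType instead of bison's generic syntax error. A sketch of inputs that would land in these rules; Vec and Mat are hypothetical, deliberately undeclared names:

    int main() {
        Vec v;          // IDENTIFIER IDENTIFIER    -> IdentifierBeforeIdentifier( "Vec", "v", ... )
        Mat static m;   // IDENTIFIER storage_class -> IdentifierBeforeType( "Mat", "storage class" )
    }

Because Vec is not in the parser's typedef table, the lexer returns it as a plain IDENTIFIER rather than TYPEDEFname, which is what steers the parse into these error productions instead of a normal declaration.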
Note: See TracChangeset for help on using the changeset viewer.