- File:
-
- 1 edited
-
libcfa/src/concurrency/ready_queue.cfa (modified) (30 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/ready_queue.cfa
r75c7252 r1f45c7d 67 67 #endif 68 68 69 static inline struct thread$* try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));70 static inline struct thread$* try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));71 static inline struct thread$* search(struct cluster * cltr);69 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)); 70 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)); 71 static inline struct $thread * search(struct cluster * cltr); 72 72 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred); 73 73 … … 100 100 #define __kernel_rseq_unregister rseq_unregister_current_thread 101 101 #elif defined(CFA_HAVE_LINUX_RSEQ_H) 102 staticvoid __kernel_raw_rseq_register (void);103 staticvoid __kernel_raw_rseq_unregister(void);102 void __kernel_raw_rseq_register (void); 103 void __kernel_raw_rseq_unregister(void); 104 104 105 105 #define __kernel_rseq_register __kernel_raw_rseq_register … … 246 246 // Cforall Ready Queue used for scheduling 247 247 //======================================================================= 248 unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {249 const unsigned long long tw = 16;250 const unsigned long long nw = 4;251 const unsigned long long ow = tw - nw;252 return ((nw * nval) + (ow * oval)) / tw;253 }254 255 248 void ?{}(__ready_queue_t & this) with (this) { 256 249 #if defined(USE_CPU_WORK_STEALING) … … 258 251 lanes.data = alloc( lanes.count ); 259 252 lanes.tscs = alloc( lanes.count ); 260 lanes.help = alloc( cpu_info.hthrd_count );261 253 262 254 for( idx; (size_t)lanes.count ) { 263 255 (lanes.data[idx]){}; 264 256 lanes.tscs[idx].tv = rdtscl(); 265 lanes.tscs[idx].ma = rdtscl();266 }267 for( idx; (size_t)cpu_info.hthrd_count ) {268 lanes.help[idx].src = 0;269 lanes.help[idx].dst = 0;270 lanes.help[idx].tri = 0;271 257 } 272 258 #else 273 259 lanes.data = 0p; 274 260 lanes.tscs = 0p; 275 lanes.help = 0p;276 261 lanes.count = 0; 277 262 #endif … … 285 270 free(lanes.data); 286 271 free(lanes.tscs); 287 free(lanes.help);288 272 } 289 273 290 274 //----------------------------------------------------------------------- 291 275 #if defined(USE_CPU_WORK_STEALING) 292 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {276 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) { 293 277 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 294 278 295 279 processor * const proc = kernelTLS().this_processor; 296 const bool external = (!proc) || (cltr != proc->cltr); 297 298 // Figure out the current cpu and make sure it is valid 280 const bool external = !push_local || (!proc) || (cltr != proc->cltr); 281 299 282 const int cpu = __kernel_getcpu(); 300 283 /* paranoid */ verify(cpu >= 0); … … 302 285 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count); 303 286 304 // Figure out where thread was last time and make sure it's 305 /* paranoid */ verify(thrd->preferred >= 0); 306 /* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count); 307 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count); 308 const int prf = thrd->preferred * READYQ_SHARD_FACTOR; 309 310 const cpu_map_entry_t & map; 311 choose(hint) { 312 case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu]; 313 case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf]; 314 } 287 const cpu_map_entry_t & map = cpu_info.llc_map[cpu]; 315 288 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count); 316 289 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count); … … 323 296 if(unlikely(external)) { r = __tls_rand(); } 324 297 else { r = proc->rdq.its++; } 325 choose(hint) { 326 case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR); 327 case UNPARK_REMOTE: i = prf + (r % READYQ_SHARD_FACTOR); 328 } 298 i = start + (r % READYQ_SHARD_FACTOR); 329 299 // If we can't lock it retry 330 300 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); … … 346 316 347 317 // Pop from the ready queue from a given cluster 348 __attribute__((hot)) thread$* pop_fast(struct cluster * cltr) with (cltr->ready_queue) {318 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 349 319 /* paranoid */ verify( lanes.count > 0 ); 350 320 /* paranoid */ verify( kernelTLS().this_processor ); … … 362 332 processor * const proc = kernelTLS().this_processor; 363 333 const int start = map.self * READYQ_SHARD_FACTOR; 364 const unsigned long long ctsc = rdtscl();365 334 366 335 // Did we already have a help target 367 336 if(proc->rdq.target == -1u) { 368 unsigned long long max = 0; 337 // if We don't have a 338 unsigned long long min = ts(lanes.data[start]); 369 339 for(i; READYQ_SHARD_FACTOR) { 370 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 371 if(tsc > max) max = tsc; 372 } 373 proc->rdq.cutoff = (max + 2 * max) / 2; 340 unsigned long long tsc = ts(lanes.data[start + i]); 341 if(tsc < min) min = tsc; 342 } 343 proc->rdq.cutoff = min; 344 374 345 /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores. 375 346 /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores. 376 347 377 if(0 == (__tls_rand() % 10 0)) {348 if(0 == (__tls_rand() % 10_000)) { 378 349 proc->rdq.target = __tls_rand() % lanes.count; 379 350 } else { … … 387 358 } 388 359 else { 389 unsigned long long max = 0; 390 for(i; READYQ_SHARD_FACTOR) { 391 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 392 if(tsc > max) max = tsc; 393 } 394 const unsigned long long cutoff = (max + 2 * max) / 2; 360 const unsigned long long bias = 0; //2_500_000_000; 361 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; 395 362 { 396 363 unsigned target = proc->rdq.target; 397 364 proc->rdq.target = -1u; 398 lanes.help[target / READYQ_SHARD_FACTOR].tri++; 399 if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) { 400 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 365 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 366 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 401 367 proc->rdq.last = target; 402 368 if(t) return t; 403 else proc->rdq.target = -1u;404 369 } 405 else proc->rdq.target = -1u;406 370 } 407 371 408 372 unsigned last = proc->rdq.last; 409 373 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) { 410 thread$* t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));374 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help)); 411 375 if(t) return t; 412 376 } … … 418 382 for(READYQ_SHARD_FACTOR) { 419 383 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 420 if( thread$* t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;384 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 421 385 } 422 386 … … 425 389 } 426 390 427 __attribute__((hot)) struct thread$* pop_slow(struct cluster * cltr) with (cltr->ready_queue) {391 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) { 428 392 processor * const proc = kernelTLS().this_processor; 429 393 unsigned last = proc->rdq.last; 430 394 if(last != -1u) { 431 struct thread$* t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));395 struct $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal)); 432 396 if(t) return t; 433 397 proc->rdq.last = -1u; … … 437 401 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 438 402 } 439 __attribute__((hot)) struct thread$* pop_search(struct cluster * cltr) {403 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) { 440 404 return search(cltr); 441 405 } … … 464 428 } 465 429 466 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {430 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) { 467 431 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 468 432 469 const bool external = (hint != UNPARK_LOCAL)|| (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);433 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 470 434 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 471 435 … … 511 475 512 476 // Pop from the ready queue from a given cluster 513 __attribute__((hot)) thread$* pop_fast(struct cluster * cltr) with (cltr->ready_queue) {477 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 514 478 /* paranoid */ verify( lanes.count > 0 ); 515 479 /* paranoid */ verify( kernelTLS().this_processor ); … … 535 499 536 500 // try popping from the 2 picked lists 537 struct thread$* thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));501 struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help))); 538 502 if(thrd) { 539 503 return thrd; … … 545 509 } 546 510 547 __attribute__((hot)) struct thread$* pop_slow(struct cluster * cltr) { return pop_fast(cltr); }548 __attribute__((hot)) struct thread$* pop_search(struct cluster * cltr) {511 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); } 512 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) { 549 513 return search(cltr); 550 514 } 551 515 #endif 552 516 #if defined(USE_WORK_STEALING) 553 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {517 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) { 554 518 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 555 519 556 520 // #define USE_PREFERRED 557 521 #if !defined(USE_PREFERRED) 558 const bool external = (hint != UNPARK_LOCAL)|| (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);522 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 559 523 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 560 524 #else 561 525 unsigned preferred = thrd->preferred; 562 const bool external = (hint != UNPARK_LOCAL)|| (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;526 const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr; 563 527 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count ); 564 528 … … 605 569 606 570 // Pop from the ready queue from a given cluster 607 __attribute__((hot)) thread$* pop_fast(struct cluster * cltr) with (cltr->ready_queue) {571 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 608 572 /* paranoid */ verify( lanes.count > 0 ); 609 573 /* paranoid */ verify( kernelTLS().this_processor ); … … 627 591 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; 628 592 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 629 thread$* t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));593 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 630 594 if(t) return t; 631 595 } … … 634 598 for(READYQ_SHARD_FACTOR) { 635 599 unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 636 if( thread$* t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;600 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 637 601 } 638 602 return 0p; 639 603 } 640 604 641 __attribute__((hot)) struct thread$* pop_slow(struct cluster * cltr) with (cltr->ready_queue) {605 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) { 642 606 unsigned i = __tls_rand() % lanes.count; 643 607 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 644 608 } 645 609 646 __attribute__((hot)) struct thread$* pop_search(struct cluster * cltr) with (cltr->ready_queue) {610 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) { 647 611 return search(cltr); 648 612 } … … 657 621 //----------------------------------------------------------------------- 658 622 // try to pop from a lane given by index w 659 static inline struct thread$* try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {623 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) { 660 624 __STATS( stats.attempt++; ) 661 625 … … 680 644 681 645 // Actually pop the list 682 struct thread$ * thrd; 683 unsigned long long tsc_before = ts(lane); 646 struct $thread * thrd; 684 647 unsigned long long tsv; 685 648 [thrd, tsv] = pop(lane); … … 695 658 __STATS( stats.success++; ) 696 659 697 #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 698 unsigned long long now = rdtscl(); 660 #if defined(USE_WORK_STEALING) 699 661 lanes.tscs[w].tv = tsv; 700 lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);701 662 #endif 702 663 703 #if defined(USE_CPU_WORK_STEALING) 704 thrd->preferred = w / READYQ_SHARD_FACTOR; 705 #else 706 thrd->preferred = w; 707 #endif 664 thrd->preferred = w; 708 665 709 666 // return the popped thread … … 714 671 // try to pop from any lanes making sure you don't miss any threads push 715 672 // before the start of the function 716 static inline struct thread$* search(struct cluster * cltr) with (cltr->ready_queue) {673 static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) { 717 674 /* paranoid */ verify( lanes.count > 0 ); 718 675 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); … … 720 677 for(i; count) { 721 678 unsigned idx = (offset + i) % count; 722 struct thread$* thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));679 struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search)); 723 680 if(thrd) { 724 681 return thrd; … … 728 685 // All lanes where empty return 0p 729 686 return 0p; 730 }731 732 //-----------------------------------------------------------------------733 // get preferred ready for new thread734 unsigned ready_queue_new_preferred() {735 unsigned pref = 0;736 if(struct thread$ * thrd = publicTLS_get( this_thread )) {737 pref = thrd->preferred;738 }739 else {740 #if defined(USE_CPU_WORK_STEALING)741 pref = __kernel_getcpu();742 #endif743 }744 745 #if defined(USE_CPU_WORK_STEALING)746 /* paranoid */ verify(pref >= 0);747 /* paranoid */ verify(pref < cpu_info.hthrd_count);748 #endif749 750 return pref;751 687 } 752 688 … … 776 712 //----------------------------------------------------------------------- 777 713 // Given 2 indexes, pick the list with the oldest push an try to pop from it 778 static inline struct thread$* try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {714 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) { 779 715 // Pick the bet list 780 716 int w = i; … … 911 847 // As long as we can pop from this lane to push the threads somewhere else in the queue 912 848 while(!is_empty(lanes.data[idx])) { 913 struct thread$* thrd;849 struct $thread * thrd; 914 850 unsigned long long _; 915 851 [thrd, _] = pop(lanes.data[idx]); … … 979 915 extern void __enable_interrupts_hard(); 980 916 981 staticvoid __kernel_raw_rseq_register (void) {917 void __kernel_raw_rseq_register (void) { 982 918 /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED ); 983 919 … … 997 933 } 998 934 999 staticvoid __kernel_raw_rseq_unregister(void) {935 void __kernel_raw_rseq_unregister(void) { 1000 936 /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 ); 1001 937
Note:
See TracChangeset
for help on using the changeset viewer.