- File:
-
- 1 edited
-
libcfa/src/concurrency/ready_queue.cfa (modified) (30 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/ready_queue.cfa
r1f45c7d r75c7252 67 67 #endif 68 68 69 static inline struct $thread* try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));70 static inline struct $thread* try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));71 static inline struct $thread* search(struct cluster * cltr);69 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)); 70 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)); 71 static inline struct thread$ * search(struct cluster * cltr); 72 72 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred); 73 73 … … 100 100 #define __kernel_rseq_unregister rseq_unregister_current_thread 101 101 #elif defined(CFA_HAVE_LINUX_RSEQ_H) 102 void __kernel_raw_rseq_register (void);103 void __kernel_raw_rseq_unregister(void);102 static void __kernel_raw_rseq_register (void); 103 static void __kernel_raw_rseq_unregister(void); 104 104 105 105 #define __kernel_rseq_register __kernel_raw_rseq_register … … 246 246 // Cforall Ready Queue used for scheduling 247 247 //======================================================================= 248 unsigned long long moving_average(unsigned long long nval, unsigned long long oval) { 249 const unsigned long long tw = 16; 250 const unsigned long long nw = 4; 251 const unsigned long long ow = tw - nw; 252 return ((nw * nval) + (ow * oval)) / tw; 253 } 254 248 255 void ?{}(__ready_queue_t & this) with (this) { 249 256 #if defined(USE_CPU_WORK_STEALING) … … 251 258 lanes.data = alloc( lanes.count ); 252 259 lanes.tscs = alloc( lanes.count ); 260 lanes.help = alloc( cpu_info.hthrd_count ); 253 261 254 262 for( idx; (size_t)lanes.count ) { 255 263 (lanes.data[idx]){}; 256 264 lanes.tscs[idx].tv = rdtscl(); 265 lanes.tscs[idx].ma = rdtscl(); 266 } 267 for( idx; (size_t)cpu_info.hthrd_count ) { 268 lanes.help[idx].src = 0; 269 lanes.help[idx].dst = 0; 270 lanes.help[idx].tri = 0; 257 271 } 258 272 #else 259 273 lanes.data = 0p; 260 274 lanes.tscs = 0p; 275 lanes.help = 0p; 261 276 lanes.count = 0; 262 277 #endif … … 270 285 free(lanes.data); 271 286 free(lanes.tscs); 287 free(lanes.help); 272 288 } 273 289 274 290 //----------------------------------------------------------------------- 275 291 #if defined(USE_CPU_WORK_STEALING) 276 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {292 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 277 293 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 278 294 279 295 processor * const proc = kernelTLS().this_processor; 280 const bool external = !push_local || (!proc) || (cltr != proc->cltr); 281 296 const bool external = (!proc) || (cltr != proc->cltr); 297 298 // Figure out the current cpu and make sure it is valid 282 299 const int cpu = __kernel_getcpu(); 283 300 /* paranoid */ verify(cpu >= 0); … … 285 302 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count); 286 303 287 const cpu_map_entry_t & map = cpu_info.llc_map[cpu]; 304 // Figure out where thread was last time and make sure it's 305 /* paranoid */ verify(thrd->preferred >= 0); 306 /* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count); 307 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count); 308 const int prf = thrd->preferred * READYQ_SHARD_FACTOR; 309 310 const cpu_map_entry_t & map; 311 choose(hint) { 312 case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu]; 313 case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf]; 314 } 288 315 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count); 289 316 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count); … … 296 323 if(unlikely(external)) { r = __tls_rand(); } 297 324 else { r = proc->rdq.its++; } 298 i = start + (r % READYQ_SHARD_FACTOR); 325 choose(hint) { 326 case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR); 327 case UNPARK_REMOTE: i = prf + (r % READYQ_SHARD_FACTOR); 328 } 299 329 // If we can't lock it retry 300 330 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); … … 316 346 317 347 // Pop from the ready queue from a given cluster 318 __attribute__((hot)) $thread* pop_fast(struct cluster * cltr) with (cltr->ready_queue) {348 __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 319 349 /* paranoid */ verify( lanes.count > 0 ); 320 350 /* paranoid */ verify( kernelTLS().this_processor ); … … 332 362 processor * const proc = kernelTLS().this_processor; 333 363 const int start = map.self * READYQ_SHARD_FACTOR; 364 const unsigned long long ctsc = rdtscl(); 334 365 335 366 // Did we already have a help target 336 367 if(proc->rdq.target == -1u) { 337 // if We don't have a 338 unsigned long long min = ts(lanes.data[start]); 368 unsigned long long max = 0; 339 369 for(i; READYQ_SHARD_FACTOR) { 340 unsigned long long tsc = ts(lanes.data[start + i]); 341 if(tsc < min) min = tsc; 342 } 343 proc->rdq.cutoff = min; 344 370 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 371 if(tsc > max) max = tsc; 372 } 373 proc->rdq.cutoff = (max + 2 * max) / 2; 345 374 /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores. 346 375 /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores. 347 376 348 if(0 == (__tls_rand() % 10 _000)) {377 if(0 == (__tls_rand() % 100)) { 349 378 proc->rdq.target = __tls_rand() % lanes.count; 350 379 } else { … … 358 387 } 359 388 else { 360 const unsigned long long bias = 0; //2_500_000_000; 361 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; 389 unsigned long long max = 0; 390 for(i; READYQ_SHARD_FACTOR) { 391 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 392 if(tsc > max) max = tsc; 393 } 394 const unsigned long long cutoff = (max + 2 * max) / 2; 362 395 { 363 396 unsigned target = proc->rdq.target; 364 397 proc->rdq.target = -1u; 365 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 366 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 398 lanes.help[target / READYQ_SHARD_FACTOR].tri++; 399 if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) { 400 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 367 401 proc->rdq.last = target; 368 402 if(t) return t; 403 else proc->rdq.target = -1u; 369 404 } 405 else proc->rdq.target = -1u; 370 406 } 371 407 372 408 unsigned last = proc->rdq.last; 373 409 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) { 374 $thread* t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));410 thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help)); 375 411 if(t) return t; 376 412 } … … 382 418 for(READYQ_SHARD_FACTOR) { 383 419 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 384 if( $thread* t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;420 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 385 421 } 386 422 … … 389 425 } 390 426 391 __attribute__((hot)) struct $thread* pop_slow(struct cluster * cltr) with (cltr->ready_queue) {427 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) { 392 428 processor * const proc = kernelTLS().this_processor; 393 429 unsigned last = proc->rdq.last; 394 430 if(last != -1u) { 395 struct $thread* t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal));431 struct thread$ * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.steal)); 396 432 if(t) return t; 397 433 proc->rdq.last = -1u; … … 401 437 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 402 438 } 403 __attribute__((hot)) struct $thread* pop_search(struct cluster * cltr) {439 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) { 404 440 return search(cltr); 405 441 } … … 428 464 } 429 465 430 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {466 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 431 467 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 432 468 433 const bool external = !push_local|| (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);469 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 434 470 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 435 471 … … 475 511 476 512 // Pop from the ready queue from a given cluster 477 __attribute__((hot)) $thread* pop_fast(struct cluster * cltr) with (cltr->ready_queue) {513 __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 478 514 /* paranoid */ verify( lanes.count > 0 ); 479 515 /* paranoid */ verify( kernelTLS().this_processor ); … … 499 535 500 536 // try popping from the 2 picked lists 501 struct $thread* thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));537 struct thread$ * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help))); 502 538 if(thrd) { 503 539 return thrd; … … 509 545 } 510 546 511 __attribute__((hot)) struct $thread* pop_slow(struct cluster * cltr) { return pop_fast(cltr); }512 __attribute__((hot)) struct $thread* pop_search(struct cluster * cltr) {547 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) { return pop_fast(cltr); } 548 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) { 513 549 return search(cltr); 514 550 } 515 551 #endif 516 552 #if defined(USE_WORK_STEALING) 517 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {553 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 518 554 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 519 555 520 556 // #define USE_PREFERRED 521 557 #if !defined(USE_PREFERRED) 522 const bool external = !push_local|| (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);558 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 523 559 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 524 560 #else 525 561 unsigned preferred = thrd->preferred; 526 const bool external = push_local|| (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;562 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr; 527 563 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count ); 528 564 … … 569 605 570 606 // Pop from the ready queue from a given cluster 571 __attribute__((hot)) $thread* pop_fast(struct cluster * cltr) with (cltr->ready_queue) {607 __attribute__((hot)) thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) { 572 608 /* paranoid */ verify( lanes.count > 0 ); 573 609 /* paranoid */ verify( kernelTLS().this_processor ); … … 591 627 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; 592 628 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 593 $thread* t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));629 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 594 630 if(t) return t; 595 631 } … … 598 634 for(READYQ_SHARD_FACTOR) { 599 635 unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR); 600 if( $thread* t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;636 if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 601 637 } 602 638 return 0p; 603 639 } 604 640 605 __attribute__((hot)) struct $thread* pop_slow(struct cluster * cltr) with (cltr->ready_queue) {641 __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) { 606 642 unsigned i = __tls_rand() % lanes.count; 607 643 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal)); 608 644 } 609 645 610 __attribute__((hot)) struct $thread* pop_search(struct cluster * cltr) with (cltr->ready_queue) {646 __attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) with (cltr->ready_queue) { 611 647 return search(cltr); 612 648 } … … 621 657 //----------------------------------------------------------------------- 622 658 // try to pop from a lane given by index w 623 static inline struct $thread* try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {659 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) { 624 660 __STATS( stats.attempt++; ) 625 661 … … 644 680 645 681 // Actually pop the list 646 struct $thread * thrd; 682 struct thread$ * thrd; 683 unsigned long long tsc_before = ts(lane); 647 684 unsigned long long tsv; 648 685 [thrd, tsv] = pop(lane); … … 658 695 __STATS( stats.success++; ) 659 696 660 #if defined(USE_WORK_STEALING) 697 #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 698 unsigned long long now = rdtscl(); 661 699 lanes.tscs[w].tv = tsv; 700 lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma); 662 701 #endif 663 702 664 thrd->preferred = w; 703 #if defined(USE_CPU_WORK_STEALING) 704 thrd->preferred = w / READYQ_SHARD_FACTOR; 705 #else 706 thrd->preferred = w; 707 #endif 665 708 666 709 // return the popped thread … … 671 714 // try to pop from any lanes making sure you don't miss any threads push 672 715 // before the start of the function 673 static inline struct $thread* search(struct cluster * cltr) with (cltr->ready_queue) {716 static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) { 674 717 /* paranoid */ verify( lanes.count > 0 ); 675 718 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); … … 677 720 for(i; count) { 678 721 unsigned idx = (offset + i) % count; 679 struct $thread* thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));722 struct thread$ * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search)); 680 723 if(thrd) { 681 724 return thrd; … … 685 728 // All lanes where empty return 0p 686 729 return 0p; 730 } 731 732 //----------------------------------------------------------------------- 733 // get preferred ready for new thread 734 unsigned ready_queue_new_preferred() { 735 unsigned pref = 0; 736 if(struct thread$ * thrd = publicTLS_get( this_thread )) { 737 pref = thrd->preferred; 738 } 739 else { 740 #if defined(USE_CPU_WORK_STEALING) 741 pref = __kernel_getcpu(); 742 #endif 743 } 744 745 #if defined(USE_CPU_WORK_STEALING) 746 /* paranoid */ verify(pref >= 0); 747 /* paranoid */ verify(pref < cpu_info.hthrd_count); 748 #endif 749 750 return pref; 687 751 } 688 752 … … 712 776 //----------------------------------------------------------------------- 713 777 // Given 2 indexes, pick the list with the oldest push an try to pop from it 714 static inline struct $thread* try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {778 static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) { 715 779 // Pick the bet list 716 780 int w = i; … … 847 911 // As long as we can pop from this lane to push the threads somewhere else in the queue 848 912 while(!is_empty(lanes.data[idx])) { 849 struct $thread* thrd;913 struct thread$ * thrd; 850 914 unsigned long long _; 851 915 [thrd, _] = pop(lanes.data[idx]); … … 915 979 extern void __enable_interrupts_hard(); 916 980 917 void __kernel_raw_rseq_register (void) {981 static void __kernel_raw_rseq_register (void) { 918 982 /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED ); 919 983 … … 933 997 } 934 998 935 void __kernel_raw_rseq_unregister(void) {999 static void __kernel_raw_rseq_unregister(void) { 936 1000 /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 ); 937 1001
Note:
See TracChangeset
for help on using the changeset viewer.