Changeset 949339b for libcfa/src/concurrency/ready_queue.cfa
- Timestamp: Sep 27, 2021, 2:09:55 PM
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children: cc287800
- Parents: 4e28d2e9, 056cbdb
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Compare against each parent to see all of the changes relative to that parent.
- File: 1 edited
libcfa/src/concurrency/ready_queue.cfa
--- libcfa/src/concurrency/ready_queue.cfa (r4e28d2e9)
+++ libcfa/src/concurrency/ready_queue.cfa (r949339b)
@@ -100,6 +100,6 @@
 	#define __kernel_rseq_unregister rseq_unregister_current_thread
 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
-	void __kernel_raw_rseq_register (void);
-	void __kernel_raw_rseq_unregister(void);
+	static void __kernel_raw_rseq_register (void);
+	static void __kernel_raw_rseq_unregister(void);
 
 	#define __kernel_rseq_register __kernel_raw_rseq_register
@@ -246,4 +246,11 @@
 // Cforall Ready Queue used for scheduling
 //=======================================================================
+unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
+	const unsigned long long tw = 16;
+	const unsigned long long nw = 4;
+	const unsigned long long ow = tw - nw;
+	return ((nw * nval) + (ow * oval)) / tw;
+}
+
 void ?{}(__ready_queue_t & this) with (this) {
 	#if defined(USE_CPU_WORK_STEALING)
@@ -251,12 +258,20 @@
 		lanes.data = alloc( lanes.count );
 		lanes.tscs = alloc( lanes.count );
+		lanes.help = alloc( cpu_info.hthrd_count );
 
 		for( idx; (size_t)lanes.count ) {
 			(lanes.data[idx]){};
 			lanes.tscs[idx].tv = rdtscl();
+			lanes.tscs[idx].ma = rdtscl();
+		}
+		for( idx; (size_t)cpu_info.hthrd_count ) {
+			lanes.help[idx].src = 0;
+			lanes.help[idx].dst = 0;
+			lanes.help[idx].tri = 0;
 		}
 	#else
 		lanes.data = 0p;
 		lanes.tscs = 0p;
+		lanes.help = 0p;
 		lanes.count = 0;
 	#endif
@@ -270,14 +285,16 @@
 	free(lanes.data);
 	free(lanes.tscs);
+	free(lanes.help);
 }
 
 //-----------------------------------------------------------------------
 #if defined(USE_CPU_WORK_STEALING)
-	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {
+	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
 		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
 
 		processor * const proc = kernelTLS().this_processor;
-		const bool external = !push_local || (!proc) || (cltr != proc->cltr);
-
+		const bool external = (!proc) || (cltr != proc->cltr);
+
+		// Figure out the current cpu and make sure it is valid
 		const int cpu = __kernel_getcpu();
 		/* paranoid */ verify(cpu >= 0);
@@ -285,5 +302,15 @@
 		/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
 
-		const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
+		// Figure out where thread was last time and make sure it's
+		/* paranoid */ verify(thrd->preferred >= 0);
+		/* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count);
+		/* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
+		const int prf = thrd->preferred * READYQ_SHARD_FACTOR;
+
+		const cpu_map_entry_t & map;
+		choose(hint) {
+			case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu];
+			case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf];
+		}
 		/* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
 		/* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
@@ -296,5 +323,8 @@
 			if(unlikely(external)) { r = __tls_rand(); }
 			else { r = proc->rdq.its++; }
-			i = start + (r % READYQ_SHARD_FACTOR);
+			choose(hint) {
+				case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR);
+				case UNPARK_REMOTE: i = prf + (r % READYQ_SHARD_FACTOR);
+			}
 			// If we can't lock it retry
 		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
@@ -332,19 +362,18 @@
 		processor * const proc = kernelTLS().this_processor;
 		const int start = map.self * READYQ_SHARD_FACTOR;
+		const unsigned long long ctsc = rdtscl();
 
 		// Did we already have a help target
 		if(proc->rdq.target == -1u) {
-			// if We don't have a
-			unsigned long long min = ts(lanes.data[start]);
+			unsigned long long max = 0;
 			for(i; READYQ_SHARD_FACTOR) {
-				unsigned long long tsc = ts(lanes.data[start + i]);
-				if(tsc < min) min = tsc;
-			}
-			proc->rdq.cutoff = min;
-
+				unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
+				if(tsc > max) max = tsc;
+			}
+			proc->rdq.cutoff = (max + 2 * max) / 2;
 			/* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores.
 			/* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores.
 
-			if(0 == (__tls_rand() % 10_000)) {
+			if(0 == (__tls_rand() % 100)) {
 				proc->rdq.target = __tls_rand() % lanes.count;
 			} else {
@@ -358,14 +387,21 @@
 		}
 		else {
-			const unsigned long long bias = 0; //2_500_000_000;
-			const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
+			unsigned long long max = 0;
+			for(i; READYQ_SHARD_FACTOR) {
+				unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma);
+				if(tsc > max) max = tsc;
+			}
+			const unsigned long long cutoff = (max + 2 * max) / 2;
			{
				unsigned target = proc->rdq.target;
				proc->rdq.target = -1u;
-				if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
+				lanes.help[target / READYQ_SHARD_FACTOR].tri++;
+				if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) {
					thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
					proc->rdq.last = target;
					if(t) return t;
+					else proc->rdq.target = -1u;
				}
+				else proc->rdq.target = -1u;
			}
 
@@ -428,8 +464,8 @@
 	}
 
-	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {
+	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
 		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
 
-		const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
+		const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
 		/* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
 
@@ -515,14 +551,14 @@
 #endif
 #if defined(USE_WORK_STEALING)
-	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {
+	__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
 		__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
 
 		// #define USE_PREFERRED
 		#if !defined(USE_PREFERRED)
-		const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
+		const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
 		/* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
 		#else
 			unsigned preferred = thrd->preferred;
-			const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
+			const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
 			/* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
 
@@ -645,4 +681,5 @@
 		// Actually pop the list
 		struct thread$ * thrd;
+		unsigned long long tsc_before = ts(lane);
 		unsigned long long tsv;
 		[thrd, tsv] = pop(lane);
@@ -658,9 +695,15 @@
 		__STATS( stats.success++; )
 
-		#if defined(USE_WORK_STEALING)
+		#if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
+			unsigned long long now = rdtscl();
 			lanes.tscs[w].tv = tsv;
+			lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma);
 		#endif
 
-		thrd->preferred = w;
+		#if defined(USE_CPU_WORK_STEALING)
+			thrd->preferred = w / READYQ_SHARD_FACTOR;
+		#else
+			thrd->preferred = w;
+		#endif
 
 		// return the popped thread
@@ -688,4 +731,25 @@
 
 //-----------------------------------------------------------------------
+// get preferred ready for new thread
+unsigned ready_queue_new_preferred() {
+	unsigned pref = 0;
+	if(struct thread$ * thrd = publicTLS_get( this_thread )) {
+		pref = thrd->preferred;
+	}
+	else {
+		#if defined(USE_CPU_WORK_STEALING)
+			pref = __kernel_getcpu();
+		#endif
+	}
+
+	#if defined(USE_CPU_WORK_STEALING)
+	/* paranoid */ verify(pref >= 0);
+	/* paranoid */ verify(pref < cpu_info.hthrd_count);
+	#endif
+
+	return pref;
+}
+
+//-----------------------------------------------------------------------
 // Check that all the intrusive queues in the data structure are still consistent
 static void check( __ready_queue_t & q ) with (q) {
@@ -915,5 +979,5 @@
 	extern void __enable_interrupts_hard();
 
-	void __kernel_raw_rseq_register (void) {
+	static void __kernel_raw_rseq_register (void) {
 		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );
 
@@ -933,5 +997,5 @@
 	}
 
-	void __kernel_raw_rseq_unregister(void) {
+	static void __kernel_raw_rseq_unregister(void) {
 		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );
 
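For readers following the scheduler change: the new `moving_average` helper and the `(max + 2 * max) / 2` cutoff are the core of the revised help heuristic, replacing the raw minimum-timestamp comparison. The standalone C sketch below is not part of the changeset; it only replays the arithmetic, using the helper's body as committed plus some hypothetical sample deltas, to show how the fixed 4/16 weighting smooths per-lane ages and where the 1.5x cutoff lands.

```c
#include <stdio.h>

/* Fixed-weight exponential moving average, taken from the new helper in
   ready_queue.cfa: the new sample gets weight 4/16, the old average 12/16. */
static unsigned long long moving_average(unsigned long long nval, unsigned long long oval) {
	const unsigned long long tw = 16;      // total weight
	const unsigned long long nw = 4;       // weight of the new sample
	const unsigned long long ow = tw - nw; // weight of the previous average
	return ((nw * nval) + (ow * oval)) / tw;
}

int main(void) {
	/* Hypothetical per-lane age samples (cycles since the lane was last popped). */
	unsigned long long samples[] = { 1000, 1000, 8000, 1000 };
	unsigned long long ma = 0;
	unsigned long long max = 0;

	for (int i = 0; i < 4; i++) {
		ma = moving_average(samples[i], ma);
		if (ma > max) max = ma;
		printf("sample %llu -> moving average %llu\n", samples[i], ma);
	}

	/* The changeset's help cutoff: (max + 2 * max) / 2, i.e. 1.5x the largest
	   smoothed age seen across the shard. */
	unsigned long long cutoff = (max + 2 * max) / 2;
	printf("cutoff = %llu\n", cutoff);
	return 0;
}
```

In the new pop path a target lane is only helped when its own smoothed age exceeds this cutoff, so the decision is based on smoothed ages rather than a single raw timestamp comparison.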