Changeset 8d66610 for libcfa/src/concurrency/ready_queue.cfa
- Timestamp: May 21, 2021, 4:48:10 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: f1bce515
- Parents: 5407cdc (diff), 7404cdc (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- File: 1 edited
- libcfa/src/concurrency/ready_queue.cfa (modified) (26 diffs)
Legend:
- Unmodified lines have no prefix
- Added lines are prefixed with "+"
- Removed lines are prefixed with "-"
- Hunk headers "@@ -old +new @@" give the starting line in r5407cdc and r8d66610 respectively
libcfa/src/concurrency/ready_queue.cfa
(diff from r5407cdc to r8d66610)

@@ -17 +17 @@
  // #define __CFA_DEBUG_PRINT_READY_QUEUE__

- // #define USE_MPSC

  #define USE_RELAXED_FIFO

@@ -93 +92 @@
      this.alloc = 0;
      this.ready = 0;
-     this.lock  = false;
      this.data  = alloc(this.max);
-
-     /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
-     /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
+     this.write_lock = false;
+
      /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
      /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

@@ -106 +103 @@
  }

- void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
-     this.handle = proc;
-     this.lock   = false;
-     #ifdef __CFA_WITH_VERIFY__
-         this.owned = false;
-     #endif
- }

  //=======================================================================
  // Lock-Free registering/unregistering of threads
- void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
+ unsigned register_proc_id( void ) with(*__scheduler_lock) {
      __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
+     bool * handle = (bool *)&kernelTLS().sched_lock;

      // Step - 1 : check if there is already space in the data

@@ -124 +115 @@
      // Check among all the ready
      for(uint_fast32_t i = 0; i < s; i++) {
-         __processor_id_t * null = 0p; // Re-write every loop since compare thrashes it
-         if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
-             && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
-             /*paranoid*/ verify(i < ready);
-             /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
-             /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
-             proc->id = i;
+         bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
+         /* paranoid */ verify( handle != *cell );
+
+         bool * null = 0p; // Re-write every loop since compare thrashes it
+         if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
+             && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+             /* paranoid */ verify(i < ready);
+             /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
+             return i;
          }
      }

@@ -141 +134 @@

      // Step - 3 : Mark space as used and then publish it.
-     __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
-     (*storage){ proc };
+     data[n] = handle;
      while() {
          unsigned copy = n;

@@ -154 +146 @@

      // Return new spot.
-     /*paranoid*/ verify(n < ready);
-     /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
-     /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
-     proc->id = n;
- }
-
- void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
-     unsigned id = proc->id;
-     /*paranoid*/ verify(id < ready);
-     /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
-     __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
+     /* paranoid */ verify(n < ready);
+     /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
+     return n;
+ }
+
+ void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
+     /* paranoid */ verify(id < ready);
+     /* paranoid */ verify(id == kernelTLS().sched_id);
+     /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
+
+     bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
+
+     __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

      __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);

@@ -174 +168 @@
  uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
      /* paranoid */ verify( ! __preemption_enabled() );
+     /* paranoid */ verify( ! kernelTLS().sched_lock );

      // Step 1 : lock global lock
      // It is needed to avoid processors that register mid Critical-Section
      // to simply lock their own lock and enter.
-     __atomic_acquire( &lock );
+     __atomic_acquire( &write_lock );

      // Step 2 : lock per-proc lock

@@ -186 +181 @@
      uint_fast32_t s = ready;
      for(uint_fast32_t i = 0; i < s; i++) {
-         __atomic_acquire( &data[i].lock );
+         volatile bool * llock = data[i];
+         if(llock) __atomic_acquire( llock );
      }

@@ -203 +199 @@
      // Alternative solution : return s in write_lock and pass it to write_unlock
      for(uint_fast32_t i = 0; i < last_s; i++) {
-         verify(data[i].lock);
-         __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
+         volatile bool * llock = data[i];
+         if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
      }

      // Step 2 : release global lock
-     /*paranoid*/ assert(true == lock);
-     __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
+     /*paranoid*/ assert(true == write_lock);
+     __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

      /* paranoid */ verify( ! __preemption_enabled() );

@@ -253 +249 @@
  }

- __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd ) with (cltr->ready_queue) {
+ __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
      __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

-     const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
+     const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
      /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
-
-     // write timestamp
-     thrd->link.ts = rdtscl();

      bool local;

@@ -280 +273 @@
          #endif

-         #if defined(USE_MPSC)
-             // mpsc always succeeds
-         } while( false );
-         #else
          // If we can't lock it retry
      } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
-         #endif

      // Actually push it
      push(lanes.data[i], thrd);

-     #if !defined(USE_MPSC)
-         // Unlock and return
-         __atomic_unlock( &lanes.data[i].lock );
-     #endif
+     // Unlock and return
+     __atomic_unlock( &lanes.data[i].lock );

      // Mark the current index in the tls rng instance as having an item

@@ -350 +336 @@
  #endif
  #if defined(USE_WORK_STEALING)
- __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd ) with (cltr->ready_queue) {
+ __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
      __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

-     const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
+     // #define USE_PREFERRED
+     #if !defined(USE_PREFERRED)
+     const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
      /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
-
-     // write timestamp
-     thrd->link.ts = rdtscl();
+     #else
+     unsigned preferred = thrd->preferred;
+     const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
+     /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
+
+     unsigned r = preferred % READYQ_SHARD_FACTOR;
+     const unsigned start = preferred - r;
+     #endif

      // Try to pick a lane and lock it

@@ -371 +364 @@
          }
          else {
-             processor * proc = kernelTLS().this_processor;
-             unsigned r = proc->rdq.its++;
-             i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
+             #if !defined(USE_PREFERRED)
+                 processor * proc = kernelTLS().this_processor;
+                 unsigned r = proc->rdq.its++;
+                 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
+             #else
+                 i = start + (r++ % READYQ_SHARD_FACTOR);
+             #endif
          }
-
-
-         #if defined(USE_MPSC)
-             // mpsc always succeeds
-         } while( false );
-         #else
          // If we can't lock it retry
      } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
-         #endif

      // Actually push it
      push(lanes.data[i], thrd);

-     #if !defined(USE_MPSC)
-         // Unlock and return
-         __atomic_unlock( &lanes.data[i].lock );
-     #endif
+     // Unlock and return
+     __atomic_unlock( &lanes.data[i].lock );

      #if !defined(__CFA_NO_STATISTICS__)

@@ -410 +398 @@

      if(proc->rdq.target == -1u) {
+         unsigned long long min = ts(lanes.data[proc->rdq.id]);
+         for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
+             unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
+             if(tsc < min) min = tsc;
+         }
+         proc->rdq.cutoff = min;
          proc->rdq.target = __tls_rand() % lanes.count;
-         unsigned it1  = proc->rdq.itr;
-         unsigned it2  = proc->rdq.itr + 1;
-         unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
-         unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR);
-         unsigned long long tsc1 = ts(lanes.data[idx1]);
-         unsigned long long tsc2 = ts(lanes.data[idx2]);
-         proc->rdq.cutoff = min(tsc1, tsc2);
-         if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull;
      }
      else {
          unsigned target = proc->rdq.target;
          proc->rdq.target = -1u;
-         if(lanes.tscs[target].tv < proc->rdq.cutoff) {
+         const unsigned long long bias = 0; //2_500_000_000;
+         const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
+         if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
              $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
              if(t) return t;

@@ -430 +418 @@

      for(READYQ_SHARD_FACTOR) {
-         unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
+         unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
          if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
      }

@@ -462 +450 @@
      // If list looks empty retry
      if( is_empty(lane) ) {
-         __STATS( stats.espec++; )
          return 0p;
      }

@@ -468 +455 @@
      // If we can't get the lock retry
      if( !__atomic_try_acquire(&lane.lock) ) {
-         __STATS( stats.elock++; )
          return 0p;
      }

@@ -475 +461 @@
      if( is_empty(lane) ) {
          __atomic_unlock(&lane.lock);
-         __STATS( stats.eempty++; )
          return 0p;
      }

@@ -481 +466 @@
      // Actually pop the list
      struct $thread * thrd;
-     thrd = pop(lane);
+     unsigned long long tsv;
+     [thrd, tsv] = pop(lane);

      /* paranoid */ verify(thrd);
+     /* paranoid */ verify(tsv);
      /* paranoid */ verify(lane.lock);

@@ -493 +480 @@

      #if defined(USE_WORK_STEALING)
-         lanes.tscs[w].tv = thrd->link.ts;
+         lanes.tscs[w].tv = tsv;
      #endif
+
+     thrd->preferred = w;

      // return the popped thread

@@ -522 +511 @@
  // Check that all the intrusive queues in the data structure are still consistent
  static void check( __ready_queue_t & q ) with (q) {
-     #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
+     #if defined(__CFA_WITH_VERIFY__)
      {
          for( idx ; lanes.count ) {

@@ -528 +517 @@
              assert(!lanes.data[idx].lock);

-             assert(head(sl)->link.prev == 0p );
-             assert(head(sl)->link.next->link.prev == head(sl) );
-             assert(tail(sl)->link.next == 0p );
-             assert(tail(sl)->link.prev->link.next == tail(sl) );
-
-             if(is_empty(sl)) {
-                 assert(tail(sl)->link.prev == head(sl));
-                 assert(head(sl)->link.next == tail(sl));
-             } else {
-                 assert(tail(sl)->link.prev != head(sl));
-                 assert(head(sl)->link.next != tail(sl));
-             }
+             if(is_empty(sl)) {
+                 assert( sl.anchor.next == 0p );
+                 assert( sl.anchor.ts   == 0  );
+                 assert( mock_head(sl)  == sl.prev );
+             } else {
+                 assert( sl.anchor.next != 0p );
+                 assert( sl.anchor.ts   != 0  );
+                 assert( mock_head(sl)  != sl.prev );
+             }
          }
      }

@@ -560 +546 @@
  // fixes the list so that the pointers back to anchors aren't left dangling
  static inline void fix(__intrusive_lane_t & ll) {
-     #if !defined(USE_MPSC)
-         // if the list is not empty then follow he pointer and fix its reverse
-         if(!is_empty(ll)) {
-             head(ll)->link.next->link.prev = head(ll);
-             tail(ll)->link.prev->link.next = tail(ll);
-         }
-         // Otherwise just reset the list
-         else {
-             verify(tail(ll)->link.next == 0p);
-             tail(ll)->link.prev = head(ll);
-             head(ll)->link.next = tail(ll);
-             verify(head(ll)->link.prev == 0p);
-         }
-     #endif
- }
-
- static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
+     if(is_empty(ll)) {
+         verify(ll.anchor.next == 0p);
+         ll.prev = mock_head(ll);
+     }
+ }
+
+ static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
      processor * it = &list`first;
      for(unsigned i = 0; i < count; i++) {

@@ -597 +573 @@
          lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
          for(i; lanes.count) {
-             lanes.tscs[i].tv = ts(lanes.data[i]);
+             unsigned long long tsc = ts(lanes.data[i]);
+             lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
          }
      #endif

@@ -686 +663 @@
              while(!is_empty(lanes.data[idx])) {
                  struct $thread * thrd;
-                 thrd = pop(lanes.data[idx]);
-
-                 push(cltr, thrd);
+                 unsigned long long _;
+                 [thrd, _] = pop(lanes.data[idx]);
+
+                 push(cltr, thrd, true);

                  // for printing count the number of displaced threads

@@ -725 +703 @@
      /* paranoid */ verify( ready_mutate_islocked() );
  }
+
+ #if !defined(__CFA_NO_STATISTICS__)
+     unsigned cnt(const __ready_queue_t & this, unsigned idx) {
+         /* paranoid */ verify(this.lanes.count > idx);
+         return this.lanes.data[idx].cnt;
+     }
+ #endif
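The heart of this merge is switching the scheduler's reader-writer lock from an array of __scheduler_lock_id_t records to a flat array of bool * cells: register_proc_id publishes a pointer to the processor's thread-local kernelTLS().sched_lock flag (claiming a free cell with a compare-and-swap), read-side locking touches only that flag, and ready_mutate_lock acquires the global write_lock and then every published flag, skipping empty cells. The sketch below is a plain C11 rendering of that idea, not the Cforall code in ready_queue.cfa; the names (rw_registry, reader_register, flag_acquire, MAX_READERS) are invented for illustration and the registration append path is simplified.

/* rw_registry.c -- illustrative C11 sketch, not part of libcfa */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <sched.h>

#define MAX_READERS 128

/* One slot per registered processor; a slot holds a pointer to that
 * processor's thread-local flag, or NULL if the slot is free. */
typedef struct {
    atomic_bool write_lock;                    /* serializes writers         */
    atomic_uint ready;                         /* number of published slots  */
    _Atomic(atomic_bool *) slots[MAX_READERS]; /* published per-reader flags */
} rw_registry;

static _Thread_local atomic_bool my_flag;      /* this thread's "sched_lock" */

/* Spin until a boolean flag is acquired (CAS false -> true). */
static void flag_acquire(atomic_bool * f) {
    bool expected = false;
    while (!atomic_compare_exchange_weak(f, &expected, true)) {
        expected = false;
        sched_yield();
    }
}

/* Register the calling thread: reuse a free slot if one exists, otherwise
 * append a new one.  (The real code also bounds-checks against a maximum
 * and publishes `ready` with a CAS loop; both are omitted here.) */
static unsigned reader_register(rw_registry * reg) {
    unsigned n = atomic_load(&reg->ready);
    for (unsigned i = 0; i < n; i++) {
        atomic_bool * expected = NULL;         /* reset every loop, CAS clobbers it */
        if (atomic_compare_exchange_strong(&reg->slots[i], &expected, &my_flag))
            return i;
    }
    unsigned id = atomic_fetch_add(&reg->ready, 1);
    atomic_store(&reg->slots[id], &my_flag);
    return id;
}

static void reader_unregister(rw_registry * reg, unsigned id) {
    atomic_store(&reg->slots[id], NULL);       /* slot can be reused later */
}

/* Read side: wait out any writer, then take only the local flag. */
static void read_lock(rw_registry * reg) {
    while (atomic_load(&reg->write_lock)) sched_yield();
    flag_acquire(&my_flag);
}

static void read_unlock(void) {
    atomic_store_explicit(&my_flag, false, memory_order_release);
}

/* Write side: global lock first, then every published per-reader flag. */
static unsigned write_lock_all(rw_registry * reg) {
    flag_acquire(&reg->write_lock);
    unsigned n = atomic_load(&reg->ready);
    for (unsigned i = 0; i < n; i++) {
        atomic_bool * f = atomic_load(&reg->slots[i]);
        if (f) flag_acquire(f);                /* empty slots are skipped */
    }
    return n;                                  /* release exactly these flags later */
}

static void write_unlock_all(rw_registry * reg, unsigned n) {
    for (unsigned i = 0; i < n; i++) {
        atomic_bool * f = atomic_load(&reg->slots[i]);
        if (f) atomic_store_explicit(f, false, memory_order_release);
    }
    atomic_store_explicit(&reg->write_lock, false, memory_order_release);
}

Taking the global write_lock before the per-reader flags mirrors the "Step 1" comment in ready_mutate_lock: a processor that registers in the middle of a write-side critical section sees write_lock held and cannot simply take its own flag and enter.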