- File:
-
- 1 edited
-
libcfa/src/concurrency/ready_queue.cfa (modified) (26 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/ready_queue.cfa
rb808625 rfc59df78 17 17 // #define __CFA_DEBUG_PRINT_READY_QUEUE__ 18 18 19 // #define USE_MPSC 19 20 20 21 #define USE_RELAXED_FIFO … … 92 93 this.alloc = 0; 93 94 this.ready = 0; 95 this.lock = false; 94 96 this.data = alloc(this.max); 95 this.write_lock = false; 96 97 98 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data )) % 64) ); 99 /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) ); 97 100 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc)); 98 101 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready)); … … 103 106 } 104 107 108 void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) { 109 this.handle = proc; 110 this.lock = false; 111 #ifdef __CFA_WITH_VERIFY__ 112 this.owned = false; 113 #endif 114 } 105 115 106 116 //======================================================================= 107 117 // Lock-Free registering/unregistering of threads 108 unsigned register_proc_id( void) with(*__scheduler_lock) {118 void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) { 109 119 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc); 110 bool * handle = (bool *)&kernelTLS().sched_lock;111 120 112 121 // Step - 1 : check if there is already space in the data … … 115 124 // Check among all the ready 116 125 for(uint_fast32_t i = 0; i < s; i++) { 117 bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems 118 /* paranoid */ verify( handle != *cell ); 119 120 bool * null = 0p; // Re-write every loop since compare thrashes it 121 if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null 122 && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 123 /* paranoid */ verify(i < ready); 124 /* paranoid */ verify( (kernelTLS().sched_id = i, true) ); 125 return i; 126 __processor_id_t * null = 0p; // Re-write every loop since compare thrashes it 127 if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null 128 && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 129 /*paranoid*/ verify(i < ready); 130 /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size)); 131 /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0); 132 proc->id = i; 126 133 } 127 134 } … … 134 141 135 142 // Step - 3 : Mark space as used and then publish it. 136 data[n] = handle; 143 __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n]; 144 (*storage){ proc }; 137 145 while() { 138 146 unsigned copy = n; … … 146 154 147 155 // Return new spot. 148 /* paranoid */ verify(n < ready); 149 /* paranoid */ verify( (kernelTLS().sched_id = n, true) ); 150 return n; 151 } 152 153 void unregister_proc_id( unsigned id ) with(*__scheduler_lock) { 154 /* paranoid */ verify(id < ready); 155 /* paranoid */ verify(id == kernelTLS().sched_id); 156 /* paranoid */ verify(data[id] == &kernelTLS().sched_lock); 157 158 bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems 159 160 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE); 156 /*paranoid*/ verify(n < ready); 157 /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size)); 158 /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0); 159 proc->id = n; 160 } 161 162 void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) { 163 unsigned id = proc->id; 164 /*paranoid*/ verify(id < ready); 165 /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED)); 166 __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE); 161 167 162 168 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc); … … 168 174 uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) { 169 175 /* paranoid */ verify( ! __preemption_enabled() ); 170 /* paranoid */ verify( ! kernelTLS().sched_lock );171 176 172 177 // Step 1 : lock global lock 173 178 // It is needed to avoid processors that register mid Critical-Section 174 179 // to simply lock their own lock and enter. 175 __atomic_acquire( & write_lock );180 __atomic_acquire( &lock ); 176 181 177 182 // Step 2 : lock per-proc lock … … 181 186 uint_fast32_t s = ready; 182 187 for(uint_fast32_t i = 0; i < s; i++) { 183 volatile bool * llock = data[i]; 184 if(llock) __atomic_acquire( llock ); 188 __atomic_acquire( &data[i].lock ); 185 189 } 186 190 … … 199 203 // Alternative solution : return s in write_lock and pass it to write_unlock 200 204 for(uint_fast32_t i = 0; i < last_s; i++) { 201 v olatile bool * llock = data[i];202 if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);205 verify(data[i].lock); 206 __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE); 203 207 } 204 208 205 209 // Step 2 : release global lock 206 /*paranoid*/ assert(true == write_lock);207 __atomic_store_n(& write_lock, (bool)false, __ATOMIC_RELEASE);210 /*paranoid*/ assert(true == lock); 211 __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE); 208 212 209 213 /* paranoid */ verify( ! __preemption_enabled() ); … … 249 253 } 250 254 251 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd , bool push_local) with (cltr->ready_queue) {255 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) { 252 256 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 253 257 254 const bool external = !push_local ||(!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);258 const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 255 259 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 260 261 // write timestamp 262 thrd->link.ts = rdtscl(); 256 263 257 264 bool local; … … 273 280 #endif 274 281 282 #if defined(USE_MPSC) 283 // mpsc always succeeds 284 } while( false ); 285 #else 275 286 // If we can't lock it retry 276 287 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 288 #endif 277 289 278 290 // Actually push it 279 291 push(lanes.data[i], thrd); 280 292 281 // Unlock and return 282 __atomic_unlock( &lanes.data[i].lock ); 293 #if !defined(USE_MPSC) 294 // Unlock and return 295 __atomic_unlock( &lanes.data[i].lock ); 296 #endif 283 297 284 298 // Mark the current index in the tls rng instance as having an item … … 336 350 #endif 337 351 #if defined(USE_WORK_STEALING) 338 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd , bool push_local) with (cltr->ready_queue) {352 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) { 339 353 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 340 354 341 // #define USE_PREFERRED 342 #if !defined(USE_PREFERRED) 343 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 355 const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 344 356 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 345 #else 346 unsigned preferred = thrd->preferred; 347 const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr; 348 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count ); 349 350 unsigned r = preferred % READYQ_SHARD_FACTOR; 351 const unsigned start = preferred - r; 352 #endif 357 358 // write timestamp 359 thrd->link.ts = rdtscl(); 353 360 354 361 // Try to pick a lane and lock it … … 364 371 } 365 372 else { 366 #if !defined(USE_PREFERRED) 367 processor * proc = kernelTLS().this_processor; 368 unsigned r = proc->rdq.its++; 369 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR); 370 #else 371 i = start + (r++ % READYQ_SHARD_FACTOR); 372 #endif 373 processor * proc = kernelTLS().this_processor; 374 unsigned r = proc->rdq.its++; 375 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR); 373 376 } 377 378 379 #if defined(USE_MPSC) 380 // mpsc always succeeds 381 } while( false ); 382 #else 374 383 // If we can't lock it retry 375 384 } while( !__atomic_try_acquire( &lanes.data[i].lock ) ); 385 #endif 376 386 377 387 // Actually push it 378 388 push(lanes.data[i], thrd); 379 389 380 // Unlock and return 381 __atomic_unlock( &lanes.data[i].lock ); 390 #if !defined(USE_MPSC) 391 // Unlock and return 392 __atomic_unlock( &lanes.data[i].lock ); 393 #endif 382 394 383 395 #if !defined(__CFA_NO_STATISTICS__) … … 398 410 399 411 if(proc->rdq.target == -1u) { 400 unsigned long long min = ts(lanes.data[proc->rdq.id]);401 for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {402 unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);403 if(tsc < min) min = tsc;404 }405 proc->rdq.cutoff = min;406 412 proc->rdq.target = __tls_rand() % lanes.count; 413 unsigned it1 = proc->rdq.itr; 414 unsigned it2 = proc->rdq.itr + 1; 415 unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR); 416 unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR); 417 unsigned long long tsc1 = ts(lanes.data[idx1]); 418 unsigned long long tsc2 = ts(lanes.data[idx2]); 419 proc->rdq.cutoff = min(tsc1, tsc2); 420 if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull; 407 421 } 408 422 else { 409 423 unsigned target = proc->rdq.target; 410 424 proc->rdq.target = -1u; 411 const unsigned long long bias = 0; //2_500_000_000; 412 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; 413 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 425 if(lanes.tscs[target].tv < proc->rdq.cutoff) { 414 426 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 415 427 if(t) return t; … … 418 430 419 431 for(READYQ_SHARD_FACTOR) { 420 unsigned i = proc->rdq.id + ( proc->rdq.itr++% READYQ_SHARD_FACTOR);432 unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR); 421 433 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t; 422 434 } … … 450 462 // If list looks empty retry 451 463 if( is_empty(lane) ) { 464 __STATS( stats.espec++; ) 452 465 return 0p; 453 466 } … … 455 468 // If we can't get the lock retry 456 469 if( !__atomic_try_acquire(&lane.lock) ) { 470 __STATS( stats.elock++; ) 457 471 return 0p; 458 472 } … … 461 475 if( is_empty(lane) ) { 462 476 __atomic_unlock(&lane.lock); 477 __STATS( stats.eempty++; ) 463 478 return 0p; 464 479 } … … 466 481 // Actually pop the list 467 482 struct $thread * thrd; 468 unsigned long long tsv; 469 [thrd, tsv] = pop(lane); 483 thrd = pop(lane); 470 484 471 485 /* paranoid */ verify(thrd); 472 /* paranoid */ verify(tsv);473 486 /* paranoid */ verify(lane.lock); 474 487 … … 480 493 481 494 #if defined(USE_WORK_STEALING) 482 lanes.tscs[w].tv = t sv;495 lanes.tscs[w].tv = thrd->link.ts; 483 496 #endif 484 485 thrd->preferred = w;486 497 487 498 // return the popped thread … … 511 522 // Check that all the intrusive queues in the data structure are still consistent 512 523 static void check( __ready_queue_t & q ) with (q) { 513 #if defined(__CFA_WITH_VERIFY__) 524 #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC) 514 525 { 515 526 for( idx ; lanes.count ) { … … 517 528 assert(!lanes.data[idx].lock); 518 529 519 if(is_empty(sl)) { 520 assert( sl.anchor.next == 0p ); 521 assert( sl.anchor.ts == 0 ); 522 assert( mock_head(sl) == sl.prev ); 523 } else { 524 assert( sl.anchor.next != 0p ); 525 assert( sl.anchor.ts != 0 ); 526 assert( mock_head(sl) != sl.prev ); 527 } 530 assert(head(sl)->link.prev == 0p ); 531 assert(head(sl)->link.next->link.prev == head(sl) ); 532 assert(tail(sl)->link.next == 0p ); 533 assert(tail(sl)->link.prev->link.next == tail(sl) ); 534 535 if(is_empty(sl)) { 536 assert(tail(sl)->link.prev == head(sl)); 537 assert(head(sl)->link.next == tail(sl)); 538 } else { 539 assert(tail(sl)->link.prev != head(sl)); 540 assert(head(sl)->link.next != tail(sl)); 541 } 528 542 } 529 543 } … … 546 560 // fixes the list so that the pointers back to anchors aren't left dangling 547 561 static inline void fix(__intrusive_lane_t & ll) { 548 if(is_empty(ll)) { 549 verify(ll.anchor.next == 0p); 550 ll.prev = mock_head(ll); 551 } 552 } 553 554 static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) { 562 #if !defined(USE_MPSC) 563 // if the list is not empty then follow he pointer and fix its reverse 564 if(!is_empty(ll)) { 565 head(ll)->link.next->link.prev = head(ll); 566 tail(ll)->link.prev->link.next = tail(ll); 567 } 568 // Otherwise just reset the list 569 else { 570 verify(tail(ll)->link.next == 0p); 571 tail(ll)->link.prev = head(ll); 572 head(ll)->link.next = tail(ll); 573 verify(head(ll)->link.prev == 0p); 574 } 575 #endif 576 } 577 578 static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) { 555 579 processor * it = &list`first; 556 580 for(unsigned i = 0; i < count; i++) { … … 573 597 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc); 574 598 for(i; lanes.count) { 575 unsigned long long tsc = ts(lanes.data[i]); 576 lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl(); 599 lanes.tscs[i].tv = ts(lanes.data[i]); 577 600 } 578 601 #endif … … 663 686 while(!is_empty(lanes.data[idx])) { 664 687 struct $thread * thrd; 665 unsigned long long _; 666 [thrd, _] = pop(lanes.data[idx]); 667 668 push(cltr, thrd, true); 688 thrd = pop(lanes.data[idx]); 689 690 push(cltr, thrd); 669 691 670 692 // for printing count the number of displaced threads … … 703 725 /* paranoid */ verify( ready_mutate_islocked() ); 704 726 } 705 706 #if !defined(__CFA_NO_STATISTICS__)707 unsigned cnt(const __ready_queue_t & this, unsigned idx) {708 /* paranoid */ verify(this.lanes.count > idx);709 return this.lanes.data[idx].cnt;710 }711 #endif
Note:
See TracChangeset
for help on using the changeset viewer.