Changeset 1eb239e4
- Timestamp: Aug 10, 2020, 2:43:02 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 8465b4d
- Parents: 6c144d8
- Location: libcfa/src/concurrency
- Files: 7 edited
libcfa/src/concurrency/alarm.hfa  (r6c144d8 → r1eb239e4)

@@ -23 +23 @@
  #include "time.hfa"
 
- #include <containers/list.hfa>
+ #include "containers/list.hfa"
 
  struct $thread;

libcfa/src/concurrency/io/setup.cfa  (r6c144d8 → r1eb239e4)

@@ -228 +228 @@
  	if( cluster_context ) {
  		cluster & cltr = *thrd.curr_cluster;
- 		/* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
+ 		/* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
  		/* paranoid */ verify( !ready_mutate_islocked() );
 

libcfa/src/concurrency/kernel.cfa  (r6c144d8 → r1eb239e4)

@@ -86 +86 @@
  // Kernel Scheduling logic
  static $thread * __next_thread(cluster * this);
- static bool __has_next_thread(cluster * this);
+ static $thread * __next_thread_slow(cluster * this);
  static void __run_thread(processor * this, $thread * dst);
- static bool __wake_one(struct __processor_id_t * id, cluster * cltr);
- static void __halt(processor * this);
- bool __wake_proc(processor *);
+ static void __wake_one(struct __processor_id_t * id, cluster * cltr);
+
+ static void push  (__cluster_idles & idles, processor & proc);
+ static void remove(__cluster_idles & idles, processor & proc);
+ static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
+
 
  //=============================================================================================

@@ -118 +121 @@
 
  	$thread * readyThread = 0p;
- 	for( unsigned int spin_count = 0;; spin_count++ ) {
+ 	MAIN_LOOP:
+ 	for() {
  		// Try to get the next thread
  		readyThread = __next_thread( this->cltr );
 
- 		// Check if we actually found a thread
- 		if( readyThread ) {
- 			/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
- 			/* paranoid */ verifyf( readyThread->state == Ready || readyThread->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", readyThread->state, readyThread->preempted);
- 			/* paranoid */ verifyf( readyThread->link.next == 0p, "Expected null got %p", readyThread->link.next );
- 			__builtin_prefetch( readyThread->context.SP );
-
- 			// We found a thread run it
- 			__run_thread(this, readyThread);
-
- 			/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ 		if( !readyThread ) {
+ 			readyThread = __next_thread_slow( this->cltr );
  		}
 
- 		if(__atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST)) break;
-
+ 		HALT:
  		if( !readyThread ) {
- 			// Block until a thread is ready
- 			__halt(this);
+ 			// Don't block if we are done
+ 			if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
+
+ 			#if !defined(__CFA_NO_STATISTICS__)
+ 				__tls_stats()->ready.sleep.halts++;
+ 			#endif
+
+ 			// Push self to idle stack
+ 			push(this->cltr->idles, * this);
+
+ 			// Confirm the ready-queue is empty
+ 			readyThread = __next_thread_slow( this->cltr );
+ 			if( readyThread ) {
+ 				// A thread was found, cancel the halt
+ 				remove(this->cltr->idles, * this);
+
+ 				#if !defined(__CFA_NO_STATISTICS__)
+ 					__tls_stats()->ready.sleep.cancels++;
+ 				#endif
+
+ 				// continue the mai loop
+ 				break HALT;
+ 			}
+
+ 			#if !defined(__CFA_NO_STATISTICS__)
+ 				if(this->print_halts) {
+ 					__cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());
+ 				}
+ 			#endif
+
+ 			wait( this->idle );
+
+ 			#if !defined(__CFA_NO_STATISTICS__)
+ 				if(this->print_halts) {
+ 					__cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());
+ 				}
+ 			#endif
+
+ 			// We were woken up, remove self from idle
+ 			remove(this->cltr->idles, * this);
+
+ 			// DON'T just proceed, start looking again
+ 			continue MAIN_LOOP;
  		}
+
+ 		/* paranoid */ verify( readyThread );
+
+ 		// We found a thread run it
+ 		__run_thread(this, readyThread);
+
+ 		// Are we done?
+ 		if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
  	}
 

@@ -165 +208 @@
  // from the processor coroutine to the target thread
  static void __run_thread(processor * this, $thread * thrd_dst) {
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ 	/* paranoid */ verifyf( thrd_dst->state == Ready || thrd_dst->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", thrd_dst->state, thrd_dst->preempted);
+ 	/* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
+ 	__builtin_prefetch( thrd_dst->context.SP );
+
  	$coroutine * proc_cor = get_coroutine(this->runner);
 

@@ -244 +292 @@
  	proc_cor->state = Active;
  	kernelTLS.this_thread = 0p;
+
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
  }
 

@@ -300 +350 @@
  	ready_schedule_lock  ( id );
  		push( thrd->curr_cluster, thrd );
-
- 		#if !defined(__CFA_NO_STATISTICS__)
- 			bool woke =
- 		#endif
- 			__wake_one(id, thrd->curr_cluster);
-
- 		#if !defined(__CFA_NO_STATISTICS__)
- 			if(woke) __tls_stats()->ready.sleep.wakes++;
- 		#endif
+ 		__wake_one(id, thrd->curr_cluster);
  	ready_schedule_unlock( id );
 

@@ -315 +357 @@
 
  // KERNEL ONLY
- static $thread * __next_thread(cluster * this) with( *this ) {
+ static inline $thread * __next_thread(cluster * this) with( *this ) {
  	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
 
  	ready_schedule_lock  ( (__processor_id_t*)kernelTLS.this_processor );
- 		$thread * head = pop( this );
+ 		$thread * thrd = pop( this );
  	ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );
 
  	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
- 	return head;
+ 	return thrd;
  }
 
  // KERNEL ONLY
- static bool __has_next_thread(cluster * this) with( *this ) {
+ static inline $thread * __next_thread_slow(cluster * this) with( *this ) {
  	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
 
  	ready_schedule_lock  ( (__processor_id_t*)kernelTLS.this_processor );
- 		bool not_empty = query( this );
+ 		$thread * thrd = pop_slow( this );
  	ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor );
 
  	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
- 	return not_empty;
+ 	return thrd;
  }
 

@@ -425 +467 @@
  //=============================================================================================
  // Wake a thread from the front if there are any
- static bool __wake_one(struct __processor_id_t * id, cluster * this) {
+ static void __wake_one(struct __processor_id_t * id, cluster * this) {
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
  	/* paranoid */ verify( ready_schedule_islocked( id ) );
 
  	// Check if there is a sleeping processor
- 	processor * p = pop(this->idles);
+ 	processor * p;
+ 	unsigned idle;
+ 	unsigned total;
+ 	[idle, total, p] = query(this->idles);
 
  	// If no one is sleeping, we are done
- 	if( 0p == p ) return false;
+ 	if( idle == 0 ) return;
 
  	// We found a processor, wake it up
  	post( p->idle );
 
- 	return true;
+ 	#if !defined(__CFA_NO_STATISTICS__)
+ 		__tls_stats()->ready.sleep.wakes++;
+ 	#endif
+
+ 	/* paranoid */ verify( ready_schedule_islocked( id ) );
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+
+ 	return;
  }
 
  // Unconditionnaly wake a thread
- bool __wake_proc(processor * this) {
+ void __wake_proc(processor * this) {
  	__cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
 

@@ -448 +501 @@
  	bool ret = post( this->idle );
  	enable_interrupts( __cfaabi_dbg_ctx );
-
- 	return ret;
- }
-
- static void __halt(processor * this) with( *this ) {
- 	if( do_terminate ) return;
-
- 	#if !defined(__CFA_NO_STATISTICS__)
- 		__tls_stats()->ready.sleep.halts++;
- 	#endif
- 	// Push self to queue
- 	push(cltr->idles, *this);
-
- 	// Makre sure we don't miss a thread
- 	if( __has_next_thread(cltr) ) {
- 		// A thread was posted, make sure a processor is woken up
- 		struct __processor_id_t *id = (struct __processor_id_t *) this;
- 		ready_schedule_lock ( id );
- 			__wake_one( id, cltr );
- 		ready_schedule_unlock( id );
- 		#if !defined(__CFA_NO_STATISTICS__)
- 			__tls_stats()->ready.sleep.cancels++;
- 		#endif
- 	}
-
- 	#if !defined(__CFA_NO_STATISTICS__)
- 		if(this->print_halts) {
- 			__cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());
- 		}
- 	#endif
-
- 	wait( idle );
-
- 	#if !defined(__CFA_NO_STATISTICS__)
- 		if(this->print_halts) {
- 			__cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());
- 		}
- 	#endif
+ }
+
+ static void push  (__cluster_idles & this, processor & proc) {
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ 	lock( this );
+ 		this.idle++;
+ 		/* paranoid */ verify( this.idle <= this.total );
+
+ 		insert_first(this.list, proc);
+ 	unlock( this );
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ }
+
+ static void remove(__cluster_idles & this, processor & proc) {
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ 	lock( this );
+ 		this.idle--;
+ 		/* paranoid */ verify( this.idle >= 0 );
+
+ 		remove(proc);
+ 	unlock( this );
+ 	/* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ }
+
+ static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) {
+ 	for() {
+ 		uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST);
+ 		if( 1 == (l % 2) ) { Pause(); continue; }
+ 		unsigned idle    = this.idle;
+ 		unsigned total   = this.total;
+ 		processor * proc = &this.list`first;
+ 		if(l != __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST)) { Pause(); continue; }
+ 		return [idle, total, proc];
+ 	}
  }
 

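The heart of this change is the new MAIN_LOOP above: instead of delegating to __halt(), the processor itself announces that it is idle, re-checks the ready queue with the exhaustive __next_thread_slow(), and only then blocks, which closes the race between deciding to sleep and a concurrent schedule_thread(). The plain-C sketch below shows the same shape of protocol under assumed helpers; queue_try_pop, queue_try_pop_slow, idle_list_push, idle_list_remove and run are invented stand-ins, not the CFA runtime API.

    /* Sketch only: all helpers are hypothetical placeholders. */
    #include <semaphore.h>
    #include <stdbool.h>

    typedef struct task task;

    extern task * queue_try_pop(void);             /* fast path, may miss spuriously      */
    extern task * queue_try_pop_slow(void);        /* exhaustive, sees all earlier pushes */
    extern void   idle_list_push(sem_t * idle);    /* publish "I am about to sleep"       */
    extern void   idle_list_remove(sem_t * idle);  /* retract that announcement           */
    extern void   run(task * t);

    void worker_loop(sem_t * idle, volatile bool * terminate) {
        for (;;) {
            task * t = queue_try_pop();
            if (!t) t = queue_try_pop_slow();

            if (!t) {
                if (*terminate) return;        /* don't block if we are done           */

                idle_list_push(idle);          /* 1. announce the intent to sleep      */
                t = queue_try_pop_slow();      /* 2. re-check: a push may have raced   */
                if (t) {
                    idle_list_remove(idle);    /* cancel the halt and run the task     */
                } else {
                    sem_wait(idle);            /* 3. sleep until a scheduler posts us  */
                    idle_list_remove(idle);
                    continue;                  /* woken: start looking again           */
                }
            }

            run(t);
            if (*terminate) return;
        }
    }

The second call to queue_try_pop_slow() after the push is what makes the protocol safe: any thread scheduled before the push is either seen by that re-check or triggers a wake-up through the idle list.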
libcfa/src/concurrency/kernel.hfa  (r6c144d8 → r1eb239e4)

@@ -20 +20 @@
  #include "coroutine.hfa"
 
- #include "containers/stackLockFree.hfa"
+ #include "containers/list.hfa"
 
  extern "C" {

@@ -99 +99 @@
 
  	// Link lists fields
- 	Link(processor) link;
+ 	DLISTED_MGD_IMPL_IN(processor)
 
  	#if !defined(__CFA_NO_STATISTICS__)

@@ -119 +119 @@
 
  static inline void ?{}(processor & this, const char name[]) { this{name, *mainCluster }; }
- static inline Link(processor) * ?`next( processor * this ) { return &this->link; }
+ DLISTED_MGD_IMPL_OUT(processor)
 
  //-----------------------------------------------------------------------------

@@ -206 +206 @@
  void ^?{}(__ready_queue_t & this);
 
+ // Idle Sleep
+ struct __cluster_idles {
+ 	// Spin lock protecting the queue
+ 	volatile uint64_t lock;
+
+ 	// Total number of processors
+ 	unsigned total;
+
+ 	// Total number of idle processors
+ 	unsigned idle;
+
+ 	// List of idle processors
+ 	dlist(processor, processor) list;
+ };
+
  //-----------------------------------------------------------------------------
  // Cluster

@@ -219 +234 @@
 
  	// List of idle processors
- 	StackLF(processor) idles;
- 	volatile unsigned int nprocessors;
+ 	__cluster_idles idles;
 
  	// List of threads

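The processor now embeds its idle-list links directly (DLISTED_MGD_IMPL_IN/OUT and dlist come from containers/list.hfa) rather than carrying a lock-free stack hook. For readers who have not seen the pattern, here is a rough C illustration of such an intrusive doubly-linked list; the struct layout and helper names are invented for the sketch and are not the containers/list.hfa interface.

    #include <stddef.h>

    struct dlink { struct dlink * next, * prev; };

    struct processor_sketch {
        struct dlink idle_link;        /* intrusive hook, the role DLISTED_MGD_IMPL_IN plays */
        /* ... other processor fields ... */
    };

    static void dlist_init(struct dlink * head) { head->next = head->prev = head; }

    static void dlist_insert_first(struct dlink * head, struct dlink * node) {
        node->next = head->next;
        node->prev = head;
        head->next->prev = node;
        head->next = node;
    }

    static void dlist_remove(struct dlink * node) {
        node->prev->next = node->next;
        node->next->prev = node->prev;
        node->next = node->prev = NULL;
    }

    /* first element, or NULL when the list is empty */
    static struct processor_sketch * dlist_first(struct dlink * head) {
        if (head->next == head) return NULL;
        return (struct processor_sketch *)((char *)head->next
                 - offsetof(struct processor_sketch, idle_link));
    }

Because the links live inside the processor, push and remove never allocate and removal only needs the processor itself, which is exactly what the new push()/remove() helpers in kernel.cfa rely on.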
libcfa/src/concurrency/kernel/startup.cfa  (r6c144d8 → r1eb239e4)

@@ -87 +87 @@
  //-----------------------------------------------------------------------------
  // Other Forward Declarations
- extern bool __wake_proc(processor *);
+ extern void __wake_proc(processor *);
 
  //-----------------------------------------------------------------------------

@@ -475 +475 @@
  	#endif
 
- 	int target = __atomic_add_fetch( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
+ 	lock( this.cltr->idles );
+ 	int target = this.cltr->idles.total += 1u;
+ 	unlock( this.cltr->idles );
 
  	id = doregister((__processor_id_t*)&this);

@@ -493 +495 @@
  // Not a ctor, it just preps the destruction but should not destroy members
  static void deinit(processor & this) {
-
- 	int target = __atomic_sub_fetch( &this.cltr->nprocessors, 1u, __ATOMIC_SEQ_CST );
+ 	lock( this.cltr->idles );
+ 	int target = this.cltr->idles.total -= 1u;
+ 	unlock( this.cltr->idles );
 
  	// Lock the RWlock so no-one pushes/pops while we are changing the queue

@@ -501 +504 @@
  	// Adjust the ready queue size
  	ready_queue_shrink( this.cltr, target );
-
- 	// Make sure we aren't on the idle queue
- 	unsafe_remove( this.cltr->idles, &this );
 
  	// Unlock the RWlock

@@ -545 +545 @@
  //-----------------------------------------------------------------------------
  // Cluster
+ static void ?{}(__cluster_idles & this) {
+ 	this.lock  = 0;
+ 	this.idle  = 0;
+ 	this.total = 0;
+ 	(this.list){};
+ }
+
  void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) {
  	this.name = name;
  	this.preemption_rate = preemption_rate;
- 	this.nprocessors = 0;
  	ready_queue{};
 

libcfa/src/concurrency/kernel_private.hfa  (r6c144d8 → r1eb239e4)

@@ -121 +121 @@
  void unregister( struct __processor_id_t * proc );
 
+ //-----------------------------------------------------------------------
+ // Cluster idle lock/unlock
+ static inline void lock(__cluster_idles & this) {
+ 	for() {
+ 		uint64_t l = this.lock;
+ 		if(
+ 			(0 == (l % 2))
+ 			&& __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
+ 		) return;
+ 		Pause();
+ 	}
+ }
+
+ static inline void unlock(__cluster_idles & this) {
+ 	/* paranoid */ verify( 1 == (this.lock % 2) );
+ 	__atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
+ }
+
  //=======================================================================
  // Reader-writer lock implementation

@@ -248 +266 @@
  // pop thread from the ready queue of a cluster
  // returns 0p if empty
+ // May return 0p spuriously
  __attribute__((hot)) struct $thread * pop(struct cluster * cltr);
+
+ //-----------------------------------------------------------------------
+ // pop thread from the ready queue of a cluster
+ // returns 0p if empty
+ // guaranteed to find any threads added before this call
+ __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
 
  //-----------------------------------------------------------------------

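Together with the query() routine added in kernel.cfa, these lock()/unlock() helpers form a small sequence lock: a writer bumps the counter to an odd value while it mutates the idle data, and a lock-free reader retries until it observes the same even value before and after reading. The plain-C approximation below is a sketch only; the names are invented, the payload is reduced to the two counters, and a production version would also need to consider the atomicity of the payload reads themselves.

    #include <stdatomic.h>
    #include <stdint.h>

    struct idles_sketch {
        _Atomic uint64_t lock;   /* even = unlocked, odd = writer active */
        unsigned idle;
        unsigned total;
    };

    /* writer side: the analogue of lock()/unlock() above */
    static void idles_lock(struct idles_sketch * this) {
        for (;;) {
            uint64_t l = atomic_load(&this->lock);
            if ((l % 2) == 0 &&
                atomic_compare_exchange_strong(&this->lock, &l, l + 1))
                return;                            /* counter is now odd */
            /* spin and retry */
        }
    }

    static void idles_unlock(struct idles_sketch * this) {
        atomic_fetch_add(&this->lock, 1);          /* back to an even value */
    }

    /* reader side: optimistic snapshot, retried if a writer intervened */
    static void idles_query(struct idles_sketch * this, unsigned * idle, unsigned * total) {
        for (;;) {
            uint64_t before = atomic_load(&this->lock);
            if (before % 2) continue;              /* a write is in progress */
            *idle  = this->idle;
            *total = this->total;
            if (before == atomic_load(&this->lock)) return;  /* snapshot was stable */
        }
    }

The benefit, mirrored in __wake_one(), is that the common "is anyone idle?" question never takes the lock; only processors actually entering or leaving the idle list pay for mutual exclusion.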
libcfa/src/concurrency/ready_queue.cfa  (r6c144d8 → r1eb239e4)

@@ -17 +17 @@
  // #define __CFA_DEBUG_PRINT_READY_QUEUE__
 
+ // #define USE_SNZI
+
  #include "bits/defs.hfa"
  #include "kernel_private.hfa"

@@ -192 +194 @@
  void ^?{}(__ready_queue_t & this) with (this) {
  	verify( 1 == lanes.count );
- 	verify( !query( snzi ) );
+ 	#ifdef USE_SNZI
+ 		verify( !query( snzi ) );
+ 	#endif
  	free(lanes.data);
  }

@@ -198 +202 @@
  //-----------------------------------------------------------------------
  __attribute__((hot)) bool query(struct cluster * cltr) {
- 	return query(cltr->ready_queue.snzi);
+ 	#ifdef USE_SNZI
+ 		return query(cltr->ready_queue.snzi);
+ 	#endif
+ 	return true;
  }
 

@@ -262 +269 @@
  		bool lane_first = push(lanes.data[i], thrd);
 
- 		// If this lane used to be empty we need to do more
- 		if(lane_first) {
- 			// Check if the entire queue used to be empty
- 			first = !query(snzi);
-
- 			// Update the snzi
- 			arrive( snzi, i );
- 		}
+ 		#ifdef USE_SNZI
+ 			// If this lane used to be empty we need to do more
+ 			if(lane_first) {
+ 				// Check if the entire queue used to be empty
+ 				first = !query(snzi);
+
+ 				// Update the snzi
+ 				arrive( snzi, i );
+ 			}
+ 		#endif
 
  		// Unlock and return

@@ -294 +303 @@
  __attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
  	/* paranoid */ verify( lanes.count > 0 );
+ 	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
  	#if defined(BIAS)
  		// Don't bother trying locally too much

@@ -300 +310 @@
 
  	// As long as the list is not empty, try finding a lane that isn't empty and pop from it
- 	while( query(snzi) ) {
+ 	#ifdef USE_SNZI
+ 		while( query(snzi) ) {
+ 	#else
+ 		for(25) {
+ 	#endif
  		// Pick two lists at random
  		unsigned i,j;

@@ -336 +350 @@
  		#endif
 
- 		i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
- 		j %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
+ 		i %= count;
+ 		j %= count;
 
  		// try popping from the 2 picked lists

@@ -353 +367 @@
  }
 
+ __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
+ 	/* paranoid */ verify( lanes.count > 0 );
+ 	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
+ 	unsigned offset = __tls_rand();
+ 	for(i; count) {
+ 		unsigned idx = (offset + i) % count;
+ 		struct $thread * thrd = try_pop(cltr, idx);
+ 		if(thrd) {
+ 			return thrd;
+ 		}
+ 	}
+
+ 	// All lanes where empty return 0p
+ 	return 0p;
+ }
+
+
  //-----------------------------------------------------------------------
  // Given 2 indexes, pick the list with the oldest push an try to pop from it

@@ -394 +425 @@
  	/* paranoid */ verify(lane.lock);
 
- 	// If this was the last element in the lane
- 	if(emptied) {
- 		depart( snzi, w );
- 	}
+ 	#ifdef USE_SNZI
+ 		// If this was the last element in the lane
+ 		if(emptied) {
+ 			depart( snzi, w );
+ 		}
+ 	#endif
 
  	// Unlock and return

@@ -430 +463 @@
 
  			removed = true;
- 			if(emptied) {
- 				depart( snzi, i );
- 			}
+ 			#ifdef USE_SNZI
+ 				if(emptied) {
+ 					depart( snzi, i );
+ 				}
+ 			#endif
  		}
  		__atomic_unlock(&lane.lock);

@@ -494 +529 @@
  	// grow the ready queue
  	with( cltr->ready_queue ) {
- 		^(snzi){};
+ 		#ifdef USE_SNZI
+ 			^(snzi){};
+ 		#endif
 
  		// Find new count

@@ -516 +553 @@
  		lanes.count = ncount;
 
- 		// Re-create the snzi
- 		snzi{ log2( lanes.count / 8 ) };
- 		for( idx; (size_t)lanes.count ) {
- 			if( !is_empty(lanes.data[idx]) ) {
- 				arrive(snzi, idx);
- 			}
- 		}
+ 		#ifdef USE_SNZI
+ 			// Re-create the snzi
+ 			snzi{ log2( lanes.count / 8 ) };
+ 			for( idx; (size_t)lanes.count ) {
+ 				if( !is_empty(lanes.data[idx]) ) {
+ 					arrive(snzi, idx);
+ 				}
+ 			}
+ 		#endif
  	}
 

@@ -542 +581 @@
 
  	with( cltr->ready_queue ) {
- 		^(snzi){};
+ 		#ifdef USE_SNZI
+ 			^(snzi){};
+ 		#endif
 
  		// Remember old count

@@ -596 +637 @@
  		}
 
- 		// Re-create the snzi
- 		snzi{ log2( lanes.count / 8 ) };
- 		for( idx; (size_t)lanes.count ) {
- 			if( !is_empty(lanes.data[idx]) ) {
- 				arrive(snzi, idx);
- 			}
- 		}
+ 		#ifdef USE_SNZI
+ 			// Re-create the snzi
+ 			snzi{ log2( lanes.count / 8 ) };
+ 			for( idx; (size_t)lanes.count ) {
+ 				if( !is_empty(lanes.data[idx]) ) {
+ 					arrive(snzi, idx);
+ 				}
+ 			}
+ 		#endif
  	}
 

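With the SNZI emptiness test compiled out, pop_slow() is what guarantees that a processor only goes to sleep when the queue is really empty: it sweeps every lane exactly once, starting from a random offset so idle processors do not all begin at lane 0, and reports empty only after a full pass. The C sketch below mirrors that scan; lane_count, tls_rand and try_pop_lane are placeholder helpers, not the CFA ready-queue functions.

    #include <stddef.h>

    typedef struct thread_desc thread_desc;

    extern unsigned      lane_count(void);        /* current number of lanes      */
    extern unsigned      tls_rand(void);          /* cheap per-thread random seed */
    extern thread_desc * try_pop_lane(unsigned idx);

    thread_desc * pop_slow_sketch(void) {
        unsigned count  = lane_count();
        unsigned offset = tls_rand();
        for (unsigned i = 0; i < count; i++) {
            unsigned idx = (offset + i) % count;  /* wrap around the lanes */
            thread_desc * thrd = try_pop_lane(idx);
            if (thrd) return thrd;
        }
        return NULL;                              /* every lane was empty */
    }

The random starting offset keeps concurrent slow-path callers spread across the lanes, while the bounded fast path in pop() (for(25) with two random choices per iteration) remains the common case.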