Changes in / [8465b4d:e2702fd]
- Location: libcfa/src/concurrency
- Files: 7 edited
Legend:
- Unmodified
- Added
- Removed
libcfa/src/concurrency/alarm.hfa
r8465b4d re2702fd 23 23 #include "time.hfa" 24 24 25 #include "containers/list.hfa"25 #include <containers/list.hfa> 26 26 27 27 struct $thread; -
libcfa/src/concurrency/io/setup.cfa
r8465b4d re2702fd 228 228 if( cluster_context ) { 229 229 cluster & cltr = *thrd.curr_cluster; 230 /* paranoid */ verify( cltr. idles.total== 0 || &cltr == mainCluster );230 /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster ); 231 231 /* paranoid */ verify( !ready_mutate_islocked() ); 232 232 -
libcfa/src/concurrency/kernel.cfa
r8465b4d re2702fd 86 86 // Kernel Scheduling logic 87 87 static $thread * __next_thread(cluster * this); 88 static $thread * __next_thread_slow(cluster * this);88 static bool __has_next_thread(cluster * this); 89 89 static void __run_thread(processor * this, $thread * dst); 90 static void __wake_one(struct __processor_id_t * id, cluster * cltr); 91 92 static void push (__cluster_idles & idles, processor & proc); 93 static void remove(__cluster_idles & idles, processor & proc); 94 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles ); 95 90 static bool __wake_one(struct __processor_id_t * id, cluster * cltr); 91 static void __halt(processor * this); 92 bool __wake_proc(processor *); 96 93 97 94 //============================================================================================= … … 121 118 122 119 $thread * readyThread = 0p; 123 MAIN_LOOP: 124 for() { 120 for( unsigned int spin_count = 0;; spin_count++ ) { 125 121 // Try to get the next thread 126 122 readyThread = __next_thread( this->cltr ); 127 123 124 // Check if we actually found a thread 125 if( readyThread ) { 126 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 127 /* paranoid */ verifyf( readyThread->state == Ready || readyThread->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", readyThread->state, readyThread->preempted); 128 /* paranoid */ verifyf( readyThread->link.next == 0p, "Expected null got %p", readyThread->link.next ); 129 __builtin_prefetch( readyThread->context.SP ); 130 131 // We found a thread run it 132 __run_thread(this, readyThread); 133 134 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled ); 135 } 136 137 if(__atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST)) break; 138 128 139 if( !readyThread ) { 129 readyThread = __next_thread_slow( this->cltr ); 140 // Block until a thread is ready 141 __halt(this); 130 142 } 131 132 HALT:133 if( !readyThread ) {134 // Don't block if we are done135 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;136 137 #if !defined(__CFA_NO_STATISTICS__)138 __tls_stats()->ready.sleep.halts++;139 #endif140 141 // Push self to idle stack142 push(this->cltr->idles, * this);143 144 // Confirm the ready-queue is empty145 readyThread = __next_thread_slow( this->cltr );146 if( readyThread ) {147 // A thread was found, cancel the halt148 remove(this->cltr->idles, * this);149 150 #if !defined(__CFA_NO_STATISTICS__)151 __tls_stats()->ready.sleep.cancels++;152 #endif153 154 // continue the mai loop155 break HALT;156 }157 158 #if !defined(__CFA_NO_STATISTICS__)159 if(this->print_halts) {160 __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl());161 }162 #endif163 164 wait( this->idle );165 166 #if !defined(__CFA_NO_STATISTICS__)167 if(this->print_halts) {168 __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl());169 }170 #endif171 172 // We were woken up, remove self from idle173 remove(this->cltr->idles, * this);174 175 // DON'T just proceed, start looking again176 continue MAIN_LOOP;177 }178 179 /* paranoid */ verify( readyThread );180 181 // We found a thread run it182 __run_thread(this, readyThread);183 184 // Are we done?185 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;186 143 } 187 144 … … 208 165 // from the processor coroutine to the target thread 209 166 static void __run_thread(processor * this, $thread * thrd_dst) { 210 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled );211 /* paranoid */ verifyf( thrd_dst->state == Ready || thrd_dst->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", thrd_dst->state, thrd_dst->preempted);212 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );213 __builtin_prefetch( thrd_dst->context.SP );214 215 167 $coroutine * proc_cor = get_coroutine(this->runner); 216 168 … … 292 244 proc_cor->state = Active; 293 245 kernelTLS.this_thread = 0p; 294 295 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );296 246 } 297 247 … … 350 300 ready_schedule_lock ( id ); 351 301 push( thrd->curr_cluster, thrd ); 352 __wake_one(id, thrd->curr_cluster); 302 303 #if !defined(__CFA_NO_STATISTICS__) 304 bool woke = 305 #endif 306 __wake_one(id, thrd->curr_cluster); 307 308 #if !defined(__CFA_NO_STATISTICS__) 309 if(woke) __tls_stats()->ready.sleep.wakes++; 310 #endif 353 311 ready_schedule_unlock( id ); 354 312 … … 357 315 358 316 // KERNEL ONLY 359 static inline$thread * __next_thread(cluster * this) with( *this ) {317 static $thread * __next_thread(cluster * this) with( *this ) { 360 318 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 361 319 362 320 ready_schedule_lock ( (__processor_id_t*)kernelTLS.this_processor ); 363 $thread * thrd = pop( this );321 $thread * head = pop( this ); 364 322 ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor ); 365 323 366 324 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 367 return thrd;325 return head; 368 326 } 369 327 370 328 // KERNEL ONLY 371 static inline $thread * __next_thread_slow(cluster * this) with( *this ) {329 static bool __has_next_thread(cluster * this) with( *this ) { 372 330 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled ); 373 331 374 332 ready_schedule_lock ( (__processor_id_t*)kernelTLS.this_processor ); 375 $thread * thrd = pop_slow( this );333 bool not_empty = query( this ); 376 334 ready_schedule_unlock( (__processor_id_t*)kernelTLS.this_processor ); 377 335 378 336 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 379 return thrd;337 return not_empty; 380 338 } 381 339 … … 467 425 //============================================================================================= 468 426 // Wake a thread from the front if there are any 469 static void __wake_one(struct __processor_id_t * id, cluster * this) { 470 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 427 static bool __wake_one(struct __processor_id_t * id, cluster * this) { 471 428 /* paranoid */ verify( ready_schedule_islocked( id ) ); 472 429 473 430 // Check if there is a sleeping processor 474 processor * p; 475 unsigned idle; 476 unsigned total; 477 [idle, total, p] = query(this->idles); 431 processor * p = pop(this->idles); 478 432 479 433 // If no one is sleeping, we are done 480 if( idle == 0 ) return;434 if( 0p == p ) return false; 481 435 482 436 // We found a processor, wake it up 483 437 post( p->idle ); 484 438 485 #if !defined(__CFA_NO_STATISTICS__) 486 __tls_stats()->ready.sleep.wakes++; 487 #endif 488 489 /* paranoid */ verify( ready_schedule_islocked( id ) ); 490 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 491 492 return; 439 return true; 493 440 } 494 441 495 442 // Unconditionnaly wake a thread 496 void__wake_proc(processor * this) {443 bool __wake_proc(processor * this) { 497 444 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this); 498 445 … … 501 448 bool ret = post( this->idle ); 502 449 enable_interrupts( __cfaabi_dbg_ctx ); 503 } 504 505 static void push (__cluster_idles & this, processor & proc) { 506 /* paranoid */ verify( ! 
kernelTLS.preemption_state.enabled ); 507 lock( this ); 508 this.idle++; 509 /* paranoid */ verify( this.idle <= this.total ); 510 511 insert_first(this.list, proc); 512 unlock( this ); 513 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 514 } 515 516 static void remove(__cluster_idles & this, processor & proc) { 517 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 518 lock( this ); 519 this.idle--; 520 /* paranoid */ verify( this.idle >= 0 ); 521 522 remove(proc); 523 unlock( this ); 524 /* paranoid */ verify( ! kernelTLS.preemption_state.enabled ); 525 } 526 527 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) { 528 for() { 529 uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST); 530 if( 1 == (l % 2) ) { Pause(); continue; } 531 unsigned idle = this.idle; 532 unsigned total = this.total; 533 processor * proc = &this.list`first; 534 if(l != __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST)) { Pause(); continue; } 535 return [idle, total, proc]; 536 } 450 451 return ret; 452 } 453 454 static void __halt(processor * this) with( *this ) { 455 if( do_terminate ) return; 456 457 #if !defined(__CFA_NO_STATISTICS__) 458 __tls_stats()->ready.sleep.halts++; 459 #endif 460 // Push self to queue 461 push(cltr->idles, *this); 462 463 // Makre sure we don't miss a thread 464 if( __has_next_thread(cltr) ) { 465 // A thread was posted, make sure a processor is woken up 466 struct __processor_id_t *id = (struct __processor_id_t *) this; 467 ready_schedule_lock ( id ); 468 __wake_one( id, cltr ); 469 ready_schedule_unlock( id ); 470 #if !defined(__CFA_NO_STATISTICS__) 471 __tls_stats()->ready.sleep.cancels++; 472 #endif 473 } 474 475 #if !defined(__CFA_NO_STATISTICS__) 476 if(this->print_halts) { 477 __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl()); 478 } 479 #endif 480 481 wait( idle ); 482 483 #if !defined(__CFA_NO_STATISTICS__) 484 if(this->print_halts) { 485 
__cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl()); 486 } 487 #endif 537 488 } 538 489 -
libcfa/src/concurrency/kernel.hfa
r8465b4d re2702fd 20 20 #include "coroutine.hfa" 21 21 22 #include "containers/ list.hfa"22 #include "containers/stackLockFree.hfa" 23 23 24 24 extern "C" { … … 99 99 100 100 // Link lists fields 101 DLISTED_MGD_IMPL_IN(processor)101 Link(processor) link; 102 102 103 103 #if !defined(__CFA_NO_STATISTICS__) … … 119 119 static inline void ?{}(processor & this, const char name[]) { this{name, *mainCluster }; } 120 120 121 DLISTED_MGD_IMPL_OUT(processor) 121 static inline Link(processor) * ?`next( processor * this ) { return &this->link; } 122 122 123 123 //----------------------------------------------------------------------------- … … 206 206 void ^?{}(__ready_queue_t & this); 207 207 208 // Idle Sleep209 struct __cluster_idles {210 // Spin lock protecting the queue211 volatile uint64_t lock;212 213 // Total number of processors214 unsigned total;215 216 // Total number of idle processors217 unsigned idle;218 219 // List of idle processors220 dlist(processor, processor) list;221 };222 223 208 //----------------------------------------------------------------------------- 224 209 // Cluster … … 234 219 235 220 // List of idle processors 236 __cluster_idles idles; 221 StackLF(processor) idles; 222 volatile unsigned int nprocessors; 237 223 238 224 // List of threads -
libcfa/src/concurrency/kernel/startup.cfa
r8465b4d re2702fd 87 87 //----------------------------------------------------------------------------- 88 88 // Other Forward Declarations 89 extern void__wake_proc(processor *);89 extern bool __wake_proc(processor *); 90 90 91 91 //----------------------------------------------------------------------------- … … 475 475 #endif 476 476 477 lock( this.cltr->idles ); 478 int target = this.cltr->idles.total += 1u; 479 unlock( this.cltr->idles ); 477 int target = __atomic_add_fetch( &cltr->nprocessors, 1u, __ATOMIC_SEQ_CST ); 480 478 481 479 id = doregister((__processor_id_t*)&this); … … 495 493 // Not a ctor, it just preps the destruction but should not destroy members 496 494 static void deinit(processor & this) { 497 lock( this.cltr->idles ); 498 int target = this.cltr->idles.total -= 1u; 499 unlock( this.cltr->idles ); 495 496 int target = __atomic_sub_fetch( &this.cltr->nprocessors, 1u, __ATOMIC_SEQ_CST ); 500 497 501 498 // Lock the RWlock so no-one pushes/pops while we are changing the queue … … 504 501 // Adjust the ready queue size 505 502 ready_queue_shrink( this.cltr, target ); 503 504 // Make sure we aren't on the idle queue 505 unsafe_remove( this.cltr->idles, &this ); 506 506 507 507 // Unlock the RWlock … … 545 545 //----------------------------------------------------------------------------- 546 546 // Cluster 547 static void ?{}(__cluster_idles & this) {548 this.lock = 0;549 this.idle = 0;550 this.total = 0;551 (this.list){};552 }553 554 547 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) { 555 548 this.name = name; 556 549 this.preemption_rate = preemption_rate; 550 this.nprocessors = 0; 557 551 ready_queue{}; 558 552 -
libcfa/src/concurrency/kernel_private.hfa
r8465b4d re2702fd 121 121 void unregister( struct __processor_id_t * proc ); 122 122 123 //-----------------------------------------------------------------------124 // Cluster idle lock/unlock125 static inline void lock(__cluster_idles & this) {126 for() {127 uint64_t l = this.lock;128 if(129 (0 == (l % 2))130 && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)131 ) return;132 Pause();133 }134 }135 136 static inline void unlock(__cluster_idles & this) {137 /* paranoid */ verify( 1 == (this.lock % 2) );138 __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );139 }140 141 123 //======================================================================= 142 124 // Reader-writer lock implementation … … 266 248 // pop thread from the ready queue of a cluster 267 249 // returns 0p if empty 268 // May return 0p spuriously269 250 __attribute__((hot)) struct $thread * pop(struct cluster * cltr); 270 271 //-----------------------------------------------------------------------272 // pop thread from the ready queue of a cluster273 // returns 0p if empty274 // guaranteed to find any threads added before this call275 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);276 251 277 252 //----------------------------------------------------------------------- -
libcfa/src/concurrency/ready_queue.cfa
r8465b4d re2702fd 17 17 // #define __CFA_DEBUG_PRINT_READY_QUEUE__ 18 18 19 // #define USE_SNZI20 21 19 #include "bits/defs.hfa" 22 20 #include "kernel_private.hfa" … … 194 192 void ^?{}(__ready_queue_t & this) with (this) { 195 193 verify( 1 == lanes.count ); 196 #ifdef USE_SNZI 197 verify( !query( snzi ) ); 198 #endif 194 verify( !query( snzi ) ); 199 195 free(lanes.data); 200 196 } … … 202 198 //----------------------------------------------------------------------- 203 199 __attribute__((hot)) bool query(struct cluster * cltr) { 204 #ifdef USE_SNZI 205 return query(cltr->ready_queue.snzi); 206 #endif 207 return true; 200 return query(cltr->ready_queue.snzi); 208 201 } 209 202 … … 269 262 bool lane_first = push(lanes.data[i], thrd); 270 263 271 #ifdef USE_SNZI 272 // If this lane used to be empty we need to do more 273 if(lane_first) { 274 // Check if the entire queue used to be empty 275 first = !query(snzi); 276 277 // Update the snzi 278 arrive( snzi, i ); 279 } 280 #endif 264 // If this lane used to be empty we need to do more 265 if(lane_first) { 266 // Check if the entire queue used to be empty 267 first = !query(snzi); 268 269 // Update the snzi 270 arrive( snzi, i ); 271 } 281 272 282 273 // Unlock and return … … 303 294 __attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) { 304 295 /* paranoid */ verify( lanes.count > 0 ); 305 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );306 296 #if defined(BIAS) 307 297 // Don't bother trying locally too much … … 310 300 311 301 // As long as the list is not empty, try finding a lane that isn't empty and pop from it 312 #ifdef USE_SNZI 313 while( query(snzi) ) { 314 #else 315 for(25) { 316 #endif 302 while( query(snzi) ) { 317 303 // Pick two lists at random 318 304 unsigned i,j; … … 350 336 #endif 351 337 352 i %= count;353 j %= count;338 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); 339 j %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED ); 354 340 355 
341 // try popping from the 2 picked lists … … 367 353 } 368 354 369 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {370 /* paranoid */ verify( lanes.count > 0 );371 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );372 unsigned offset = __tls_rand();373 for(i; count) {374 unsigned idx = (offset + i) % count;375 struct $thread * thrd = try_pop(cltr, idx);376 if(thrd) {377 return thrd;378 }379 }380 381 // All lanes where empty return 0p382 return 0p;383 }384 385 386 355 //----------------------------------------------------------------------- 387 356 // Given 2 indexes, pick the list with the oldest push an try to pop from it … … 425 394 /* paranoid */ verify(lane.lock); 426 395 427 #ifdef USE_SNZI 428 // If this was the last element in the lane 429 if(emptied) { 430 depart( snzi, w ); 431 } 432 #endif 396 // If this was the last element in the lane 397 if(emptied) { 398 depart( snzi, w ); 399 } 433 400 434 401 // Unlock and return … … 463 430 464 431 removed = true; 465 #ifdef USE_SNZI 466 if(emptied) { 467 depart( snzi, i ); 468 } 469 #endif 432 if(emptied) { 433 depart( snzi, i ); 434 } 470 435 } 471 436 __atomic_unlock(&lane.lock); … … 529 494 // grow the ready queue 530 495 with( cltr->ready_queue ) { 531 #ifdef USE_SNZI 532 ^(snzi){}; 533 #endif 496 ^(snzi){}; 534 497 535 498 // Find new count … … 553 516 lanes.count = ncount; 554 517 555 #ifdef USE_SNZI 556 // Re-create the snzi 557 snzi{ log2( lanes.count / 8 ) }; 558 for( idx; (size_t)lanes.count ) { 559 if( !is_empty(lanes.data[idx]) ) { 560 arrive(snzi, idx); 561 } 562 } 563 #endif 518 // Re-create the snzi 519 snzi{ log2( lanes.count / 8 ) }; 520 for( idx; (size_t)lanes.count ) { 521 if( !is_empty(lanes.data[idx]) ) { 522 arrive(snzi, idx); 523 } 524 } 564 525 } 565 526 … … 581 542 582 543 with( cltr->ready_queue ) { 583 #ifdef USE_SNZI 584 ^(snzi){}; 585 #endif 544 ^(snzi){}; 586 545 587 546 // Remember old count … … 637 596 } 638 597 639 
#ifdef USE_SNZI 640 // Re-create the snzi 641 snzi{ log2( lanes.count / 8 ) }; 642 for( idx; (size_t)lanes.count ) { 643 if( !is_empty(lanes.data[idx]) ) { 644 arrive(snzi, idx); 645 } 646 } 647 #endif 598 // Re-create the snzi 599 snzi{ log2( lanes.count / 8 ) }; 600 for( idx; (size_t)lanes.count ) { 601 if( !is_empty(lanes.data[idx]) ) { 602 arrive(snzi, idx); 603 } 604 } 648 605 } 649 606
Note: See TracChangeset for help on using the changeset viewer.