Changes in / [00e9be9:99b2407]
- Location: libcfa/src/concurrency
- Files: 10 edited
Legend: unmodified lines have no prefix, added lines are marked "+", removed lines are marked "-", and "…" marks elided lines between hunks.
libcfa/src/concurrency/clib/cfathread.cfa
   // Mutex
   struct cfathread_mutex {
-     single_acquisition_lock impl;
+     fast_lock impl;
   };
   int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; }
   …
   // Condition
   struct cfathread_condition {
-     condition_variable( single_acquisition_lock ) impl;
+     condition_variable( fast_lock ) impl;
   };
   int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; }
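The only change here is the lock type backing the C-compatible API: single_acquisition_lock is replaced by fast_lock in both the mutex and the condition wrapper. The surrounding shim pattern is unchanged; below is a minimal sketch of how the remaining operations presumably delegate to the embedded lock (the lock/unlock calls on impl are an assumption, those functions are not part of this hunk):

    // sketch (assumed, not in this changeset): the C API forwards to the
    // CFA lock stored inside the wrapper object
    int cfathread_mutex_lock( cfathread_mutex_t * mut ) __attribute__((nonnull (1))) {
        lock( (*mut)->impl );      // assumed CFA lock interface
        return 0;
    }
    int cfathread_mutex_unlock( cfathread_mutex_t * mut ) __attribute__((nonnull (1))) {
        unlock( (*mut)->impl );
        return 0;
    }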
libcfa/src/concurrency/invoke.h
   struct $thread * prev;
   volatile unsigned long long ts;
+  unsigned preferred;
   };
   …
   } node;

+  struct processor * last_proc;
+
   #if defined( __CFA_WITH_VERIFY__ )
      void * canary;
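Two scheduler hints are added to the thread descriptor: preferred (in the intrusive link node) records a ready-queue lane preference, and last_proc records the processor that last ran the thread. A condensed sketch of how this changeset uses them (the extunpark logic is taken from the kernel.cfa hunks below; how the ready queue consumes link.preferred is not shown in this changeset and is only indicated):

    // detect cross-processor unparks, as in __schedule_thread below
    __STATS( bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
    __STATS( if(outside) __tls_stats()->ready.threads.extunpark++; )

    // -1u is the "no preference" sentinel set at thread initialization;
    // a lane value would let the queue push the thread back where it came from
    if( thrd->link.preferred != -1u ) { /* try that lane first (assumed use) */ }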
libcfa/src/concurrency/io.cfa
   #include "kernel.hfa"
   #include "kernel/fwd.hfa"
+  #include "kernel_private.hfa"
   #include "io/types.hfa"
   …
   static inline unsigned __flush( struct $io_context & );
   static inline __u32 __release_sqes( struct $io_context & );
+  extern void __kernel_unpark( $thread * thrd );

   bool __cfa_io_drain( processor * proc ) {
      /* paranoid */ verify( ! __preemption_enabled() );
+     /* paranoid */ verify( ready_schedule_islocked() );
      /* paranoid */ verify( proc );
      /* paranoid */ verify( proc->io.ctx );
   …
      __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );

-     fulfil( *future, cqe.res );
+     __kernel_unpark( fulfil( *future, cqe.res, false ) );
   …
      __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );

+     /* paranoid */ verify( ready_schedule_islocked() );
      /* paranoid */ verify( ! __preemption_enabled() );
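The I/O drain path no longer unparks completed futures directly. fulfil now takes an extra boolean and returns the thread that was waiting, if any (its exact semantics are inferred from this hunk), and the caller wakes that thread through the new __kernel_unpark while the ready-scheduler lock is already held, so a batch of completions pays for the lock once. A sketch of the resulting shape (the completion accessors and cqes array are hypothetical placeholders for the ring-walking code elided above):

    // sketch: batch wakeups under one scheduler-lock acquisition
    ready_schedule_lock();
    for( unsigned i = 0; i < count; i++ ) {
        io_future_t * future = future_of( cqes[i] );   // hypothetical accessor
        // fulfil(..., false) returns the parked waiter instead of waking it
        __kernel_unpark( fulfil( *future, cqes[i].res, false ) );
    }
    ready_schedule_unlock();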
libcfa/src/concurrency/kernel.cfa
   #include "invoke.h"

+  #if !defined(__CFA_NO_STATISTICS__)
+     #define __STATS( ...) __VA_ARGS__
+  #else
+     #define __STATS( ...)
+  #endif

   //-----------------------------------------------------------------------------
   …
   preemption_scope scope = { this };

-  #if !defined(__CFA_NO_STATISTICS__)
-     unsigned long long last_tally = rdtscl();
-  #endif
+  __STATS( unsigned long long last_tally = rdtscl(); )

   // if we need to run some special setup, now is the time to do it.
   …
      __cfa_io_flush( this );
   }

+  // SEARCH: {
+  //    /* paranoid */ verify( ! __preemption_enabled() );
+  //    /* paranoid */ verify( kernelTLS().this_proc_id );
+
+  //    // First, lock the scheduler since we are searching for a thread
+
+  //    // Try to get the next thread
+  //    ready_schedule_lock();
+  //    readyThread = pop_fast( this->cltr );
+  //    ready_schedule_unlock();
+  //    if(readyThread) { break SEARCH; }
+
+  //    // If we can't find a thread, might as well flush any outstanding I/O
+  //    if(this->io.pending) { __cfa_io_flush( this ); }
+
+  //    // Spin a little on I/O, just in case
+  //    for(25) {
+  //       __maybe_io_drain( this );
+  //       ready_schedule_lock();
+  //       readyThread = pop_fast( this->cltr );
+  //       ready_schedule_unlock();
+  //       if(readyThread) { break SEARCH; }
+  //    }
+
+  //    // no luck, try stealing a few times
+  //    for(25) {
+  //       if( __maybe_io_drain( this ) ) {
+  //          ready_schedule_lock();
+  //          readyThread = pop_fast( this->cltr );
+  //       } else {
+  //          ready_schedule_lock();
+  //          readyThread = pop_slow( this->cltr );
+  //       }
+  //       ready_schedule_unlock();
+  //       if(readyThread) { break SEARCH; }
+  //    }
+
+  //    // still no luck, search for a thread
+  //    ready_schedule_lock();
+  //    readyThread = pop_search( this->cltr );
+  //    ready_schedule_unlock();
+  //    if(readyThread) { break SEARCH; }
+
+  //    // Don't block if we are done
+  //    if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
+
+  //    __STATS( __tls_stats()->ready.sleep.halts++; )
+
+  //    // Push self to idle stack
+  //    mark_idle(this->cltr->procs, * this);
+
+  //    // Confirm the ready-queue is empty
+  //    __maybe_io_drain( this );
+  //    ready_schedule_lock();
+  //    readyThread = pop_search( this->cltr );
+  //    ready_schedule_unlock();
+
+  //    if( readyThread ) {
+  //       // A thread was found, cancel the halt
+  //       mark_awake(this->cltr->procs, * this);
+
+  //       __STATS( __tls_stats()->ready.sleep.cancels++; )
+
+  //       // continue the main loop
+  //       break SEARCH;
+  //    }
+
+  //    __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl()); )
+  //    __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle);
+
+  //    // __disable_interrupts_hard();
+  //    eventfd_t val;
+  //    eventfd_read( this->idle, &val );
+  //    // __enable_interrupts_hard();
+
+  //    __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl()); )
+
+  //    // We were woken up, remove self from idle
+  //    mark_awake(this->cltr->procs, * this);
+
+  //    // DON'T just proceed, start looking again
+  //    continue MAIN_LOOP;
+  // }
+
+  // RUN_THREAD:
+  //    /* paranoid */ verify( kernelTLS().this_proc_id );
+  //    /* paranoid */ verify( ! __preemption_enabled() );
+  //    /* paranoid */ verify( readyThread );
+
+  //    // Reset io dirty bit
+  //    this->io.dirty = false;
+
+  //    // We found a thread run it
+  //    __run_thread(this, readyThread);
+
+  //    // Are we done?
+  //    if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
+
+  //    #if !defined(__CFA_NO_STATISTICS__)
+  //       unsigned long long curr = rdtscl();
+  //       if(curr > (last_tally + 500000000)) {
+  //          __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats);
+  //          last_tally = curr;
+  //       }
+  //    #endif
+
+  //    if(this->io.pending && !this->io.dirty) {
+  //       __cfa_io_flush( this );
+  //    }
+
+  //    // Check if there is pending io
+  //    __maybe_io_drain( this );
   }
   …
   $thread * thrd_src = kernelTLS().this_thread;

-  #if !defined(__CFA_NO_STATISTICS__)
-     struct processor * last_proc = kernelTLS().this_processor;
-  #endif
+  __STATS( thrd_src->last_proc = kernelTLS().this_processor; )

   // Run the thread on this processor
   …
   #if !defined(__CFA_NO_STATISTICS__)
-     if(last_proc != kernelTLS().this_processor) {
+     /* paranoid */ verify( thrd_src->last_proc != 0p );
+     if(thrd_src->last_proc != kernelTLS().this_processor) {
         __tls_stats()->ready.threads.migration++;
      }
   …
   // Scheduler routines
   // KERNEL ONLY
-  void __schedule_thread( $thread * thrd ) {
+  static void __schedule_thread( $thread * thrd ) {
      /* paranoid */ verify( ! __preemption_enabled() );
      /* paranoid */ verify( kernelTLS().this_proc_id );
   …
   // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
   struct cluster * cl = thrd->curr_cluster;
+  __STATS( bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )

   // push the thread to the cluster ready-queue
   …
   if( kernelTLS().this_stats ) {
      __tls_stats()->ready.threads.threads++;
+     if(outside) {
+        __tls_stats()->ready.threads.extunpark++;
+     }
      __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", kernelTLS().this_processor );
   }
   else {
      __atomic_fetch_add(&cl->stats->ready.threads.threads, 1, __ATOMIC_RELAXED);
+     __atomic_fetch_add(&cl->stats->ready.threads.extunpark, 1, __ATOMIC_RELAXED);
      __push_stat( cl->stats, cl->stats->ready.threads.threads, true, "Cluster", cl );
   }
   …
   ready_schedule_lock();
-  $thread * thrd = pop_slow( this );
+  $thread * thrd;
+  for(25) {
+     thrd = pop_slow( this );
+     if(thrd) goto RET;
+  }
+  thrd = pop_search( this );
+
+  RET:
   ready_schedule_unlock();
   …
   }

+  void __kernel_unpark( $thread * thrd ) {
+     /* paranoid */ verify( ! __preemption_enabled() );
+     /* paranoid */ verify( ready_schedule_islocked());
+
+     if( !thrd ) return;
+
+     if(__must_unpark(thrd)) {
+        // Wake lost the race,
+        __schedule_thread( thrd );
+     }
+
+     /* paranoid */ verify( ready_schedule_islocked());
+     /* paranoid */ verify( ! __preemption_enabled() );
+  }
+
   void unpark( $thread * thrd ) {
      if( !thrd ) return;
   …
   static inline bool __maybe_io_drain( processor * proc ) {
+     bool ret = false;
      #if defined(CFA_HAVE_LINUX_IO_URING_H)
         __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
   …
         unsigned tail = *ctx->cq.tail;
         if(head == tail) return false;
-        return __cfa_io_drain( proc );
+        ready_schedule_lock();
+        ret = __cfa_io_drain( proc );
+        ready_schedule_unlock();
      #endif
+     return ret;
   }
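Aside from the large commented-out block above (an inert sketch of a future SEARCH/RUN_THREAD restructuring of the main loop), the recurring change in this file is the new __STATS convenience macro, which collapses the old three-line conditional-compilation pattern into a single line that vanishes entirely in non-statistics builds:

    #if !defined(__CFA_NO_STATISTICS__)
        #define __STATS( ...) __VA_ARGS__
    #else
        #define __STATS( ...)
    #endif

    // before: every counter update wrapped in #if/#endif
    #if !defined(__CFA_NO_STATISTICS__)
        unsigned long long last_tally = rdtscl();
    #endif

    // after: one readable line, compiled out when statistics are disabled
    __STATS( unsigned long long last_tally = rdtscl(); )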
libcfa/src/concurrency/kernel/startup.cfa
   link.next = 0p;
   link.prev = 0p;
+  link.preferred = -1u;
+  last_proc = 0p;
   #if defined( __CFA_WITH_VERIFY__ )
      canary = 0x0D15EA5E0D15EA5Ep;
libcfa/src/concurrency/kernel_private.hfa

   //-----------------------------------------------------------------------
-  // pop thread from the ready queue of a cluster
+  // pop thread from the local queues of a cluster
   // returns 0p if empty
   // May return 0p spuriously
   …

   //-----------------------------------------------------------------------
-  // pop thread from the ready queue of a cluster
+  // pop thread from any ready queue of a cluster
+  // returns 0p if empty
+  // May return 0p spuriously
+  __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
+
+  //-----------------------------------------------------------------------
+  // search all ready queues of a cluster for any thread
   // returns 0p if empty
   // guaranteed to find any threads added before this call
-  __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
+  __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr);
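The header now documents three pop operations with increasing cost and strengthening guarantees: pop_fast (local queues only), pop_slow (any ready queue, still allowed to fail spuriously), and pop_search (exhaustive, guaranteed to find any thread added before the call). The intended cascade, as used by the idle path in kernel.cfa above, looks roughly like this:

    // sketch: escalate from cheapest to most thorough under the scheduler lock
    ready_schedule_lock();
    $thread * thrd;
    for(25) {                      // CFA bounded loop: a few cheap attempts
        thrd = pop_slow( this );
        if(thrd) goto RET;
    }
    thrd = pop_search( this );     // exhaustive last resort
    RET:
    ready_schedule_unlock();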
libcfa/src/concurrency/ready_queue.cfa
   }

-  __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) {
+  __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
+  __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
      return search(cltr);
   }
   …

   __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
-     for(25) {
-        unsigned i = __tls_rand() % lanes.count;
-        $thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
-        if(t) return t;
-     }
-
+     unsigned i = __tls_rand() % lanes.count;
+     return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
+  }
+
+  __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
      return search(cltr);
   }
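Both ready-queue variants in this file now implement that same triple. Because the pop_slow contract allows spurious failure, the variant without a distinct stealing path can satisfy it by simple delegation, while the work-stealing variant reduces pop_slow to a single randomized steal attempt and leaves retry policy to the caller:

    // variant 1: no separate slow path; delegation satisfies the weak contract
    __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }

    // variant 2: one uniformly random victim lane per call; the caller's
    // for(25) loop (see kernel.cfa above) decides how persistently to retry
    __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
        unsigned i = __tls_rand() % lanes.count;
        return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
    }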
libcfa/src/concurrency/stats.cfa
   stats->ready.pop.search.espec = 0;
   stats->ready.threads.migration = 0;
+  stats->ready.threads.extunpark = 0;
   stats->ready.threads.threads = 0;
   stats->ready.sleep.halts = 0;
   …
   __atomic_fetch_add( &cltr->ready.pop.search.espec , proc->ready.pop.search.espec , __ATOMIC_SEQ_CST ); proc->ready.pop.search.espec = 0;
   __atomic_fetch_add( &cltr->ready.threads.migration , proc->ready.threads.migration , __ATOMIC_SEQ_CST ); proc->ready.threads.migration = 0;
+  __atomic_fetch_add( &cltr->ready.threads.extunpark , proc->ready.threads.extunpark , __ATOMIC_SEQ_CST ); proc->ready.threads.extunpark = 0;
   __atomic_fetch_add( &cltr->ready.threads.threads , proc->ready.threads.threads , __ATOMIC_SEQ_CST ); proc->ready.threads.threads = 0;
   __atomic_fetch_add( &cltr->ready.sleep.halts , proc->ready.sleep.halts , __ATOMIC_SEQ_CST ); proc->ready.sleep.halts = 0;
   …
   uint64_t totalR = ready.pop.local.success + ready.pop.help.success + ready.pop.steal.success + ready.pop.search.success;
   uint64_t totalS = ready.push.local.success + ready.push.share.success + ready.push.extrn.success;
-  sstr | "- totals   : " | eng3(totalR) | "run," | eng3(totalS) | "schd (" | eng3(ready.push.extrn.success) | "ext," | eng3(ready.threads.migration) | "mig )";
+  sstr | "- totals   : " | eng3(totalR) | "run," | eng3(totalS) | "schd (" | eng3(ready.push.extrn.success) | "ext," | eng3(ready.threads.migration) | "mig," | eng3(ready.threads.extunpark) | "eupk)";

   double push_len = ((double)ready.push.local.attempt + ready.push.share.attempt + ready.push.extrn.attempt) / totalS;
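The new extunpark counter follows the same lifecycle as the existing ones: zeroed at initialization, accumulated per processor, and folded into the cluster totals during the tally. The flush idiom in isolation:

    // sketch: only the cross-processor accumulation needs atomicity;
    // the local counter is owned by this processor, so a plain reset suffices
    __atomic_fetch_add( &cltr->ready.threads.extunpark,
                        proc->ready.threads.extunpark, __ATOMIC_SEQ_CST );
    proc->ready.threads.extunpark = 0;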
libcfa/src/concurrency/stats.hfa
   struct {
      volatile uint64_t migration;
+     volatile uint64_t extunpark;
      volatile int64_t threads; // number of threads in the system, includes only local change
   } threads;
libcfa/src/concurrency/thread.cfa
   link.next = 0p;
   link.prev = 0p;
+  link.preferred = -1u;
+  last_proc = 0p;
   #if defined( __CFA_WITH_VERIFY__ )
      canary = 0x0D15EA5E0D15EA5Ep;