- File:
-
- 1 edited
-
libcfa/src/concurrency/kernel.cfa (modified) (33 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/kernel.cfa
rb7fd2db6 r454f478 22 22 #include <signal.h> 23 23 #include <unistd.h> 24 extern "C" {25 #include <sys/eventfd.h>26 }27 24 28 25 //CFA Includes … … 34 31 #include "invoke.h" 35 32 36 #if !defined(__CFA_NO_STATISTICS__)37 #define __STATS( ...) __VA_ARGS__38 #else39 #define __STATS( ...)40 #endif41 33 42 34 //----------------------------------------------------------------------------- … … 115 107 static $thread * __next_thread(cluster * this); 116 108 static $thread * __next_thread_slow(cluster * this); 117 static inline bool __must_unpark( $thread * thrd ) __attribute((nonnull(1)));118 109 static void __run_thread(processor * this, $thread * dst); 119 110 static void __wake_one(cluster * cltr); 120 111 121 static void mark_idle (__cluster_proc_list & idles, processor & proc); 122 static void mark_awake(__cluster_proc_list & idles, processor & proc); 123 static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list idles ); 124 125 extern void __cfa_io_start( processor * ); 126 extern bool __cfa_io_drain( processor * ); 127 extern void __cfa_io_flush( processor * ); 128 extern void __cfa_io_stop ( processor * ); 129 static inline bool __maybe_io_drain( processor * ); 130 131 extern void __disable_interrupts_hard(); 132 extern void __enable_interrupts_hard(); 133 134 static inline void __disable_interrupts_checked() { 135 /* paranoid */ verify( __preemption_enabled() ); 136 disable_interrupts(); 137 /* paranoid */ verify( ! __preemption_enabled() ); 138 } 139 140 static inline void __enable_interrupts_checked( bool poll = true ) { 141 /* paranoid */ verify( ! __preemption_enabled() ); 142 enable_interrupts( poll ); 143 /* paranoid */ verify( __preemption_enabled() ); 144 } 112 static void push (__cluster_idles & idles, processor & proc); 113 static void remove(__cluster_idles & idles, processor & proc); 114 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles ); 115 145 116 146 117 //============================================================================================= … … 158 129 verify(this); 159 130 160 __cfa_io_start( this );161 162 131 __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this); 163 132 #if !defined(__CFA_NO_STATISTICS__) … … 171 140 preemption_scope scope = { this }; 172 141 173 __STATS( unsigned long long last_tally = rdtscl(); ) 174 175 // if we need to run some special setup, now is the time to do it. 176 if(this->init.thrd) { 177 this->init.thrd->curr_cluster = this->cltr; 178 __run_thread(this, this->init.thrd); 179 } 142 #if !defined(__CFA_NO_STATISTICS__) 143 unsigned long long last_tally = rdtscl(); 144 #endif 145 180 146 181 147 __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this); … … 184 150 MAIN_LOOP: 185 151 for() { 186 // Check if there is pending io187 __maybe_io_drain( this );188 189 152 // Try to get the next thread 190 153 readyThread = __next_thread( this->cltr ); 191 154 192 155 if( !readyThread ) { 193 __cfa_io_flush( this );194 156 readyThread = __next_thread_slow( this->cltr ); 195 157 } … … 205 167 206 168 // Push self to idle stack 207 mark_idle(this->cltr->procs, * this);169 push(this->cltr->idles, * this); 208 170 209 171 // Confirm the ready-queue is empty … … 211 173 if( readyThread ) { 212 174 // A thread was found, cancel the halt 213 mark_awake(this->cltr->procs, * this);175 remove(this->cltr->idles, * this); 214 176 215 177 #if !defined(__CFA_NO_STATISTICS__) … … 227 189 #endif 228 190 229 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 230 231 __disable_interrupts_hard(); 232 eventfd_t val; 233 eventfd_read( this->idle, &val ); 234 __enable_interrupts_hard(); 191 wait( this->idle ); 235 192 236 193 #if !defined(__CFA_NO_STATISTICS__) … … 241 198 242 199 // We were woken up, remove self from idle 243 mark_awake(this->cltr->procs, * this);200 remove(this->cltr->idles, * this); 244 201 245 202 // DON'T just proceed, start looking again … … 248 205 249 206 /* paranoid */ verify( readyThread ); 250 251 // Reset io dirty bit252 this->io.dirty = false;253 207 254 208 // We found a thread run it … … 265 219 } 266 220 #endif 267 268 if(this->io.pending && !this->io.dirty) {269 __cfa_io_flush( this );270 }271 272 // SEARCH: {273 // /* paranoid */ verify( ! __preemption_enabled() );274 // /* paranoid */ verify( kernelTLS().this_proc_id );275 276 // // First, lock the scheduler since we are searching for a thread277 278 // // Try to get the next thread279 // ready_schedule_lock();280 // readyThread = pop_fast( this->cltr );281 // ready_schedule_unlock();282 // if(readyThread) { break SEARCH; }283 284 // // If we can't find a thread, might as well flush any outstanding I/O285 // if(this->io.pending) { __cfa_io_flush( this ); }286 287 // // Spin a little on I/O, just in case288 // for(25) {289 // __maybe_io_drain( this );290 // ready_schedule_lock();291 // readyThread = pop_fast( this->cltr );292 // ready_schedule_unlock();293 // if(readyThread) { break SEARCH; }294 // }295 296 // // no luck, try stealing a few times297 // for(25) {298 // if( __maybe_io_drain( this ) ) {299 // ready_schedule_lock();300 // readyThread = pop_fast( this->cltr );301 // } else {302 // ready_schedule_lock();303 // readyThread = pop_slow( this->cltr );304 // }305 // ready_schedule_unlock();306 // if(readyThread) { break SEARCH; }307 // }308 309 // // still no luck, search for a thread310 // ready_schedule_lock();311 // readyThread = pop_search( this->cltr );312 // ready_schedule_unlock();313 // if(readyThread) { break SEARCH; }314 315 // // Don't block if we are done316 // if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;317 318 // __STATS( __tls_stats()->ready.sleep.halts++; )319 320 // // Push self to idle stack321 // mark_idle(this->cltr->procs, * this);322 323 // // Confirm the ready-queue is empty324 // __maybe_io_drain( this );325 // ready_schedule_lock();326 // readyThread = pop_search( this->cltr );327 // ready_schedule_unlock();328 329 // if( readyThread ) {330 // // A thread was found, cancel the halt331 // mark_awake(this->cltr->procs, * this);332 333 // __STATS( __tls_stats()->ready.sleep.cancels++; )334 335 // // continue the main loop336 // break SEARCH;337 // }338 339 // __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl()); )340 // __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle);341 342 // // __disable_interrupts_hard();343 // eventfd_t val;344 // eventfd_read( this->idle, &val );345 // // __enable_interrupts_hard();346 347 // __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl()); )348 349 // // We were woken up, remove self from idle350 // mark_awake(this->cltr->procs, * this);351 352 // // DON'T just proceed, start looking again353 // continue MAIN_LOOP;354 // }355 356 // RUN_THREAD:357 // /* paranoid */ verify( kernelTLS().this_proc_id );358 // /* paranoid */ verify( ! __preemption_enabled() );359 // /* paranoid */ verify( readyThread );360 361 // // Reset io dirty bit362 // this->io.dirty = false;363 364 // // We found a thread run it365 // __run_thread(this, readyThread);366 367 // // Are we done?368 // if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;369 370 // #if !defined(__CFA_NO_STATISTICS__)371 // unsigned long long curr = rdtscl();372 // if(curr > (last_tally + 500000000)) {373 // __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats);374 // last_tally = curr;375 // }376 // #endif377 378 // if(this->io.pending && !this->io.dirty) {379 // __cfa_io_flush( this );380 // }381 382 // // Check if there is pending io383 // __maybe_io_drain( this );384 221 } 385 222 … … 387 224 } 388 225 389 __cfa_io_stop( this );390 391 226 post( this->terminated ); 392 393 227 394 228 if(this == mainProcessor) { … … 413 247 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next ); 414 248 __builtin_prefetch( thrd_dst->context.SP ); 415 416 __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name);417 249 418 250 $coroutine * proc_cor = get_coroutine(this->runner); … … 465 297 if(unlikely(thrd_dst->preempted != __NO_PREEMPTION)) { 466 298 // The thread was preempted, reschedule it and reset the flag 467 schedule_thread$( thrd_dst );299 __schedule_thread( thrd_dst ); 468 300 break RUNNING; 469 301 } … … 486 318 break RUNNING; 487 319 case TICKET_UNBLOCK: 488 #if !defined(__CFA_NO_STATISTICS__)489 __tls_stats()->ready.threads.threads++;490 __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );491 #endif492 320 // This is case 2, the racy case, someone tried to run this thread before it finished blocking 493 321 // In this case, just run it again. … … 502 330 proc_cor->state = Active; 503 331 504 __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);505 506 #if !defined(__CFA_NO_STATISTICS__)507 __tls_stats()->ready.threads.threads--;508 __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );509 #endif510 511 332 /* paranoid */ verify( ! __preemption_enabled() ); 512 333 } … … 518 339 $thread * thrd_src = kernelTLS().this_thread; 519 340 520 __STATS( thrd_src->last_proc = kernelTLS().this_processor; ) 341 #if !defined(__CFA_NO_STATISTICS__) 342 struct processor * last_proc = kernelTLS().this_processor; 343 #endif 521 344 522 345 // Run the thread on this processor … … 537 360 538 361 #if !defined(__CFA_NO_STATISTICS__) 539 /* paranoid */ verify( thrd_src->last_proc != 0p ); 540 if(thrd_src->last_proc != kernelTLS().this_processor) { 362 if(last_proc != kernelTLS().this_processor) { 541 363 __tls_stats()->ready.threads.migration++; 542 364 } … … 551 373 // Scheduler routines 552 374 // KERNEL ONLY 553 staticvoid __schedule_thread( $thread * thrd ) {375 void __schedule_thread( $thread * thrd ) { 554 376 /* paranoid */ verify( ! __preemption_enabled() ); 555 377 /* paranoid */ verify( kernelTLS().this_proc_id ); 556 /* paranoid */ verify( ready_schedule_islocked());557 378 /* paranoid */ verify( thrd ); 558 379 /* paranoid */ verify( thrd->state != Halted ); … … 570 391 if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready; 571 392 572 // Dereference the thread now because once we push it, there is not guaranteed it's still valid.573 struct cluster * cl = thrd->curr_cluster;574 __STATS(bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )575 576 // push the thread to the cluster ready-queue577 push( cl, thrd );578 579 // variable thrd is no longer safe to use580 thrd = 0xdeaddeaddeaddeadp;581 582 // wake the cluster using the save variable.583 __wake_one( cl );584 585 #if !defined(__CFA_NO_STATISTICS__)586 if( kernelTLS().this_stats ) {587 __tls_stats()->ready.threads.threads++;588 if(outside) {589 __tls_stats()->ready.threads.extunpark++;590 }591 __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", kernelTLS().this_processor );592 }593 else {594 __atomic_fetch_add(&cl->stats->ready.threads.threads, 1, __ATOMIC_RELAXED);595 __atomic_fetch_add(&cl->stats->ready.threads.extunpark, 1, __ATOMIC_RELAXED);596 __push_stat( cl->stats, cl->stats->ready.threads.threads, true, "Cluster", cl );597 }598 #endif599 600 /* paranoid */ verify( ready_schedule_islocked());601 /* paranoid */ verify( ! __preemption_enabled() );602 }603 604 void schedule_thread$( $thread * thrd ) {605 393 ready_schedule_lock(); 606 __schedule_thread( thrd ); 394 // Dereference the thread now because once we push it, there is not guaranteed it's still valid. 395 struct cluster * cl = thrd->curr_cluster; 396 397 // push the thread to the cluster ready-queue 398 push( cl, thrd ); 399 400 // variable thrd is no longer safe to use 401 402 // wake the cluster using the save variable. 403 __wake_one( cl ); 607 404 ready_schedule_unlock(); 405 406 /* paranoid */ verify( ! __preemption_enabled() ); 608 407 } 609 408 … … 614 413 615 414 ready_schedule_lock(); 616 $thread * thrd = pop _fast( this );415 $thread * thrd = pop( this ); 617 416 ready_schedule_unlock(); 618 417 … … 628 427 629 428 ready_schedule_lock(); 630 $thread * thrd; 631 for(25) { 632 thrd = pop_slow( this ); 633 if(thrd) goto RET; 634 } 635 thrd = pop_search( this ); 636 637 RET: 429 $thread * thrd = pop_slow( this ); 638 430 ready_schedule_unlock(); 639 431 … … 643 435 } 644 436 645 static inline bool __must_unpark( $thread * thrd ) { 437 void unpark( $thread * thrd ) { 438 if( !thrd ) return; 439 646 440 int old_ticket = __atomic_fetch_add(&thrd->ticket, 1, __ATOMIC_SEQ_CST); 647 441 switch(old_ticket) { 648 442 case TICKET_RUNNING: 649 443 // Wake won the race, the thread will reschedule/rerun itself 650 return false;444 break; 651 445 case TICKET_BLOCKED: 652 446 /* paranoid */ verify( ! thrd->preempted != __NO_PREEMPTION ); 653 447 /* paranoid */ verify( thrd->state == Blocked ); 654 return true; 448 449 { 450 /* paranoid */ verify( publicTLS_get(this_proc_id) ); 451 bool full = publicTLS_get(this_proc_id)->full_proc; 452 if(full) disable_interrupts(); 453 454 /* paranoid */ verify( ! __preemption_enabled() ); 455 456 // Wake lost the race, 457 __schedule_thread( thrd ); 458 459 /* paranoid */ verify( ! __preemption_enabled() ); 460 461 if(full) enable_interrupts( __cfaabi_dbg_ctx ); 462 /* paranoid */ verify( publicTLS_get(this_proc_id) ); 463 } 464 465 break; 655 466 default: 656 467 // This makes no sense, something is wrong abort … … 659 470 } 660 471 661 void __kernel_unpark( $thread * thrd ) {662 /* paranoid */ verify( ! __preemption_enabled() );663 /* paranoid */ verify( ready_schedule_islocked());664 665 if( !thrd ) return;666 667 if(__must_unpark(thrd)) {668 // Wake lost the race,669 __schedule_thread( thrd );670 }671 672 /* paranoid */ verify( ready_schedule_islocked());673 /* paranoid */ verify( ! __preemption_enabled() );674 }675 676 void unpark( $thread * thrd ) {677 if( !thrd ) return;678 679 if(__must_unpark(thrd)) {680 disable_interrupts();681 // Wake lost the race,682 schedule_thread$( thrd );683 enable_interrupts(false);684 }685 }686 687 472 void park( void ) { 688 __disable_interrupts_checked(); 689 /* paranoid */ verify( kernelTLS().this_thread->preempted == __NO_PREEMPTION ); 690 returnToKernel(); 691 __enable_interrupts_checked(); 473 /* paranoid */ verify( __preemption_enabled() ); 474 disable_interrupts(); 475 /* paranoid */ verify( ! __preemption_enabled() ); 476 /* paranoid */ verify( kernelTLS().this_thread->preempted == __NO_PREEMPTION ); 477 478 returnToKernel(); 479 480 /* paranoid */ verify( ! __preemption_enabled() ); 481 enable_interrupts( __cfaabi_dbg_ctx ); 482 /* paranoid */ verify( __preemption_enabled() ); 692 483 693 484 } … … 729 520 // KERNEL ONLY 730 521 bool force_yield( __Preemption_Reason reason ) { 731 __disable_interrupts_checked(); 732 $thread * thrd = kernelTLS().this_thread; 733 /* paranoid */ verify(thrd->state == Active); 734 735 // SKULLDUGGERY: It is possible that we are preempting this thread just before 736 // it was going to park itself. If that is the case and it is already using the 737 // intrusive fields then we can't use them to preempt the thread 738 // If that is the case, abandon the preemption. 739 bool preempted = false; 740 if(thrd->link.next == 0p) { 741 preempted = true; 742 thrd->preempted = reason; 743 returnToKernel(); 744 } 745 __enable_interrupts_checked( false ); 522 /* paranoid */ verify( __preemption_enabled() ); 523 disable_interrupts(); 524 /* paranoid */ verify( ! __preemption_enabled() ); 525 526 $thread * thrd = kernelTLS().this_thread; 527 /* paranoid */ verify(thrd->state == Active); 528 529 // SKULLDUGGERY: It is possible that we are preempting this thread just before 530 // it was going to park itself. If that is the case and it is already using the 531 // intrusive fields then we can't use them to preempt the thread 532 // If that is the case, abandon the preemption. 533 bool preempted = false; 534 if(thrd->link.next == 0p) { 535 preempted = true; 536 thrd->preempted = reason; 537 returnToKernel(); 538 } 539 540 /* paranoid */ verify( ! __preemption_enabled() ); 541 enable_interrupts_noPoll(); 542 /* paranoid */ verify( __preemption_enabled() ); 543 746 544 return preempted; 747 545 } … … 759 557 unsigned idle; 760 558 unsigned total; 761 [idle, total, p] = query _idles(this->procs);559 [idle, total, p] = query(this->idles); 762 560 763 561 // If no one is sleeping, we are done … … 765 563 766 564 // We found a processor, wake it up 767 eventfd_t val; 768 val = 1; 769 eventfd_write( p->idle, val ); 565 post( p->idle ); 770 566 771 567 #if !defined(__CFA_NO_STATISTICS__) 772 if( kernelTLS().this_stats ) { 773 __tls_stats()->ready.sleep.wakes++; 774 } 775 else { 776 __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); 777 } 568 __tls_stats()->ready.sleep.wakes++; 778 569 #endif 779 570 … … 788 579 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this); 789 580 790 __disable_interrupts_checked();581 disable_interrupts(); 791 582 /* paranoid */ verify( ! __preemption_enabled() ); 792 eventfd_t val; 793 val = 1; 794 eventfd_write( this->idle, val ); 795 __enable_interrupts_checked(); 796 } 797 798 static void mark_idle(__cluster_proc_list & this, processor & proc) { 583 post( this->idle ); 584 enable_interrupts( __cfaabi_dbg_ctx ); 585 } 586 587 static void push (__cluster_idles & this, processor & proc) { 799 588 /* paranoid */ verify( ! __preemption_enabled() ); 800 589 lock( this ); 801 590 this.idle++; 802 591 /* paranoid */ verify( this.idle <= this.total ); 803 remove(proc); 804 insert_first(this. idles, proc);592 593 insert_first(this.list, proc); 805 594 unlock( this ); 806 595 /* paranoid */ verify( ! __preemption_enabled() ); 807 596 } 808 597 809 static void mark_awake(__cluster_proc_list& this, processor & proc) {598 static void remove(__cluster_idles & this, processor & proc) { 810 599 /* paranoid */ verify( ! __preemption_enabled() ); 811 600 lock( this ); 812 601 this.idle--; 813 602 /* paranoid */ verify( this.idle >= 0 ); 603 814 604 remove(proc); 815 insert_last(this.actives, proc);816 605 unlock( this ); 817 606 /* paranoid */ verify( ! __preemption_enabled() ); 818 607 } 819 608 820 static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list this ) { 821 /* paranoid */ verify( ! __preemption_enabled() ); 822 /* paranoid */ verify( ready_schedule_islocked() ); 823 609 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) { 824 610 for() { 825 611 uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST); … … 827 613 unsigned idle = this.idle; 828 614 unsigned total = this.total; 829 processor * proc = &this. idles`first;615 processor * proc = &this.list`first; 830 616 // Compiler fence is unnecessary, but gcc-8 and older incorrectly reorder code without it 831 617 asm volatile("": : :"memory"); … … 833 619 return [idle, total, proc]; 834 620 } 835 836 /* paranoid */ verify( ready_schedule_islocked() );837 /* paranoid */ verify( ! __preemption_enabled() );838 621 } 839 622 … … 881 664 // Kernel Utilities 882 665 //============================================================================================= 883 #if defined(CFA_HAVE_LINUX_IO_URING_H)884 #include "io/types.hfa"885 #endif886 887 static inline bool __maybe_io_drain( processor * proc ) {888 bool ret = false;889 #if defined(CFA_HAVE_LINUX_IO_URING_H)890 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);891 892 // Check if we should drain the queue893 $io_context * ctx = proc->io.ctx;894 unsigned head = *ctx->cq.head;895 unsigned tail = *ctx->cq.tail;896 if(head == tail) return false;897 ready_schedule_lock();898 ret = __cfa_io_drain( proc );899 ready_schedule_unlock();900 #endif901 return ret;902 }903 904 666 //----------------------------------------------------------------------------- 905 667 // Debug … … 929 691 __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this ); 930 692 } 693 694 extern int __print_alarm_stats; 695 void print_alarm_stats() { 696 __print_alarm_stats = -1; 697 } 931 698 #endif 932 699 // Local Variables: //
Note:
See TracChangeset
for help on using the changeset viewer.