Changeset 5407cdc for libcfa/src/concurrency/kernel.cfa
- Timestamp:
- Apr 28, 2021, 4:56:50 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 8d66610
- Parents:
- feacef9 (diff), b7fd2db6 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - File:
-
- 1 edited
-
libcfa/src/concurrency/kernel.cfa (modified) (33 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/kernel.cfa
rfeacef9 r5407cdc 22 22 #include <signal.h> 23 23 #include <unistd.h> 24 extern "C" { 25 #include <sys/eventfd.h> 26 } 24 27 25 28 //CFA Includes … … 31 34 #include "invoke.h" 32 35 36 #if !defined(__CFA_NO_STATISTICS__) 37 #define __STATS( ...) __VA_ARGS__ 38 #else 39 #define __STATS( ...) 40 #endif 33 41 34 42 //----------------------------------------------------------------------------- … … 107 115 static $thread * __next_thread(cluster * this); 108 116 static $thread * __next_thread_slow(cluster * this); 117 static inline bool __must_unpark( $thread * thrd ) __attribute((nonnull(1))); 109 118 static void __run_thread(processor * this, $thread * dst); 110 119 static void __wake_one(cluster * cltr); 111 120 112 static void push (__cluster_idles & idles, processor & proc); 113 static void remove(__cluster_idles & idles, processor & proc); 114 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles ); 115 121 static void mark_idle (__cluster_proc_list & idles, processor & proc); 122 static void mark_awake(__cluster_proc_list & idles, processor & proc); 123 static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list idles ); 124 125 extern void __cfa_io_start( processor * ); 126 extern bool __cfa_io_drain( processor * ); 127 extern void __cfa_io_flush( processor * ); 128 extern void __cfa_io_stop ( processor * ); 129 static inline bool __maybe_io_drain( processor * ); 130 131 extern void __disable_interrupts_hard(); 132 extern void __enable_interrupts_hard(); 133 134 static inline void __disable_interrupts_checked() { 135 /* paranoid */ verify( __preemption_enabled() ); 136 disable_interrupts(); 137 /* paranoid */ verify( ! __preemption_enabled() ); 138 } 139 140 static inline void __enable_interrupts_checked( bool poll = true ) { 141 /* paranoid */ verify( ! __preemption_enabled() ); 142 enable_interrupts( poll ); 143 /* paranoid */ verify( __preemption_enabled() ); 144 } 116 145 117 146 //============================================================================================= … … 129 158 verify(this); 130 159 160 __cfa_io_start( this ); 161 131 162 __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this); 132 163 #if !defined(__CFA_NO_STATISTICS__) … … 140 171 preemption_scope scope = { this }; 141 172 142 #if !defined(__CFA_NO_STATISTICS__) 143 unsigned long long last_tally = rdtscl(); 144 #endif 145 173 __STATS( unsigned long long last_tally = rdtscl(); ) 174 175 // if we need to run some special setup, now is the time to do it. 176 if(this->init.thrd) { 177 this->init.thrd->curr_cluster = this->cltr; 178 __run_thread(this, this->init.thrd); 179 } 146 180 147 181 __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this); … … 150 184 MAIN_LOOP: 151 185 for() { 186 // Check if there is pending io 187 __maybe_io_drain( this ); 188 152 189 // Try to get the next thread 153 190 readyThread = __next_thread( this->cltr ); 154 191 155 192 if( !readyThread ) { 193 __cfa_io_flush( this ); 156 194 readyThread = __next_thread_slow( this->cltr ); 157 195 } … … 167 205 168 206 // Push self to idle stack 169 push(this->cltr->idles, * this);207 mark_idle(this->cltr->procs, * this); 170 208 171 209 // Confirm the ready-queue is empty … … 173 211 if( readyThread ) { 174 212 // A thread was found, cancel the halt 175 remove(this->cltr->idles, * this);213 mark_awake(this->cltr->procs, * this); 176 214 177 215 #if !defined(__CFA_NO_STATISTICS__) … … 189 227 #endif 190 228 191 wait( this->idle ); 229 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 230 231 __disable_interrupts_hard(); 232 eventfd_t val; 233 eventfd_read( this->idle, &val ); 234 __enable_interrupts_hard(); 192 235 193 236 #if !defined(__CFA_NO_STATISTICS__) … … 198 241 199 242 // We were woken up, remove self from idle 200 remove(this->cltr->idles, * this);243 mark_awake(this->cltr->procs, * this); 201 244 202 245 // DON'T just proceed, start looking again … … 205 248 206 249 /* paranoid */ verify( readyThread ); 250 251 // Reset io dirty bit 252 this->io.dirty = false; 207 253 208 254 // We found a thread run it … … 219 265 } 220 266 #endif 267 268 if(this->io.pending && !this->io.dirty) { 269 __cfa_io_flush( this ); 270 } 271 272 // SEARCH: { 273 // /* paranoid */ verify( ! __preemption_enabled() ); 274 // /* paranoid */ verify( kernelTLS().this_proc_id ); 275 276 // // First, lock the scheduler since we are searching for a thread 277 278 // // Try to get the next thread 279 // ready_schedule_lock(); 280 // readyThread = pop_fast( this->cltr ); 281 // ready_schedule_unlock(); 282 // if(readyThread) { break SEARCH; } 283 284 // // If we can't find a thread, might as well flush any outstanding I/O 285 // if(this->io.pending) { __cfa_io_flush( this ); } 286 287 // // Spin a little on I/O, just in case 288 // for(25) { 289 // __maybe_io_drain( this ); 290 // ready_schedule_lock(); 291 // readyThread = pop_fast( this->cltr ); 292 // ready_schedule_unlock(); 293 // if(readyThread) { break SEARCH; } 294 // } 295 296 // // no luck, try stealing a few times 297 // for(25) { 298 // if( __maybe_io_drain( this ) ) { 299 // ready_schedule_lock(); 300 // readyThread = pop_fast( this->cltr ); 301 // } else { 302 // ready_schedule_lock(); 303 // readyThread = pop_slow( this->cltr ); 304 // } 305 // ready_schedule_unlock(); 306 // if(readyThread) { break SEARCH; } 307 // } 308 309 // // still no luck, search for a thread 310 // ready_schedule_lock(); 311 // readyThread = pop_search( this->cltr ); 312 // ready_schedule_unlock(); 313 // if(readyThread) { break SEARCH; } 314 315 // // Don't block if we are done 316 // if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP; 317 318 // __STATS( __tls_stats()->ready.sleep.halts++; ) 319 320 // // Push self to idle stack 321 // mark_idle(this->cltr->procs, * this); 322 323 // // Confirm the ready-queue is empty 324 // __maybe_io_drain( this ); 325 // ready_schedule_lock(); 326 // readyThread = pop_search( this->cltr ); 327 // ready_schedule_unlock(); 328 329 // if( readyThread ) { 330 // // A thread was found, cancel the halt 331 // mark_awake(this->cltr->procs, * this); 332 333 // __STATS( __tls_stats()->ready.sleep.cancels++; ) 334 335 // // continue the main loop 336 // break SEARCH; 337 // } 338 339 // __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl()); ) 340 // __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 341 342 // // __disable_interrupts_hard(); 343 // eventfd_t val; 344 // eventfd_read( this->idle, &val ); 345 // // __enable_interrupts_hard(); 346 347 // __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl()); ) 348 349 // // We were woken up, remove self from idle 350 // mark_awake(this->cltr->procs, * this); 351 352 // // DON'T just proceed, start looking again 353 // continue MAIN_LOOP; 354 // } 355 356 // RUN_THREAD: 357 // /* paranoid */ verify( kernelTLS().this_proc_id ); 358 // /* paranoid */ verify( ! __preemption_enabled() ); 359 // /* paranoid */ verify( readyThread ); 360 361 // // Reset io dirty bit 362 // this->io.dirty = false; 363 364 // // We found a thread run it 365 // __run_thread(this, readyThread); 366 367 // // Are we done? 368 // if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP; 369 370 // #if !defined(__CFA_NO_STATISTICS__) 371 // unsigned long long curr = rdtscl(); 372 // if(curr > (last_tally + 500000000)) { 373 // __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats); 374 // last_tally = curr; 375 // } 376 // #endif 377 378 // if(this->io.pending && !this->io.dirty) { 379 // __cfa_io_flush( this ); 380 // } 381 382 // // Check if there is pending io 383 // __maybe_io_drain( this ); 221 384 } 222 385 … … 224 387 } 225 388 389 __cfa_io_stop( this ); 390 226 391 post( this->terminated ); 392 227 393 228 394 if(this == mainProcessor) { … … 247 413 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next ); 248 414 __builtin_prefetch( thrd_dst->context.SP ); 415 416 __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name); 249 417 250 418 $coroutine * proc_cor = get_coroutine(this->runner); … … 297 465 if(unlikely(thrd_dst->preempted != __NO_PREEMPTION)) { 298 466 // The thread was preempted, reschedule it and reset the flag 299 __schedule_thread( thrd_dst );467 schedule_thread$( thrd_dst ); 300 468 break RUNNING; 301 469 } … … 318 486 break RUNNING; 319 487 case TICKET_UNBLOCK: 488 #if !defined(__CFA_NO_STATISTICS__) 489 __tls_stats()->ready.threads.threads++; 490 __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this ); 491 #endif 320 492 // This is case 2, the racy case, someone tried to run this thread before it finished blocking 321 493 // In this case, just run it again. … … 330 502 proc_cor->state = Active; 331 503 504 __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst); 505 506 #if !defined(__CFA_NO_STATISTICS__) 507 __tls_stats()->ready.threads.threads--; 508 __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this ); 509 #endif 510 332 511 /* paranoid */ verify( ! __preemption_enabled() ); 333 512 } … … 339 518 $thread * thrd_src = kernelTLS().this_thread; 340 519 341 #if !defined(__CFA_NO_STATISTICS__) 342 struct processor * last_proc = kernelTLS().this_processor; 343 #endif 520 __STATS( thrd_src->last_proc = kernelTLS().this_processor; ) 344 521 345 522 // Run the thread on this processor … … 360 537 361 538 #if !defined(__CFA_NO_STATISTICS__) 362 if(last_proc != kernelTLS().this_processor) { 539 /* paranoid */ verify( thrd_src->last_proc != 0p ); 540 if(thrd_src->last_proc != kernelTLS().this_processor) { 363 541 __tls_stats()->ready.threads.migration++; 364 542 } … … 373 551 // Scheduler routines 374 552 // KERNEL ONLY 375 void __schedule_thread( $thread * thrd ) {553 static void __schedule_thread( $thread * thrd ) { 376 554 /* paranoid */ verify( ! __preemption_enabled() ); 377 555 /* paranoid */ verify( kernelTLS().this_proc_id ); 556 /* paranoid */ verify( ready_schedule_islocked()); 378 557 /* paranoid */ verify( thrd ); 379 558 /* paranoid */ verify( thrd->state != Halted ); … … 391 570 if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready; 392 571 572 // Dereference the thread now because once we push it, there is not guaranteed it's still valid. 573 struct cluster * cl = thrd->curr_cluster; 574 __STATS(bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; ) 575 576 // push the thread to the cluster ready-queue 577 push( cl, thrd ); 578 579 // variable thrd is no longer safe to use 580 thrd = 0xdeaddeaddeaddeadp; 581 582 // wake the cluster using the save variable. 583 __wake_one( cl ); 584 585 #if !defined(__CFA_NO_STATISTICS__) 586 if( kernelTLS().this_stats ) { 587 __tls_stats()->ready.threads.threads++; 588 if(outside) { 589 __tls_stats()->ready.threads.extunpark++; 590 } 591 __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", kernelTLS().this_processor ); 592 } 593 else { 594 __atomic_fetch_add(&cl->stats->ready.threads.threads, 1, __ATOMIC_RELAXED); 595 __atomic_fetch_add(&cl->stats->ready.threads.extunpark, 1, __ATOMIC_RELAXED); 596 __push_stat( cl->stats, cl->stats->ready.threads.threads, true, "Cluster", cl ); 597 } 598 #endif 599 600 /* paranoid */ verify( ready_schedule_islocked()); 601 /* paranoid */ verify( ! __preemption_enabled() ); 602 } 603 604 void schedule_thread$( $thread * thrd ) { 393 605 ready_schedule_lock(); 394 // Dereference the thread now because once we push it, there is not guaranteed it's still valid. 395 struct cluster * cl = thrd->curr_cluster; 396 397 // push the thread to the cluster ready-queue 398 push( cl, thrd ); 399 400 // variable thrd is no longer safe to use 401 402 // wake the cluster using the save variable. 403 __wake_one( cl ); 606 __schedule_thread( thrd ); 404 607 ready_schedule_unlock(); 405 406 /* paranoid */ verify( ! __preemption_enabled() );407 608 } 408 609 … … 413 614 414 615 ready_schedule_lock(); 415 $thread * thrd = pop ( this );616 $thread * thrd = pop_fast( this ); 416 617 ready_schedule_unlock(); 417 618 … … 427 628 428 629 ready_schedule_lock(); 429 $thread * thrd = pop_slow( this ); 630 $thread * thrd; 631 for(25) { 632 thrd = pop_slow( this ); 633 if(thrd) goto RET; 634 } 635 thrd = pop_search( this ); 636 637 RET: 430 638 ready_schedule_unlock(); 431 639 … … 435 643 } 436 644 437 void unpark( $thread * thrd ) { 438 if( !thrd ) return; 439 645 static inline bool __must_unpark( $thread * thrd ) { 440 646 int old_ticket = __atomic_fetch_add(&thrd->ticket, 1, __ATOMIC_SEQ_CST); 441 647 switch(old_ticket) { 442 648 case TICKET_RUNNING: 443 649 // Wake won the race, the thread will reschedule/rerun itself 444 break;650 return false; 445 651 case TICKET_BLOCKED: 446 652 /* paranoid */ verify( ! thrd->preempted != __NO_PREEMPTION ); 447 653 /* paranoid */ verify( thrd->state == Blocked ); 448 449 { 450 /* paranoid */ verify( publicTLS_get(this_proc_id) ); 451 bool full = publicTLS_get(this_proc_id)->full_proc; 452 if(full) disable_interrupts(); 453 454 /* paranoid */ verify( ! __preemption_enabled() ); 455 456 // Wake lost the race, 457 __schedule_thread( thrd ); 458 459 /* paranoid */ verify( ! __preemption_enabled() ); 460 461 if(full) enable_interrupts( __cfaabi_dbg_ctx ); 462 /* paranoid */ verify( publicTLS_get(this_proc_id) ); 463 } 464 465 break; 654 return true; 466 655 default: 467 656 // This makes no sense, something is wrong abort … … 470 659 } 471 660 661 void __kernel_unpark( $thread * thrd ) { 662 /* paranoid */ verify( ! __preemption_enabled() ); 663 /* paranoid */ verify( ready_schedule_islocked()); 664 665 if( !thrd ) return; 666 667 if(__must_unpark(thrd)) { 668 // Wake lost the race, 669 __schedule_thread( thrd ); 670 } 671 672 /* paranoid */ verify( ready_schedule_islocked()); 673 /* paranoid */ verify( ! __preemption_enabled() ); 674 } 675 676 void unpark( $thread * thrd ) { 677 if( !thrd ) return; 678 679 if(__must_unpark(thrd)) { 680 disable_interrupts(); 681 // Wake lost the race, 682 schedule_thread$( thrd ); 683 enable_interrupts(false); 684 } 685 } 686 472 687 void park( void ) { 473 /* paranoid */ verify( __preemption_enabled() ); 474 disable_interrupts(); 475 /* paranoid */ verify( ! __preemption_enabled() ); 476 /* paranoid */ verify( kernelTLS().this_thread->preempted == __NO_PREEMPTION ); 477 478 returnToKernel(); 479 480 /* paranoid */ verify( ! __preemption_enabled() ); 481 enable_interrupts( __cfaabi_dbg_ctx ); 482 /* paranoid */ verify( __preemption_enabled() ); 688 __disable_interrupts_checked(); 689 /* paranoid */ verify( kernelTLS().this_thread->preempted == __NO_PREEMPTION ); 690 returnToKernel(); 691 __enable_interrupts_checked(); 483 692 484 693 } … … 520 729 // KERNEL ONLY 521 730 bool force_yield( __Preemption_Reason reason ) { 522 /* paranoid */ verify( __preemption_enabled() ); 523 disable_interrupts(); 524 /* paranoid */ verify( ! __preemption_enabled() ); 525 526 $thread * thrd = kernelTLS().this_thread; 527 /* paranoid */ verify(thrd->state == Active); 528 529 // SKULLDUGGERY: It is possible that we are preempting this thread just before 530 // it was going to park itself. If that is the case and it is already using the 531 // intrusive fields then we can't use them to preempt the thread 532 // If that is the case, abandon the preemption. 533 bool preempted = false; 534 if(thrd->link.next == 0p) { 535 preempted = true; 536 thrd->preempted = reason; 537 returnToKernel(); 538 } 539 540 /* paranoid */ verify( ! __preemption_enabled() ); 541 enable_interrupts_noPoll(); 542 /* paranoid */ verify( __preemption_enabled() ); 543 731 __disable_interrupts_checked(); 732 $thread * thrd = kernelTLS().this_thread; 733 /* paranoid */ verify(thrd->state == Active); 734 735 // SKULLDUGGERY: It is possible that we are preempting this thread just before 736 // it was going to park itself. If that is the case and it is already using the 737 // intrusive fields then we can't use them to preempt the thread 738 // If that is the case, abandon the preemption. 739 bool preempted = false; 740 if(thrd->link.next == 0p) { 741 preempted = true; 742 thrd->preempted = reason; 743 returnToKernel(); 744 } 745 __enable_interrupts_checked( false ); 544 746 return preempted; 545 747 } … … 557 759 unsigned idle; 558 760 unsigned total; 559 [idle, total, p] = query (this->idles);761 [idle, total, p] = query_idles(this->procs); 560 762 561 763 // If no one is sleeping, we are done … … 563 765 564 766 // We found a processor, wake it up 565 post( p->idle ); 767 eventfd_t val; 768 val = 1; 769 eventfd_write( p->idle, val ); 566 770 567 771 #if !defined(__CFA_NO_STATISTICS__) 568 __tls_stats()->ready.sleep.wakes++; 772 if( kernelTLS().this_stats ) { 773 __tls_stats()->ready.sleep.wakes++; 774 } 775 else { 776 __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); 777 } 569 778 #endif 570 779 … … 579 788 __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this); 580 789 581 disable_interrupts();790 __disable_interrupts_checked(); 582 791 /* paranoid */ verify( ! __preemption_enabled() ); 583 post( this->idle ); 584 enable_interrupts( __cfaabi_dbg_ctx ); 585 } 586 587 static void push (__cluster_idles & this, processor & proc) { 792 eventfd_t val; 793 val = 1; 794 eventfd_write( this->idle, val ); 795 __enable_interrupts_checked(); 796 } 797 798 static void mark_idle(__cluster_proc_list & this, processor & proc) { 588 799 /* paranoid */ verify( ! __preemption_enabled() ); 589 800 lock( this ); 590 801 this.idle++; 591 802 /* paranoid */ verify( this.idle <= this.total ); 592 593 insert_first(this. list, proc);803 remove(proc); 804 insert_first(this.idles, proc); 594 805 unlock( this ); 595 806 /* paranoid */ verify( ! __preemption_enabled() ); 596 807 } 597 808 598 static void remove(__cluster_idles& this, processor & proc) {809 static void mark_awake(__cluster_proc_list & this, processor & proc) { 599 810 /* paranoid */ verify( ! __preemption_enabled() ); 600 811 lock( this ); 601 812 this.idle--; 602 813 /* paranoid */ verify( this.idle >= 0 ); 603 604 814 remove(proc); 815 insert_last(this.actives, proc); 605 816 unlock( this ); 606 817 /* paranoid */ verify( ! __preemption_enabled() ); 607 818 } 608 819 609 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) { 820 static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list this ) { 821 /* paranoid */ verify( ! __preemption_enabled() ); 822 /* paranoid */ verify( ready_schedule_islocked() ); 823 610 824 for() { 611 825 uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST); … … 613 827 unsigned idle = this.idle; 614 828 unsigned total = this.total; 615 processor * proc = &this. list`first;829 processor * proc = &this.idles`first; 616 830 // Compiler fence is unnecessary, but gcc-8 and older incorrectly reorder code without it 617 831 asm volatile("": : :"memory"); … … 619 833 return [idle, total, proc]; 620 834 } 835 836 /* paranoid */ verify( ready_schedule_islocked() ); 837 /* paranoid */ verify( ! __preemption_enabled() ); 621 838 } 622 839 … … 664 881 // Kernel Utilities 665 882 //============================================================================================= 883 #if defined(CFA_HAVE_LINUX_IO_URING_H) 884 #include "io/types.hfa" 885 #endif 886 887 static inline bool __maybe_io_drain( processor * proc ) { 888 bool ret = false; 889 #if defined(CFA_HAVE_LINUX_IO_URING_H) 890 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd); 891 892 // Check if we should drain the queue 893 $io_context * ctx = proc->io.ctx; 894 unsigned head = *ctx->cq.head; 895 unsigned tail = *ctx->cq.tail; 896 if(head == tail) return false; 897 ready_schedule_lock(); 898 ret = __cfa_io_drain( proc ); 899 ready_schedule_unlock(); 900 #endif 901 return ret; 902 } 903 666 904 //----------------------------------------------------------------------------- 667 905 // Debug … … 691 929 __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this ); 692 930 } 693 694 extern int __print_alarm_stats;695 void print_alarm_stats() {696 __print_alarm_stats = -1;697 }698 931 #endif 699 932 // Local Variables: //
Note:
See TracChangeset
for help on using the changeset viewer.