Changeset b56ad5e for libcfa/src/concurrency/kernel.cfa
- Timestamp: Feb 4, 2022, 10:10:34 PM (4 years ago)
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children: f8143a6
- Parents: 5f3ba11 (diff), 67e86ae6 (diff)
- Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- File: 1 edited
Legend: unchanged lines carry no prefix, removed lines are prefixed with '-', added lines with '+'.
libcfa/src/concurrency/kernel.cfa
--- libcfa/src/concurrency/kernel.cfa (r5f3ba11)
+++ libcfa/src/concurrency/kernel.cfa (rb56ad5e)

@@ -42 +42 @@
 
 #if !defined(__CFA_NO_STATISTICS__)
-    #define __STATS( ...) __VA_ARGS__
+    #define __STATS_DEF( ...) __VA_ARGS__
 #else
-    #define __STATS( ...)
+    #define __STATS_DEF( ...)
 #endif
 

@@ -122 +122 @@
 static thread$ * __next_thread(cluster * this);
 static thread$ * __next_thread_slow(cluster * this);
+static thread$ * __next_thread_search(cluster * this);
 static inline bool __must_unpark( thread$ * thrd ) __attribute((nonnull(1)));
 static void __run_thread(processor * this, thread$ * dst);

@@ -187 +188 @@
     MAIN_LOOP:
     for() {
-        #define OLD_MAIN 1
-        #if OLD_MAIN
         // Check if there is pending io
         __maybe_io_drain( this );

@@ -196 +195 @@
 
         if( !readyThread ) {
+            __IO_STATS__(true, io.flush.idle++; )
             __cfa_io_flush( this, 0 );
 
+            readyThread = __next_thread( this->cltr );
+        }
+
+        if( !readyThread ) for(5) {
+            __IO_STATS__(true, io.flush.idle++; )
+
             readyThread = __next_thread_slow( this->cltr );
+
+            if( readyThread ) break;
+
+            __cfa_io_flush( this, 0 );
         }

@@ -206 +216 @@
         if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
 
-        #if !defined(__CFA_NO_STATISTICS__)
-            __tls_stats()->ready.sleep.halts++;
-        #endif
-
         // Push self to idle stack
         if(!mark_idle(this->cltr->procs, * this)) continue MAIN_LOOP;
 
         // Confirm the ready-queue is empty
-        readyThread = __next_thread_slow( this->cltr );
+        readyThread = __next_thread_search( this->cltr );
         if( readyThread ) {
             // A thread was found, cancel the halt
             mark_awake(this->cltr->procs, * this);
 
-            #if !defined(__CFA_NO_STATISTICS__)
-                __tls_stats()->ready.sleep.cancels++;
-            #endif
+            __STATS__(true, ready.sleep.cancels++; )
 
             // continue the mai loop

@@ -248 +252 @@
 
         if(this->io.pending && !this->io.dirty) {
+            __IO_STATS__(true, io.flush.dirty++; )
             __cfa_io_flush( this, 0 );
         }
-
-        #else
-            #warning new kernel loop
-        SEARCH: {
-            /* paranoid */ verify( ! __preemption_enabled() );
-
-            // First, lock the scheduler since we are searching for a thread
-            ready_schedule_lock();
-
-            // Try to get the next thread
-            readyThread = pop_fast( this->cltr );
-            if(readyThread) { ready_schedule_unlock(); break SEARCH; }
-
-            // If we can't find a thread, might as well flush any outstanding I/O
-            if(this->io.pending) { __cfa_io_flush( this, 0 ); }
-
-            // Spin a little on I/O, just in case
-            for(5) {
-                __maybe_io_drain( this );
-                readyThread = pop_fast( this->cltr );
-                if(readyThread) { ready_schedule_unlock(); break SEARCH; }
-            }
-
-            // no luck, try stealing a few times
-            for(5) {
-                if( __maybe_io_drain( this ) ) {
-                    readyThread = pop_fast( this->cltr );
-                } else {
-                    readyThread = pop_slow( this->cltr );
-                }
-                if(readyThread) { ready_schedule_unlock(); break SEARCH; }
-            }
-
-            // still no luck, search for a thread
-            readyThread = pop_search( this->cltr );
-            if(readyThread) { ready_schedule_unlock(); break SEARCH; }
-
-            // Don't block if we are done
-            if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) {
-                ready_schedule_unlock();
-                break MAIN_LOOP;
-            }
-
-            __STATS( __tls_stats()->ready.sleep.halts++; )
-
-            // Push self to idle stack
-            ready_schedule_unlock();
-            if(!mark_idle(this->cltr->procs, * this)) goto SEARCH;
-            ready_schedule_lock();
-
-            // Confirm the ready-queue is empty
-            __maybe_io_drain( this );
-            readyThread = pop_search( this->cltr );
-            ready_schedule_unlock();
-
-            if( readyThread ) {
-                // A thread was found, cancel the halt
-                mark_awake(this->cltr->procs, * this);
-
-                __STATS( __tls_stats()->ready.sleep.cancels++; )
-
-                // continue the main loop
-                break SEARCH;
-            }
-
-            __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl()); )
-            __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle_fd);
-
-            {
-                eventfd_t val;
-                ssize_t ret = read( this->idle_fd, &val, sizeof(val) );
-                if(ret < 0) {
-                    switch((int)errno) {
-                    case EAGAIN:
-                    #if EAGAIN != EWOULDBLOCK
-                    case EWOULDBLOCK:
-                    #endif
-                    case EINTR:
-                        // No need to do anything special here, just assume it's a legitimate wake-up
-                        break;
-                    default:
-                        abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) );
-                    }
-                }
-            }
-
-            __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl()); )
-
-            // We were woken up, remove self from idle
-            mark_awake(this->cltr->procs, * this);
-
-            // DON'T just proceed, start looking again
-            continue MAIN_LOOP;
-        }
-
-    RUN_THREAD:
-        /* paranoid */ verify( ! __preemption_enabled() );
-        /* paranoid */ verify( readyThread );
-
-        // Reset io dirty bit
-        this->io.dirty = false;
-
-        // We found a thread run it
-        __run_thread(this, readyThread);
-
-        // Are we done?
-        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
-
-        if(this->io.pending && !this->io.dirty) {
-            __cfa_io_flush( this, 0 );
-        }
-
-        ready_schedule_lock();
-        __maybe_io_drain( this );
-        ready_schedule_unlock();
-        #endif
     }

@@ -476 +365 @@
                 break RUNNING;
             case TICKET_UNBLOCK:
-                #if !defined(__CFA_NO_STATISTICS__)
-                    __tls_stats()->ready.threads.threads++;
-                #endif
+                __STATS__(true, ready.threads.threads++; )
                 // This is case 2, the racy case, someone tried to run this thread before it finished blocking
                 // In this case, just run it again.

@@ -493 +380 @@
     __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
 
-    #if !defined(__CFA_NO_STATISTICS__)
-        __tls_stats()->ready.threads.threads--;
-    #endif
+    __STATS__(true, ready.threads.threads--; )
 
     /* paranoid */ verify( ! __preemption_enabled() );

@@ -506 +391 @@
     thread$ * thrd_src = kernelTLS().this_thread;
 
-    __STATS( thrd_src->last_proc = kernelTLS().this_processor; )
+    __STATS_DEF( thrd_src->last_proc = kernelTLS().this_processor; )
 
     // Run the thread on this processor

@@ -558 +443 @@
     // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
     struct cluster * cl = thrd->curr_cluster;
-    __STATS(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
+    __STATS_DEF(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
 
     // push the thread to the cluster ready-queue

@@ -609 +494 @@
 
     ready_schedule_lock();
-    thread$ * thrd;
-    for(25) {
-        thrd = pop_slow( this );
-        if(thrd) goto RET;
-    }
-    thrd = pop_search( this );
-
-    RET:
+    thread$ * thrd = pop_slow( this );
+    ready_schedule_unlock();
+
+    /* paranoid */ verify( ! __preemption_enabled() );
+    return thrd;
+}
+
+// KERNEL ONLY
+static inline thread$ * __next_thread_search(cluster * this) with( *this ) {
+    /* paranoid */ verify( ! __preemption_enabled() );
+
+    ready_schedule_lock();
+    thread$ * thrd = pop_search( this );
     ready_schedule_unlock();
 

@@ -732 +622 @@
 // Wake a thread from the front if there are any
 static void __wake_one(cluster * this) {
+    eventfd_t val;
+
     /* paranoid */ verify( ! __preemption_enabled() );
     /* paranoid */ verify( ready_schedule_islocked() );
 
     // Check if there is a sleeping processor
-    // int fd = __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST);
-    int fd = 0;
-    if( __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST) != 0 ) {
-        fd = __atomic_exchange_n(&this->procs.fd, 0, __ATOMIC_RELAXED);
-    }
-
-    // If no one is sleeping, we are done
-    if( fd == 0 ) return;
-
-    // We found a processor, wake it up
-    eventfd_t val;
-    val = 1;
-    eventfd_write( fd, val );
-
-    #if !defined(__CFA_NO_STATISTICS__)
-        if( kernelTLS().this_stats ) {
-            __tls_stats()->ready.sleep.wakes++;
-        }
-        else {
-            __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED);
-        }
-    #endif
+    struct __fd_waitctx * fdp = __atomic_load_n(&this->procs.fdw, __ATOMIC_SEQ_CST);
+
+    // If no one is sleeping: we are done
+    if( fdp == 0p ) return;
+
+    int fd = 1;
+    if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) {
+        fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED);
+    }
+
+    switch(fd) {
+    case 0:
+        // If the processor isn't ready to sleep then the exchange will already wake it up
+        #if !defined(__CFA_NO_STATISTICS__)
+            if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.early++;
+            } else { __atomic_fetch_add(&this->stats->ready.sleep.early, 1, __ATOMIC_RELAXED); }
+        #endif
+        break;
+    case 1:
+        // If someone else already said they will wake them: we are done
+        #if !defined(__CFA_NO_STATISTICS__)
+            if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.seen++;
+            } else { __atomic_fetch_add(&this->stats->ready.sleep.seen, 1, __ATOMIC_RELAXED); }
+        #endif
+        break;
+    default:
+        // If the processor was ready to sleep, we need to wake it up with an actual write
+        val = 1;
+        eventfd_write( fd, val );
+
+        #if !defined(__CFA_NO_STATISTICS__)
+            if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.wakes++;
+            } else { __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); }
+        #endif
+        break;
+    }
 
     /* paranoid */ verify( ready_schedule_islocked() );

@@ -770 +676 @@
 
     __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
+
+    this->idle_wctx.fd = 1;
 
     eventfd_t val;

@@ -779 +687 @@
 
 static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
+    // Tell everyone we are ready to go do sleep
+    for() {
+        int expected = this->idle_wctx.fd;
+
+        // Someone already told us to wake-up! No time for a nap.
+        if(expected == 1) { return; }
+
+        // Try to mark that we are going to sleep
+        if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
+            // Every one agreed, taking a nap
+            break;
+        }
+    }
+
+
     #if !defined(CFA_WITH_IO_URING_IDLE)
         #if !defined(__CFA_NO_STATISTICS__)

@@ -825 +748 @@
 
 static bool mark_idle(__cluster_proc_list & this, processor & proc) {
+    __STATS__(true, ready.sleep.halts++; )
+
+    proc.idle_wctx.fd = 0;
+
     /* paranoid */ verify( ! __preemption_enabled() );
     if(!try_lock( this )) return false;

@@ -832 +759 @@
     insert_first(this.idles, proc);
 
-    __atomic_store_n(&this.fd, proc.idle_fd, __ATOMIC_SEQ_CST);
+    __atomic_store_n(&this.fdw, &proc.idle_wctx, __ATOMIC_SEQ_CST);
     unlock( this );
     /* paranoid */ verify( ! __preemption_enabled() );

@@ -848 +775 @@
 
     {
-        int fd = 0;
-        if(!this.idles`isEmpty) fd = this.idles`first.idle_fd;
-        __atomic_store_n(&this.fd, fd, __ATOMIC_SEQ_CST);
+        struct __fd_waitctx * wctx = 0;
+        if(!this.idles`isEmpty) wctx = &this.idles`first.idle_wctx;
+        __atomic_store_n(&this.fdw, wctx, __ATOMIC_SEQ_CST);
     }
 

@@ -914 +841 @@
         unsigned tail = *ctx->cq.tail;
         if(head == tail) return false;
-        #if OLD_MAIN
-            ready_schedule_lock();
-            ret = __cfa_io_drain( proc );
-            ready_schedule_unlock();
-        #else
-            ret = __cfa_io_drain( proc );
-        #endif
+        ready_schedule_lock();
+        ret = __cfa_io_drain( proc );
+        ready_schedule_unlock();
     #endif
     return ret;
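The substantive change in this merge is the idle/wake handshake. Instead of publishing a bare eventfd per cluster (procs.fd), each processor now publishes a __fd_waitctx whose fd field acts as a small state machine: 0 means the processor has not yet committed to sleeping, 1 means a wake-up has already been requested, and any other value is the eventfd the processor is blocked on. The following stand-alone C sketch illustrates that protocol under those assumptions; it is not the CFA source, the names wait_ctx, idle_sleep_sketch and wake_one_sketch are hypothetical, and error handling is reduced to the minimum.

    #include <stdatomic.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    /* Hypothetical stand-in for the per-processor idle_wctx published through procs.fdw. */
    struct wait_ctx {
        _Atomic int fd;   /* 0: not ready to sleep, 1: wake-up already requested, else: eventfd to write to */
    };

    /* Sleeper side (cf. idle_sleep): claim the right to block by swapping 0 -> our eventfd. */
    static void idle_sleep_sketch(struct wait_ctx * ctx, int evfd) {
        for (;;) {
            int expected = atomic_load(&ctx->fd);
            if (expected == 1) return;                     /* a wake-up already arrived: skip the nap */
            if (atomic_compare_exchange_strong(&ctx->fd, &expected, evfd))
                break;                                     /* everyone agreed: we may block */
        }
        eventfd_t val;
        if (read(evfd, &val, sizeof(val)) < 0) {
            /* treat EINTR/EAGAIN as a spurious but legitimate wake-up, as the kernel loop does */
        }
    }

    /* Waker side (cf. __wake_one): exchange the field with 1 and act on the previous value. */
    static void wake_one_sketch(struct wait_ctx * ctx) {
        int fd = 1;
        if (atomic_load(&ctx->fd) != 1)
            fd = atomic_exchange(&ctx->fd, 1);
        if (fd == 0) return;     /* processor was not asleep yet; the exchange itself cancels its nap */
        if (fd == 1) return;     /* another waker already promised to wake it */
        eventfd_write(fd, 1);    /* it was blocked on its eventfd: write to wake it */
    }

The three outcomes on the waker side map directly onto the new statistics counters in this changeset: ready.sleep.early (the processor had not reached the blocking read yet), ready.sleep.seen (another waker got there first), and ready.sleep.wakes (an actual eventfd_write was needed). In the real code, mark_idle resets idle_wctx.fd to 0 before the processor is inserted on the idle list, and the direct wake path sets it to 1 before writing the eventfd.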