File:
- libcfa/src/concurrency/kernel.cfa (1 edited)
Legend:
- Unmodified (context) lines are prefixed with a space
- Added lines (rae7adbc4) are prefixed with +
- Removed lines (rc9c1c1cb) are prefixed with -
libcfa/src/concurrency/kernel.cfa
--- libcfa/src/concurrency/kernel.cfa (rc9c1c1cb)
+++ libcfa/src/concurrency/kernel.cfa (rae7adbc4)
@@ -42,7 +42,7 @@
 
 #if !defined(__CFA_NO_STATISTICS__)
-    #define __STATS_DEF( ...) __VA_ARGS__
+    #define __STATS( ...) __VA_ARGS__
 #else
-    #define __STATS_DEF( ...)
+    #define __STATS( ...)
 #endif
 
@@ -122,5 +122,4 @@
 static thread$ * __next_thread(cluster * this);
 static thread$ * __next_thread_slow(cluster * this);
-static thread$ * __next_thread_search(cluster * this);
 static inline bool __must_unpark( thread$ * thrd ) __attribute((nonnull(1)));
 static void __run_thread(processor * this, thread$ * dst);
@@ -188,4 +187,6 @@
     MAIN_LOOP:
     for() {
+        #define OLD_MAIN 1
+        #if OLD_MAIN
         // Check if there is pending io
         __maybe_io_drain( this );
@@ -195,18 +196,7 @@
 
         if( !readyThread ) {
-            __IO_STATS__(true, io.flush.idle++; )
             __cfa_io_flush( this, 0 );
 
-            readyThread = __next_thread( this->cltr );
-        }
-
-        if( !readyThread ) for(5) {
-            __IO_STATS__(true, io.flush.idle++; )
-
             readyThread = __next_thread_slow( this->cltr );
-
-            if( readyThread ) break;
-
-            __cfa_io_flush( this, 0 );
         }
 
@@ -216,14 +206,20 @@
         if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
 
+        #if !defined(__CFA_NO_STATISTICS__)
+            __tls_stats()->ready.sleep.halts++;
+        #endif
+
         // Push self to idle stack
         if(!mark_idle(this->cltr->procs, * this)) continue MAIN_LOOP;
 
         // Confirm the ready-queue is empty
-        readyThread = __next_thread_search( this->cltr );
+        readyThread = __next_thread_slow( this->cltr );
         if( readyThread ) {
            // A thread was found, cancel the halt
            mark_awake(this->cltr->procs, * this);
 
-            __STATS__(true, ready.sleep.cancels++; )
+            #if !defined(__CFA_NO_STATISTICS__)
+                __tls_stats()->ready.sleep.cancels++;
+            #endif
 
            // continue the mai loop
@@ -252,6 +248,121 @@
 
         if(this->io.pending && !this->io.dirty) {
-            __IO_STATS__(true, io.flush.dirty++; )
             __cfa_io_flush( this, 0 );
         }
+
+        #else
+        #warning new kernel loop
+        SEARCH: {
+            /* paranoid */ verify( ! __preemption_enabled() );
+
+            // First, lock the scheduler since we are searching for a thread
+            ready_schedule_lock();
+
+            // Try to get the next thread
+            readyThread = pop_fast( this->cltr );
+            if(readyThread) { ready_schedule_unlock(); break SEARCH; }
+
+            // If we can't find a thread, might as well flush any outstanding I/O
+            if(this->io.pending) { __cfa_io_flush( this, 0 ); }
+
+            // Spin a little on I/O, just in case
+            for(5) {
+                __maybe_io_drain( this );
+                readyThread = pop_fast( this->cltr );
+                if(readyThread) { ready_schedule_unlock(); break SEARCH; }
+            }
+
+            // no luck, try stealing a few times
+            for(5) {
+                if( __maybe_io_drain( this ) ) {
+                    readyThread = pop_fast( this->cltr );
+                } else {
+                    readyThread = pop_slow( this->cltr );
+                }
+                if(readyThread) { ready_schedule_unlock(); break SEARCH; }
+            }
+
+            // still no luck, search for a thread
+            readyThread = pop_search( this->cltr );
+            if(readyThread) { ready_schedule_unlock(); break SEARCH; }
+
+            // Don't block if we are done
+            if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) {
+                ready_schedule_unlock();
+                break MAIN_LOOP;
+            }
+
+            __STATS( __tls_stats()->ready.sleep.halts++; )
+
+            // Push self to idle stack
+            ready_schedule_unlock();
+            if(!mark_idle(this->cltr->procs, * this)) goto SEARCH;
+            ready_schedule_lock();
+
+            // Confirm the ready-queue is empty
+            __maybe_io_drain( this );
+            readyThread = pop_search( this->cltr );
+            ready_schedule_unlock();
+
+            if( readyThread ) {
+                // A thread was found, cancel the halt
+                mark_awake(this->cltr->procs, * this);
+
+                __STATS( __tls_stats()->ready.sleep.cancels++; )
+
+                // continue the main loop
+                break SEARCH;
+            }
+
+            __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl()); )
+            __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle_fd);
+
+            {
+                eventfd_t val;
+                ssize_t ret = read( this->idle_fd, &val, sizeof(val) );
+                if(ret < 0) {
+                    switch((int)errno) {
+                    case EAGAIN:
+                    #if EAGAIN != EWOULDBLOCK
+                    case EWOULDBLOCK:
+                    #endif
+                    case EINTR:
+                        // No need to do anything special here, just assume it's a legitimate wake-up
+                        break;
+                    default:
+                        abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) );
+                    }
+                }
+            }
+
+            __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl()); )
+
+            // We were woken up, remove self from idle
+            mark_awake(this->cltr->procs, * this);
+
+            // DON'T just proceed, start looking again
+            continue MAIN_LOOP;
+        }
+
+        RUN_THREAD:
+        /* paranoid */ verify( ! __preemption_enabled() );
+        /* paranoid */ verify( readyThread );
+
+        // Reset io dirty bit
+        this->io.dirty = false;
+
+        // We found a thread run it
+        __run_thread(this, readyThread);
+
+        // Are we done?
+        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
+
+        if(this->io.pending && !this->io.dirty) {
+            __cfa_io_flush( this, 0 );
+        }
+
+        ready_schedule_lock();
+        __maybe_io_drain( this );
+        ready_schedule_unlock();
+        #endif
     }
@@ -365,5 +476,7 @@
                 break RUNNING;
             case TICKET_UNBLOCK:
-                __STATS__(true, ready.threads.threads++; )
+                #if !defined(__CFA_NO_STATISTICS__)
+                    __tls_stats()->ready.threads.threads++;
+                #endif
                 // This is case 2, the racy case, someone tried to run this thread before it finished blocking
                 // In this case, just run it again.
@@ -380,5 +493,7 @@
     __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
 
-    __STATS__(true, ready.threads.threads--; )
+    #if !defined(__CFA_NO_STATISTICS__)
+        __tls_stats()->ready.threads.threads--;
+    #endif
 
     /* paranoid */ verify( ! __preemption_enabled() );
@@ -391,5 +506,5 @@
     thread$ * thrd_src = kernelTLS().this_thread;
 
-    __STATS_DEF( thrd_src->last_proc = kernelTLS().this_processor; )
+    __STATS( thrd_src->last_proc = kernelTLS().this_processor; )
 
     // Run the thread on this processor
@@ -439,9 +554,10 @@
     /* paranoid */ verify( 0x0D15EA5E0D15EA5Ep == thrd->canary );
 
+    const bool local = thrd->state != Start;
     if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready;
 
     // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
     struct cluster * cl = thrd->curr_cluster;
-    __STATS_DEF(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
+    __STATS(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
 
     // push the thread to the cluster ready-queue
@@ -494,17 +610,12 @@
 
     ready_schedule_lock();
-    thread$ * thrd = pop_slow( this );
-    ready_schedule_unlock();
-
-    /* paranoid */ verify( ! __preemption_enabled() );
-    return thrd;
-}
-
-// KERNEL ONLY
-static inline thread$ * __next_thread_search(cluster * this) with( *this ) {
-    /* paranoid */ verify( ! __preemption_enabled() );
-
-    ready_schedule_lock();
-    thread$ * thrd = pop_search( this );
+    thread$ * thrd;
+    for(25) {
+        thrd = pop_slow( this );
+        if(thrd) goto RET;
+    }
+    thrd = pop_search( this );
+
+    RET:
     ready_schedule_unlock();
 
@@ -622,46 +733,30 @@
 // Wake a thread from the front if there are any
 static void __wake_one(cluster * this) {
+    /* paranoid */ verify( ! __preemption_enabled() );
+    /* paranoid */ verify( ready_schedule_islocked() );
+
+    // Check if there is a sleeping processor
+    // int fd = __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST);
+    int fd = 0;
+    if( __atomic_load_n(&this->procs.fd, __ATOMIC_SEQ_CST) != 0 ) {
+        fd = __atomic_exchange_n(&this->procs.fd, 0, __ATOMIC_RELAXED);
+    }
+
+    // If no one is sleeping, we are done
+    if( fd == 0 ) return;
+
+    // We found a processor, wake it up
     eventfd_t val;
-
-    /* paranoid */ verify( ! __preemption_enabled() );
-    /* paranoid */ verify( ready_schedule_islocked() );
-
-    // Check if there is a sleeping processor
-    struct __fd_waitctx * fdp = __atomic_load_n(&this->procs.fdw, __ATOMIC_SEQ_CST);
-
-    // If no one is sleeping: we are done
-    if( fdp == 0p ) return;
-
-    int fd = 1;
-    if( __atomic_load_n(&fdp->fd, __ATOMIC_SEQ_CST) != 1 ) {
-        fd = __atomic_exchange_n(&fdp->fd, 1, __ATOMIC_RELAXED);
-    }
-
-    switch(fd) {
-    case 0:
-        // If the processor isn't ready to sleep then the exchange will already wake it up
-        #if !defined(__CFA_NO_STATISTICS__)
-            if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.early++;
-            } else { __atomic_fetch_add(&this->stats->ready.sleep.early, 1, __ATOMIC_RELAXED); }
-        #endif
-        break;
-    case 1:
-        // If someone else already said they will wake them: we are done
-        #if !defined(__CFA_NO_STATISTICS__)
-            if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.seen++;
-            } else { __atomic_fetch_add(&this->stats->ready.sleep.seen, 1, __ATOMIC_RELAXED); }
-        #endif
-        break;
-    default:
-        // If the processor was ready to sleep, we need to wake it up with an actual write
-        val = 1;
-        eventfd_write( fd, val );
-
-        #if !defined(__CFA_NO_STATISTICS__)
-            if( kernelTLS().this_stats ) { __tls_stats()->ready.sleep.wakes++;
-            } else { __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED); }
-        #endif
-        break;
-    }
+    val = 1;
+    eventfd_write( fd, val );
+
+    #if !defined(__CFA_NO_STATISTICS__)
+        if( kernelTLS().this_stats ) {
+            __tls_stats()->ready.sleep.wakes++;
+        }
+        else {
+            __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED);
+        }
+    #endif
 
     /* paranoid */ verify( ready_schedule_islocked() );
@@ -676,6 +771,4 @@
 
     __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
-
-    this->idle_wctx.fd = 1;
 
     eventfd_t val;
@@ -687,19 +780,4 @@
 
 static void idle_sleep(processor * this, io_future_t & future, iovec & iov) {
-    // Tell everyone we are ready to go do sleep
-    for() {
-        int expected = this->idle_wctx.fd;
-
-        // Someone already told us to wake-up! No time for a nap.
-        if(expected == 1) { return; }
-
-        // Try to mark that we are going to sleep
-        if(__atomic_compare_exchange_n(&this->idle_wctx.fd, &expected, this->idle_fd, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) {
-            // Every one agreed, taking a nap
-            break;
-        }
-    }
-
-
     #if !defined(CFA_WITH_IO_URING_IDLE)
         #if !defined(__CFA_NO_STATISTICS__)
@@ -748,8 +826,4 @@
 
 static bool mark_idle(__cluster_proc_list & this, processor & proc) {
-    __STATS__(true, ready.sleep.halts++; )
-
-    proc.idle_wctx.fd = 0;
-
     /* paranoid */ verify( ! __preemption_enabled() );
     if(!try_lock( this )) return false;
@@ -759,5 +833,5 @@
         insert_first(this.idles, proc);
 
-        __atomic_store_n(&this.fdw, &proc.idle_wctx, __ATOMIC_SEQ_CST);
+        __atomic_store_n(&this.fd, proc.idle_fd, __ATOMIC_SEQ_CST);
     unlock( this );
     /* paranoid */ verify( ! __preemption_enabled() );
@@ -775,7 +849,7 @@
 
     {
-        struct __fd_waitctx * wctx = 0;
-        if(!this.idles`isEmpty) wctx = &this.idles`first.idle_wctx;
-        __atomic_store_n(&this.fdw, wctx, __ATOMIC_SEQ_CST);
+        int fd = 0;
+        if(!this.idles`isEmpty) fd = this.idles`first.idle_fd;
+        __atomic_store_n(&this.fd, fd, __ATOMIC_SEQ_CST);
     }
 
@@ -841,7 +915,11 @@
         unsigned tail = *ctx->cq.tail;
         if(head == tail) return false;
-        ready_schedule_lock();
-        ret = __cfa_io_drain( proc );
-        ready_schedule_unlock();
+        #if OLD_MAIN
+            ready_schedule_lock();
+            ret = __cfa_io_drain( proc );
+            ready_schedule_unlock();
+        #else
+            ret = __cfa_io_drain( proc );
+        #endif
     #endif
     return ret;
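
The core of the rae7adbc4 side of this change is how an idle processor parks and how __wake_one revives it: the sleeper publishes its per-processor eventfd (idle_fd) in the shared procs.fd slot and then blocks in read() on that fd, while the waker atomically exchanges the slot back to 0 and, only if it actually claimed a non-zero fd, performs an eventfd_write() to it. The standalone C sketch below (hypothetical names, plain pthreads, Linux-only; an illustration, not libcfa code) shows just that park/wake handshake for a single shared slot. Note that the real loop re-checks the ready queue after mark_idle() so a concurrently scheduled thread cannot be missed; the sketch sidesteps that with a sleep() for brevity.

/* eventfd_park.c - minimal sketch of the eventfd park/wake handshake above.
 * Build: cc -pthread eventfd_park.c -o eventfd_park
 * All names here are hypothetical; this is not the libcfa implementation. */
#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>

static _Atomic int sleeping_fd = 0;          /* plays the role of cluster->procs.fd */

static void park(int idle_fd) {
    /* advertise that this worker is about to sleep, then block on the eventfd */
    atomic_store_explicit(&sleeping_fd, idle_fd, memory_order_seq_cst);
    eventfd_t val;
    if (read(idle_fd, &val, sizeof(val)) < 0 &&
        errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
        fprintf(stderr, "read failure on idle eventfd: %s\n", strerror(errno));
    }
}

static void wake_one(void) {
    /* claim the sleeper's fd with an exchange so only one waker writes to it */
    int fd = 0;
    if (atomic_load_explicit(&sleeping_fd, memory_order_seq_cst) != 0)
        fd = atomic_exchange_explicit(&sleeping_fd, 0, memory_order_relaxed);
    if (fd == 0) return;                     /* nobody is sleeping: we are done */
    eventfd_write(fd, 1);                    /* the actual wake-up */
}

static void * worker(void * arg) {
    int idle_fd = *(int *)arg;
    printf("worker: parking\n");
    park(idle_fd);
    printf("worker: woken\n");
    return NULL;
}

int main(void) {
    int idle_fd = eventfd(0, 0);             /* blocking eventfd, like idle_fd above */
    if (idle_fd < 0) { perror("eventfd"); return 1; }
    pthread_t t;
    pthread_create(&t, NULL, worker, &idle_fd);
    sleep(1);                                /* crude stand-in for the ready-queue re-check */
    wake_one();
    pthread_join(t, NULL);
    close(idle_fd);
    return 0;
}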