Changeset 9f575ea


Timestamp: Feb 6, 2020, 10:23:27 AM
Author: Thierry Delisle <tdelisle@…>
Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children: 3381ed7
Parents: 4f7b418
Message: First attempt at park/unpark
Location: libcfa/src/concurrency
Files: 7 edited

Legend: removed lines are prefixed with -, added lines with +, unchanged context lines are unprefixed.
  • libcfa/src/concurrency/invoke.h

    r4f7b418 r9f575ea  
        };

-       enum coroutine_state { Halted, Start, Inactive, Active, Primed };
+       enum coroutine_state { Halted, Start, Primed, Inactive, Active, Rerun, Reschedule };

        struct coroutine_desc {

                // current execution status for coroutine
-               enum coroutine_state state;
+               volatile int state;
+               int preempted;

                // SKULLDUGGERY: errno is not saved in the thread data structure because returnToKernel appears to be the only function that requires saving and restoring it
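For reference, here is how the widened enum reads under the new protocol. The per-state notes are an editorial sketch inferred from the kernel.cfa and kernel_private.hfa hunks below, not comments from the source:

    enum coroutine_state {
            Halted,     // thread has terminated and must never be run again
            Start,      // thread was created but has not run yet
            Primed,     // coroutine-startup state, unchanged from before
            Inactive,   // thread is fully parked; whoever observes this owns rescheduling it
            Active,     // thread is running, or is blocking but has not finished parking
            Rerun,      // a wake arrived while still Active: run the thread again directly
            Reschedule  // same race, but the thread must go back through the ready-queue
    };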
  • libcfa/src/concurrency/kernel.cfa

    r4f7b418 r9f575ea  
        kernelTLS.this_thread = thrd_dst;

-       // set state of processor coroutine to inactive and the thread to active
-       proc_cor->state = proc_cor->state == Halted ? Halted : Inactive;
-       thrd_dst->state = Active;
-
-       // set context switch to the thread that the processor is executing
-       verify( thrd_dst->context.SP );
-       CtxSwitch( &proc_cor->context, &thrd_dst->context );
-       // when CtxSwitch returns we are back in the processor coroutine
+       // set state of processor coroutine to inactive
+       verify(proc_cor->state == Active);
+       proc_cor->state = Inactive;
+
+       // Actually run the thread
+       RUN:
+       {
+               if(unlikely(thrd_dst->preempted)) {
+                       thrd_dst->preempted = false;
+               } else {
+                       thrd_dst->state = Active;
+               }
+
+               // set context switch to the thread that the processor is executing
+               verify( thrd_dst->context.SP );
+               CtxSwitch( &proc_cor->context, &thrd_dst->context );
+               // when CtxSwitch returns we are back in the processor coroutine
+       }
+
+       // We just finished running a thread; a few things could have happened.
+       // 1 - Regular case : the thread has blocked and no one has scheduled it yet.
+       // 2 - Racy case    : the thread has blocked but someone has already tried to schedule it.
+       // 3 - Polite Racy case : the thread has blocked, someone has already tried to schedule it, but the thread is nice and wants to go through the ready-queue anyway.
+       // 4 - Preempted
+       // In case 1, we may have won a race, so we can't write to the state again.
+       // In case 2, we lost the race, so we now own the thread.
+       // In case 3, we lost the race, but can just reschedule the thread.
+       // In case 4, the thread was preempted, so we reschedule it and return to the processor.
+
+       if(unlikely(thrd_dst->preempted)) {
+               // The thread was preempted, reschedule it and reset the flag
+               ScheduleThread( thrd_dst );
+
+               // Just before returning to the processor, set the processor coroutine to active
+               proc_cor->state = Active;
+               return;
+       }

        // set state of processor coroutine to active and the thread to inactive
-       thrd_dst->state = thrd_dst->state == Halted ? Halted : Inactive;
+       enum coroutine_state old_state = __atomic_exchange_n(&thrd_dst->state, Inactive, __ATOMIC_SEQ_CST);
+       switch(old_state) {
+               case Halted:
+                       // The thread has halted, it should never be scheduled/run again; set it back to Halted and move on
+                       thrd_dst->state = Halted;
+                       break;
+               case Active:
+                       // This is case 1, the regular case, nothing more is needed
+                       break;
+               case Rerun:
+                       // This is case 2, the racy case: someone tried to run this thread before it finished blocking.
+                       // In this case, just run it again.
+                       goto RUN;
+               case Reschedule:
+                       // This is case 3: someone tried to run this thread before it finished blocking,
+                       // but it must go through the ready-queue.
+                       thrd_dst->state = Inactive;  /* restore invariant */
+                       ScheduleThread( thrd_dst );
+                       break;
+               case Inactive:
+               case Start:
+               case Primed:
+               default:
+                       // This makes no sense, something is wrong: abort
+                       abort("Finished running a thread that was Inactive/Start/Primed %d\n", old_state);
+       }
+
+       // Just before returning to the processor, set the processor coroutine to active
        proc_cor->state = Active;
}
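The four cases above can be read as one atomic-exchange handshake. Below is a minimal compilable C sketch of the park side (the kernel's half), assuming GCC's `__atomic_exchange_n` builtin; `thread_t`, `run_once`, and `push_ready_queue` are hypothetical stand-ins for `thread_desc`, the `CtxSwitch` into the thread, and `ScheduleThread`:

    #include <stdbool.h>

    enum coroutine_state { Halted, Start, Primed, Inactive, Active, Rerun, Reschedule };

    typedef struct { volatile int state; bool preempted; } thread_t;  /* stand-in for thread_desc */

    static void push_ready_queue(thread_t * t) { (void)t; }  /* stub for ScheduleThread */
    static void run_once(thread_t * t) { (void)t; }          /* stub for the CtxSwitch into the thread */

    /* Kernel side: run a thread once, then resolve the blocking race (cases 1-4 above). */
    static void run_thread(thread_t * thrd) {
            for (;;) {
                    if (thrd->preempted) thrd->preempted = false;  /* case 4: state was never cleared */
                    else                 thrd->state = Active;

                    run_once(thrd);  /* returns when the thread blocks or is preempted */

                    if (thrd->preempted) {  /* case 4: preempted threads go straight back on the queue */
                            push_ready_queue(thrd);
                            return;
                    }

                    /* The exchange atomically decides who owns the thread from here on. */
                    int old = __atomic_exchange_n(&thrd->state, Inactive, __ATOMIC_SEQ_CST);
                    switch (old) {
                    case Halted:     thrd->state = Halted; return;  /* never run again */
                    case Active:     return;                        /* case 1: parked cleanly, a later wake owns it */
                    case Rerun:      break;                         /* case 2: a wake beat us, loop and run again */
                    case Reschedule:                                /* case 3: a wake beat us, but wants the queue */
                            thrd->state = Inactive;
                            push_ready_queue(thrd);
                            return;
                    default:         __builtin_trap();              /* Inactive/Start/Primed here is corruption */
                    }
            }
    }

In this sketch the enclosing loop plays the role of the `goto RUN` in the hunk above.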
     
// KERNEL_ONLY
static void returnToKernel() {
+       verify( ! kernelTLS.preemption_state.enabled );
        coroutine_desc * proc_cor = get_coroutine(kernelTLS.this_processor->runner);
        thread_desc * thrd_src = kernelTLS.this_thread;

-       // set state of current coroutine to inactive
-       thrd_src->state = thrd_src->state == Halted ? Halted : Inactive;
-       proc_cor->state = Active;
-       int local_errno = *__volatile_errno();
-       #if defined( __i386 ) || defined( __x86_64 )
-               __x87_store;
-       #endif
-
-       // set new coroutine that the processor is executing
-       // and context switch to it
-       verify( proc_cor->context.SP );
-       CtxSwitch( &thrd_src->context, &proc_cor->context );
-
-       // set state of new coroutine to active
-       proc_cor->state = proc_cor->state == Halted ? Halted : Inactive;
-       thrd_src->state = Active;
-
-       #if defined( __i386 ) || defined( __x86_64 )
-               __x87_load;
-       #endif
-       *__volatile_errno() = local_errno;
+       // Run the thread on this processor
+       {
+               int local_errno = *__volatile_errno();
+               #if defined( __i386 ) || defined( __x86_64 )
+                       __x87_store;
+               #endif
+               verify( proc_cor->context.SP );
+               CtxSwitch( &thrd_src->context, &proc_cor->context );
+               #if defined( __i386 ) || defined( __x86_64 )
+                       __x87_load;
+               #endif
+               *__volatile_errno() = local_errno;
+       }
+
+       verify( ! kernelTLS.preemption_state.enabled );
}

     
static void finishRunning(processor * this) with( this->finish ) {
        verify( ! kernelTLS.preemption_state.enabled );
+       verify( action_code == No_Action );
        choose( action_code ) {
        case No_Action:
     

// KERNEL ONLY
-void ScheduleThread( thread_desc * thrd ) {
-       verify( thrd );
-       verify( thrd->state != Halted );
-
-       verify( ! kernelTLS.preemption_state.enabled );
-
-       verifyf( thrd->next == 0p, "Expected null got %p", thrd->next );
-
-       with( *thrd->curr_cluster ) {
-               lock  ( ready_queue_lock __cfaabi_dbg_ctx2 );
-               bool was_empty = !(ready_queue != 0);
-               append( ready_queue, thrd );
-               unlock( ready_queue_lock );
-
-               if(was_empty) {
-                       lock      (proc_list_lock __cfaabi_dbg_ctx2);
-                       if(idles) {
-                               wake_fast(idles.head);
-                       }
-                       unlock    (proc_list_lock);
-               }
-               else if( struct processor * idle = idles.head ) {
-                       wake_fast(idle);
-               }
-
-       }
-
-       verify( ! kernelTLS.preemption_state.enabled );
+void ScheduleThread( thread_desc * thrd ) with( *thrd->curr_cluster ) {
+       /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+       /* paranoid */ verifyf( thrd->state == Inactive || thrd->state == Start || thrd->preempted, "state : %d, preempted %d\n", thrd->state, thrd->preempted);
+       /* paranoid */ verifyf( thrd->next == 0p, "Expected null got %p", thrd->next );
+
+       lock  ( ready_queue_lock __cfaabi_dbg_ctx2 );
+       bool was_empty = !(ready_queue != 0);
+       append( ready_queue, thrd );
+       unlock( ready_queue_lock );
+
+       if(was_empty) {
+               lock      (proc_list_lock __cfaabi_dbg_ctx2);
+               if(idles) {
+                       wake_fast(idles.head);
+               }
+               unlock    (proc_list_lock);
+       }
+       else if( struct processor * idle = idles.head ) {
+               wake_fast(idle);
+       }
+
+       /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
}

     
void BlockInternal( __spinlock_t * lock ) {
        disable_interrupts();
-       with( *kernelTLS.this_processor ) {
-               finish.action_code = Release;
-               finish.lock        = lock;
-       }
+       unlock( *lock );

        verify( ! kernelTLS.preemption_state.enabled );
     
void BlockInternal( thread_desc * thrd ) {
        disable_interrupts();
-       with( * kernelTLS.this_processor ) {
-               finish.action_code = Schedule;
-               finish.thrd        = thrd;
-       }
+       WakeThread( thrd, false );

        verify( ! kernelTLS.preemption_state.enabled );
     

void BlockInternal( __spinlock_t * lock, thread_desc * thrd ) {
-       assert(thrd);
        disable_interrupts();
-       with( * kernelTLS.this_processor ) {
-               finish.action_code = Release_Schedule;
-               finish.lock        = lock;
-               finish.thrd        = thrd;
-       }
+       unlock( *lock );
+       WakeThread( thrd, false );

        verify( ! kernelTLS.preemption_state.enabled );
     
void BlockInternal(__spinlock_t * locks [], unsigned short count) {
        disable_interrupts();
-       with( * kernelTLS.this_processor ) {
-               finish.action_code = Release_Multi;
-               finish.locks       = locks;
-               finish.lock_count  = count;
+       for(int i = 0; i < count; i++) {
+               unlock( *locks[i] );
        }

     
void BlockInternal(__spinlock_t * locks [], unsigned short lock_count, thread_desc * thrds [], unsigned short thrd_count) {
        disable_interrupts();
-       with( *kernelTLS.this_processor ) {
-               finish.action_code = Release_Multi_Schedule;
-               finish.locks       = locks;
-               finish.lock_count  = lock_count;
-               finish.thrds       = thrds;
-               finish.thrd_count  = thrd_count;
+       for(int i = 0; i < lock_count; i++) {
+               unlock( *locks[i] );
+       }
+       for(int i = 0; i < thrd_count; i++) {
+               WakeThread( thrds[i], false );
        }

     
void BlockInternal(__finish_callback_fptr_t callback) {
        disable_interrupts();
-       with( *kernelTLS.this_processor ) {
-               finish.action_code = Callback;
-               finish.callback    = callback;
-       }
+       callback();

        verify( ! kernelTLS.preemption_state.enabled );
     
void LeaveThread(__spinlock_t * lock, thread_desc * thrd) {
        verify( ! kernelTLS.preemption_state.enabled );
-       with( * kernelTLS.this_processor ) {
-               finish.action_code = thrd ? Release_Schedule : Release;
-               finish.lock        = lock;
-               finish.thrd        = thrd;
-       }
+       unlock( *lock );
+       WakeThread( thrd, false );

        returnToKernel();
     

        // make new owner
-       WakeThread( thrd );
+       WakeThread( thrd, false );
}

  • libcfa/src/concurrency/kernel_private.hfa

    r4f7b418 r9f575ea  
}

-void ScheduleThread( thread_desc * );
-static inline void WakeThread( thread_desc * thrd ) {
+void ScheduleThread( thread_desc * ) __attribute__((nonnull (1)));
+static inline void WakeThread( thread_desc * thrd, bool must_yield ) {
        if( !thrd ) return;

-       verify(thrd->state == Inactive);
+       enum coroutine_state new_state = must_yield ? Reschedule : Rerun;

        disable_interrupts();
-       ScheduleThread( thrd );
+       static_assert(sizeof(thrd->state) == sizeof(int));
+       enum coroutine_state old_state = (enum coroutine_state)__atomic_exchange_n((volatile int *)&thrd->state, (int)new_state, __ATOMIC_SEQ_CST);
+       switch(old_state) {
+               case Active:
+                       // Wake won the race, the thread will reschedule/rerun itself
+                       break;
+               case Inactive:
+                       // Wake lost the race, the thread is already parked: restore the state and schedule it
+                       thrd->state = Inactive;
+                       ScheduleThread( thrd );
+                       break;
+               case Rerun:
+               case Reschedule:
+                       abort("More than one thread attempted to schedule thread %p\n", thrd);
+                       break;
+               case Halted:
+               case Start:
+               case Primed:
+               default:
+                       // This makes no sense, something is wrong: abort
+                       abort();
+       }
        enable_interrupts( __cfaabi_dbg_ctx );
}
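The matching unpark side, as the same kind of hedged stand-alone C sketch (repeating the hypothetical `thread_t` and `push_ready_queue` stand-ins from the kernel.cfa sketch above):

    #include <stdbool.h>

    enum coroutine_state { Halted, Start, Primed, Inactive, Active, Rerun, Reschedule };

    typedef struct { volatile int state; bool preempted; } thread_t;  /* stand-in for thread_desc */
    static void push_ready_queue(thread_t * t) { (void)t; }           /* stub for ScheduleThread */

    /* Waker side: publish the wake with one exchange, then act on who won the race. */
    static void wake_thread(thread_t * thrd, bool must_yield) {
            if (!thrd) return;
            int new_state = must_yield ? Reschedule : Rerun;

            int old = __atomic_exchange_n(&thrd->state, new_state, __ATOMIC_SEQ_CST);
            switch (old) {
            case Active:
                    /* Wake won the race: the runner will observe Rerun/Reschedule when
                     * the thread finishes blocking, so nothing more to do here. */
                    break;
            case Inactive:
                    /* Wake lost the race: the thread is fully parked, so this caller
                     * owns putting it back on the ready-queue. */
                    thrd->state = Inactive;
                    push_ready_queue(thrd);
                    break;
            default:
                    __builtin_trap();  /* Rerun/Reschedule: two concurrent wakes, forbidden */
            }
    }

Note that every call site in this changeset passes false for must_yield, i.e. the direct Rerun path; Reschedule exists for wakers that want the thread to cycle back through the ready-queue (case 3 in kernel.cfa), while preemption uses the separate preempted flag instead.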
  • libcfa/src/concurrency/monitor.cfa

    r4f7b418 r9f575ea  

                //We need to wake-up the thread
-               WakeThread( new_owner );
+               WakeThread( new_owner, false );
        }

  • libcfa/src/concurrency/mutex.cfa

    r4f7b418 r9f575ea  
        this.is_locked = (this.blocked_threads != 0);
        WakeThread(
-               pop_head( this.blocked_threads )
+               pop_head( this.blocked_threads ), false
        );
        unlock( this.lock );

                owner = thrd;
                recursion_count = (thrd ? 1 : 0);
-               WakeThread( thrd );
+               WakeThread( thrd, false );
        }
        unlock( lock );

        lock( lock __cfaabi_dbg_ctx2 );
        WakeThread(
-               pop_head( this.blocked_threads )
+               pop_head( this.blocked_threads ), false
        );
        unlock( lock );

        while(this.blocked_threads) {
                WakeThread(
-                       pop_head( this.blocked_threads )
+                       pop_head( this.blocked_threads ), false
                );
        }
  • libcfa/src/concurrency/preemption.cfa

    r4f7b418 r9f575ea  
        // Preemption can occur here

-       BlockInternal( kernelTLS.this_thread ); // Do the actual CtxSwitch
+       kernelTLS.this_thread->preempted = true;
+       BlockInternal(); // Do the actual CtxSwitch
}

  • libcfa/src/concurrency/thread.cfa

    r4f7b418 r9f575ea  
        self_cor{ name, storage, storageSize };
        state = Start;
+       preempted = false;
        curr_cor = &self_cor;
        self_mon.owner = &this;