Changeset db6f06a for src/libcfa


Ignore:
Timestamp:
Feb 13, 2017, 5:04:43 PM (8 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
Children:
ee897e4b
Parents:
75f3522
Message:

Implemented better condition lock to solve race condition on thread/processor termination

Location:
src/libcfa/concurrency
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • src/libcfa/concurrency/coroutines.c

    r75f3522 rdb6f06a  
    6161
    6262void ?{}(coroutine* this) {
    63         this->name = "Anonymous Coroutine";
    64         this->errno_ = 0;
    65         this->state = Start;
    66       this->notHalted = true;
    67         this->starter = NULL;
    68         this->last = NULL;
     63        this{ "Anonymous Coroutine" };
    6964}
    7065
     
    7368        this->errno_ = 0;
    7469        this->state = Start;
    75       this->notHalted = true;
     70        this->notHalted = true;
    7671        this->starter = NULL;
    7772        this->last = NULL;
     
    169164        this->context = this->base;
    170165        this->top = (char *)this->context + cxtSize;
    171 
    172         LIB_DEBUG_PRINTF("Coroutine : created stack %p\n", this->base);
    173166}
    174167
  • src/libcfa/concurrency/invoke.h

    r75f3522 rdb6f06a  
    3030      #define SCHEDULER_CAPACITY 10
    3131
     32      struct spinlock {
     33            volatile int lock;
     34      };
     35
    3236      struct simple_thread_list {
    3337            struct thread * head;
    3438            struct thread ** tail;
     39      };
     40
     41      // struct simple_lock {
     42      //        struct simple_thread_list blocked;
     43      // };
     44
     45      struct signal_once {
     46            volatile bool condition;
     47            struct spinlock lock;
     48            struct simple_thread_list blocked;
    3549      };
    3650
     
    4054            void append( struct simple_thread_list *, struct thread * );
    4155            struct thread * pop_head( struct simple_thread_list * );
     56
     57            // void ?{}(simple_lock * this);
     58            // void ^?{}(simple_lock * this);
     59
     60            void ?{}(spinlock * this);
     61            void ^?{}(spinlock * this);
     62
     63            void ?{}(signal_once * this);
     64            void ^?{}(signal_once * this);
    4265      }
    4366      #endif
     
    6083            int errno_;                         // copy of global UNIX variable errno
    6184            enum coroutine_state state; // current execution status for coroutine
    62             bool notHalted;                     // indicate if execuation state is not halted
    63 
     85            bool notHalted;                   // indicate if execuation state is not halted
    6486            struct coroutine *starter;  // first coroutine to resume this one
    6587            struct coroutine *last;             // last coroutine to resume this one
    6688      };
    6789
    68       struct simple_lock {
    69         struct simple_thread_list blocked;
    70       };
    71 
    7290      struct thread {
    7391            struct coroutine c;
    74             struct simple_lock lock;
     92            signal_once terminated;             // indicate if execuation state is not halted
    7593            struct thread * next;
    7694      };
  • src/libcfa/concurrency/kernel

    r75f3522 rdb6f06a  
    2727
    2828//-----------------------------------------------------------------------------
     29// Locks
     30// void lock( simple_lock * );
     31// void lock( simple_lock *, spinlock * );
     32// void unlock( simple_lock * );
     33
     34void lock( spinlock * );
     35void unlock( spinlock * );
     36
     37void wait( signal_once * );
     38void signal( signal_once * );
     39
     40//-----------------------------------------------------------------------------
    2941// Cluster
    3042struct cluster {
    3143        simple_thread_list ready_queue;
    32         // pthread_spinlock_t lock;
     44        spinlock lock;
    3345};
    3446
     
    3850//-----------------------------------------------------------------------------
    3951// Processor
    40 enum ProcessorAction {
    41         Reschedule,
    42         NoAction
     52enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule };
     53struct FinishAction {
     54        FinishOpCode action_code;
     55        thread * thrd;
     56        spinlock * lock;
    4357};
     58static inline void ?{}(FinishAction * this) {
     59        this->action_code = No_Action;
     60        this->thrd = NULL;
     61        this->lock = NULL;
     62}
     63static inline void ^?{}(FinishAction * this) {}
    4464
    4565struct processor {
     
    4969        thread * current_thread;
    5070        pthread_t kernel_thread;
    51         simple_lock lock;
    52         volatile bool terminated;
    53         ProcessorAction thread_action;
     71       
     72        signal_once terminated;
     73        volatile bool is_terminated;
     74
     75        struct FinishAction finish;
    5476};
    5577
     
    5779void ?{}(processor * this, cluster * cltr);
    5880void ^?{}(processor * this);
    59 
    60 //-----------------------------------------------------------------------------
    61 // Locks
    62 
    63 void ?{}(simple_lock * this);
    64 void ^?{}(simple_lock * this);
    65 
    66 void lock( simple_lock * );
    67 void unlock( simple_lock * );
    6881
    6982#endif //KERNEL_H
  • src/libcfa/concurrency/kernel.c

    r75f3522 rdb6f06a  
    141141}
    142142
    143 void start(processor * this);
    144 
    145143void ?{}(processor * this) {
    146144        this{ systemCluster };
     
    151149        this->current_coroutine = NULL;
    152150        this->current_thread = NULL;
    153         (&this->lock){};
    154         this->terminated = false;
     151        (&this->terminated){};
     152        this->is_terminated = false;
    155153
    156154        start( this );
     
    161159        this->current_coroutine = NULL;
    162160        this->current_thread = NULL;
    163         (&this->lock){};
    164         this->terminated = false;
     161        (&this->terminated){};
     162        this->is_terminated = false;
    165163
    166164        this->runner = runner;
     
    170168
    171169void ^?{}(processor * this) {
    172         if( ! this->terminated ) {
     170        if( ! this->is_terminated ) {
    173171                LIB_DEBUG_PRINTF("Kernel : core %p signaling termination\n", this);
    174                 this->terminated = true;
    175                 lock( &this->lock );
     172                this->is_terminated = true;
     173                wait( &this->terminated );
    176174        }
    177175}
     
    194192        LIB_DEBUG_PRINTF("Kernel : core %p starting\n", this);
    195193
    196         fenv_t envp;
    197         fegetenv( &envp );
    198         LIB_DEBUG_PRINTF("Kernel : mxcsr %x\n", envp.__mxcsr);
    199 
    200194        thread * readyThread = NULL;
    201         for( unsigned int spin_count = 0; ! this->terminated; spin_count++ )
     195        for( unsigned int spin_count = 0; ! this->is_terminated; spin_count++ )
    202196        {
    203197                readyThread = nextThread( this->cltr );
     
    208202
    209203                        //Some actions need to be taken from the kernel
    210                         finishRunning(this, readyThread);
     204                        finishRunning(this);
    211205
    212206                        spin_count = 0;
     
    219213
    220214        LIB_DEBUG_PRINTF("Kernel : core %p unlocking thread\n", this);
    221         unlock( &this->lock );
     215        signal( &this->terminated );
    222216        LIB_DEBUG_PRINTF("Kernel : core %p terminated\n", this);
    223217}
     
    230224       
    231225        //Reset the terminating actions here
    232         this->thread_action = NoAction;
     226        this->finish.action_code = No_Action;
    233227
    234228        //Update global state
     
    242236// Once a thread has finished running, some of
    243237// its final actions must be executed from the kernel
    244 void finishRunning(processor * this, thread * thrd) {
    245         if(this->thread_action == Reschedule) {
    246                 ScheduleThread( thrd );
     238void finishRunning(processor * this) {
     239        if( this->finish.action_code == Release ) {
     240                unlock( this->finish.lock );
     241        }
     242        else if( this->finish.action_code == Schedule ) {
     243                ScheduleThread( this->finish.thrd );
     244        }
     245        else if( this->finish.action_code == Release_Schedule ) {
     246                unlock( this->finish.lock );           
     247                ScheduleThread( this->finish.thrd );
     248        }
     249        else {
     250                assert(this->finish.action_code == No_Action);
    247251        }
    248252}
     
    310314        assertf( thrd->next == NULL, "Expected null got %p", thrd->next );
    311315       
    312         spin_lock( &lock );
     316        lock( &systemProcessor->cltr->lock );
    313317        append( &systemProcessor->cltr->ready_queue, thrd );
    314         spin_unlock( &lock );
     318        unlock( &systemProcessor->cltr->lock );
     319}
     320
     321thread * nextThread(cluster * this) {
     322        lock( &this->lock );
     323        thread * head = pop_head( &this->ready_queue );
     324        unlock( &this->lock );
     325        return head;
    315326}
    316327
    317328void ScheduleInternal() {
    318         get_this_processor()->thread_action = Reschedule;
    319329        suspend();
    320330}
    321331
    322 thread * nextThread(cluster * this) {
    323         spin_lock( &lock );
    324         thread * head = pop_head( &this->ready_queue );
    325         spin_unlock( &lock );
    326         return head;
     332void ScheduleInternal( spinlock * lock ) {
     333        get_this_processor()->finish.action_code = Release;
     334        get_this_processor()->finish.lock = lock;
     335        suspend();
     336}
     337
     338void ScheduleInternal( thread * thrd ) {
     339        get_this_processor()->finish.action_code = Schedule;
     340        get_this_processor()->finish.thrd = thrd;
     341        suspend();
     342}
     343
     344void ScheduleInternal( spinlock * lock, thread * thrd ) {
     345        get_this_processor()->finish.action_code = Release_Schedule;
     346        get_this_processor()->finish.lock = lock;
     347        get_this_processor()->finish.thrd = thrd;
     348        suspend();
    327349}
    328350
     
    374396        // When its coroutine terminates, it return control to the mainThread
    375397        // which is currently here
    376         systemProcessor->terminated = true;
     398        systemProcessor->is_terminated = true;
    377399        suspend();
    378400
     
    393415//-----------------------------------------------------------------------------
    394416// Locks
    395 void ?{}( simple_lock * this ) {
    396         ( &this->blocked ){};
    397 }
    398 
    399 void ^?{}( simple_lock * this ) {
    400 
    401 }
    402 
    403 void lock( simple_lock * this ) {
     417// void ?{}( simple_lock * this ) {
     418//      ( &this->blocked ){};
     419// }
     420
     421// void ^?{}( simple_lock * this ) {
     422
     423// }
     424
     425// void lock( simple_lock * this ) {
     426//      {
     427//              spin_lock( &lock );
     428//              append( &this->blocked, this_thread() );
     429//              spin_unlock( &lock );
     430//      }
     431//      ScheduleInternal();
     432// }
     433
     434// void lock( simple_lock * this, spinlock * to_release ) {
     435//      {
     436//              spin_lock( &lock );
     437//              append( &this->blocked, this_thread() );
     438//              spin_unlock( &lock );
     439//      }
     440//      ScheduleInternal( to_release );
     441//      lock( to_release );
     442// }
     443
     444// void unlock( simple_lock * this ) {
     445//      thread * it;
     446//      while( it = pop_head( &this->blocked) ) {
     447//              ScheduleThread( it );
     448//      }
     449// }
     450
     451void ?{}( spinlock * this ) {
     452        this->lock = 0;
     453}
     454void ^?{}( spinlock * this ) {
     455
     456}
     457
     458void lock( spinlock * this ) {
     459        for ( unsigned int i = 1;; i += 1 ) {
     460                if ( this->lock == 0 && __sync_lock_test_and_set_4( &this->lock, 1 ) == 0 ) break;
     461        }
     462}
     463
     464void unlock( spinlock * this ) {
     465        __sync_lock_release_4( &this->lock );
     466}
     467
     468void ?{}( signal_once * this ) {
     469        this->condition = false;
     470}
     471void ^?{}( signal_once * this ) {
     472
     473}
     474
     475void wait( signal_once * this ) {
     476        lock( &this->lock );
     477        if( !this->condition ) {
     478                append( &this->blocked, this_thread() );
     479                ScheduleInternal( &this->lock );
     480                lock( &this->lock );
     481        }
     482        unlock( &this->lock );
     483}
     484
     485void signal( signal_once * this ) {
     486        lock( &this->lock );
    404487        {
    405                 spin_lock( &lock );
    406                 append( &this->blocked, this_thread() );
    407                 spin_unlock( &lock );
    408         }
    409         suspend();
    410 }
    411 
    412 void unlock( simple_lock * this ) {
    413         thread * it;
    414         while( it = pop_head( &this->blocked) ) {
    415                 ScheduleThread( it );
    416         }
     488                this->condition = true;
     489
     490                thread * it;
     491                while( it = pop_head( &this->blocked) ) {
     492                        ScheduleThread( it );
     493                }
     494        }
     495        unlock( &this->lock );
    417496}
    418497
  • src/libcfa/concurrency/kernel_private.h

    r75f3522 rdb6f06a  
    2424// Scheduler
    2525void ScheduleThread( thread * );
     26thread * nextThread(cluster * this);
     27
    2628void ScheduleInternal();
    27 thread * nextThread(cluster * this);
     29void ScheduleInternal(spinlock * lock);
     30void ScheduleInternal(thread * thrd);
     31void ScheduleInternal(spinlock * lock, thread * thrd);
    2832
    2933//-----------------------------------------------------------------------------
     
    3741
    3842void main(processorCtx_t *);
     43void start(processor * this);
    3944void runThread(processor * this, thread * dst);
    40 void finishRunning(processor * this, thread * thrd);
     45void finishRunning(processor * this);
    4146void spin(processor * this, unsigned int * spin_count);
    4247
  • src/libcfa/concurrency/threads.c

    r75f3522 rdb6f06a  
    4444        (&this->c){};
    4545        this->c.name = "Anonymous Coroutine";
    46         (&this->lock){};
     46        (&this->terminated){};
    4747        this->next = NULL;
    4848}
     
    9090forall( dtype T | is_thread(T) )
    9191void stop( T* this ) {
    92         thread*  thrd = get_thread(this);
    93         if( thrd->c.notHalted ) {
    94                 lock( &thrd->lock );
    95         }
     92        wait( & get_thread(this)->terminated );
    9693}
    9794
    9895void yield( void ) {
    99         ScheduleInternal();
     96        ScheduleInternal( get_this_processor()->current_thread );
    10097}
    10198
     
    124121        void __thread_signal_termination( thread * this ) {
    125122                this->c.state = Halt;
    126                 this->c.notHalted = false;
    127                 unlock( &this->lock );
     123                LIB_DEBUG_PRINTF("Thread end : %p\n", this);
     124                signal( &this->terminated );   
    128125        }
    129126}
Note: See TracChangeset for help on using the changeset viewer.