Ignore:
Timestamp:
Feb 15, 2017, 8:13:49 AM (9 years ago)
Author:
Rob Schluntz <rschlunt@…>
Branches:
ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, stuck-waitfor-destruct, with_gc
Children:
e6512c8
Parents:
aa9ee19 (diff), 3149e7e (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:/u/cforall/software/cfa/cfa-cc

Location:
src/libcfa/concurrency
Files:
1 added
7 edited

Legend:

Unmodified
Added
Removed
  • src/libcfa/concurrency/coroutines

    raa9ee19 r97f65d5  
    7777                "Possible cause is a suspend executed in a member called by a coroutine user rather than by the coroutine main.",
    7878                src->name, src );
    79         assertf( src->last->notHalted,
     79        assertf( src->last->state != Halted,
    8080                "Attempt by coroutine \"%.256s\" (%p) to suspend back to terminated coroutine \"%.256s\" (%p).\n"
    8181                "Possible cause is terminated coroutine's main routine has already returned.",
     
    9898      // not resuming self ?
    9999        if ( src != dst ) {
    100                 assertf( dst->notHalted ,
     100                assertf( dst->state != Halted ,
    101101                        "Attempt by coroutine %.256s (%p) to resume terminated coroutine %.256s (%p).\n"
    102102                        "Possible cause is terminated coroutine's main routine has already returned.",
     
    116116      // not resuming self ?
    117117        if ( src != dst ) {
    118                 assertf( dst->notHalted ,
     118                assertf( dst->state != Halted ,
    119119                        "Attempt by coroutine %.256s (%p) to resume terminated coroutine %.256s (%p).\n"
    120120                        "Possible cause is terminated coroutine's main routine has already returned.",
  • src/libcfa/concurrency/coroutines.c

    raa9ee19 r97f65d5  
    6161
    6262void ?{}(coroutine* this) {
    63         this->name = "Anonymous Coroutine";
    64         this->errno_ = 0;
    65         this->state = Start;
    66       this->notHalted = true;
    67         this->starter = NULL;
    68         this->last = NULL;
     63        this{ "Anonymous Coroutine" };
    6964}
    7065
     
    7368        this->errno_ = 0;
    7469        this->state = Start;
    75       this->notHalted = true;
    7670        this->starter = NULL;
    7771        this->last = NULL;
     
    169163        this->context = this->base;
    170164        this->top = (char *)this->context + cxtSize;
    171 
    172         LIB_DEBUG_PRINTF("Coroutine : created stack %p\n", this->base);
    173165}
    174166
  • src/libcfa/concurrency/invoke.c

    raa9ee19 r97f65d5  
    4848      main( this );
    4949
    50       cor->state = Halt;
    51       cor->notHalted = false;
     50      cor->state = Halted;
    5251
    5352      //Final suspend, should never return
  • src/libcfa/concurrency/invoke.h

    raa9ee19 r97f65d5  
    3030      #define SCHEDULER_CAPACITY 10
    3131
     32      struct spinlock {
     33            volatile int lock;
     34      };
     35
    3236      struct simple_thread_list {
    3337            struct thread * head;
    3438            struct thread ** tail;
     39      };
     40
     41      struct signal_once {
     42            volatile bool condition;
     43            struct spinlock lock;
     44            struct simple_thread_list blocked;
    3545      };
    3646
     
    4050            void append( struct simple_thread_list *, struct thread * );
    4151            struct thread * pop_head( struct simple_thread_list * );
     52
     53            void ?{}(spinlock * this);
     54            void ^?{}(spinlock * this);
     55
     56            void ?{}(signal_once * this);
     57            void ^?{}(signal_once * this);
    4258      }
    4359      #endif
     
    5369      };
    5470
    55       enum coroutine_state { Start, Inactive, Active, Halt, Primed };
     71      enum coroutine_state { Halted, Start, Inactive, Active, Primed };
    5672
    5773      struct coroutine {
     
    6076            int errno_;                         // copy of global UNIX variable errno
    6177            enum coroutine_state state; // current execution status for coroutine
    62             bool notHalted;                     // indicate if execuation state is not halted
    63 
    6478            struct coroutine *starter;  // first coroutine to resume this one
    6579            struct coroutine *last;             // last coroutine to resume this one
    6680      };
    6781
    68       struct simple_lock {
    69         struct simple_thread_list blocked;
    70       };
    71 
    7282      struct thread {
    73             struct coroutine c;
    74             struct simple_lock lock;
    75             struct thread * next;
     83            struct coroutine c;           // coroutine body used to store context
     84            struct signal_once terminated;// indicate if execuation state is not halted
     85            struct thread * next;         // instrusive link field for threads
    7686      };
    7787
  • src/libcfa/concurrency/kernel

    raa9ee19 r97f65d5  
    99//
    1010// Author           : Thierry Delisle
    11 // Created On       : Tue Jan 17 12:27:26 2016
     11// Created On       : Tue Jan 17 12:27:26 2017
    1212// Last Modified By : Thierry Delisle
    1313// Last Modified On : --
     
    2727
    2828//-----------------------------------------------------------------------------
     29// Locks
     30void lock( spinlock * );
     31void unlock( spinlock * );
     32
     33void wait( signal_once * );
     34void signal( signal_once * );
     35
     36//-----------------------------------------------------------------------------
    2937// Cluster
    3038struct cluster {
    3139        simple_thread_list ready_queue;
    32         // pthread_spinlock_t lock;
     40        spinlock lock;
    3341};
    3442
     
    3846//-----------------------------------------------------------------------------
    3947// Processor
    40 enum ProcessorAction {
    41         Reschedule,
    42         NoAction
     48enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule };
     49struct FinishAction {
     50        FinishOpCode action_code;
     51        thread * thrd;
     52        spinlock * lock;
    4353};
     54static inline void ?{}(FinishAction * this) {
     55        this->action_code = No_Action;
     56        this->thrd = NULL;
     57        this->lock = NULL;
     58}
     59static inline void ^?{}(FinishAction * this) {}
    4460
    4561struct processor {
     
    4965        thread * current_thread;
    5066        pthread_t kernel_thread;
    51         simple_lock lock;
    52         volatile bool terminated;
    53         ProcessorAction thread_action;
     67       
     68        signal_once terminated;
     69        volatile bool is_terminated;
     70
     71        struct FinishAction finish;
    5472};
    5573
     
    5775void ?{}(processor * this, cluster * cltr);
    5876void ^?{}(processor * this);
    59 
    60 
    61 //-----------------------------------------------------------------------------
    62 // Locks
    63 
    64 void ?{}(simple_lock * this);
    65 void ^?{}(simple_lock * this);
    66 
    67 void lock( simple_lock * );
    68 void unlock( simple_lock * );
    6977
    7078#endif //KERNEL_H
  • src/libcfa/concurrency/kernel.c

    raa9ee19 r97f65d5  
    99//
    1010// Author           : Thierry Delisle
    11 // Created On       : Tue Jan 17 12:27:26 2016
     11// Created On       : Tue Jan 17 12:27:26 2017
    1212// Last Modified By : Thierry Delisle
    1313// Last Modified On : --
     
    2020
    2121//Header
    22 #include "kernel"
     22#include "kernel_private.h"
    2323
    2424//C Includes
     
    3131//CFA Includes
    3232#include "libhdr.h"
    33 #include "threads"
    3433
    3534//Private includes
     
    3736#include "invoke.h"
    3837
    39 static volatile int lock;
    40 
    41 void spin_lock( volatile int *lock ) {
    42         for ( unsigned int i = 1;; i += 1 ) {
    43           if ( *lock == 0 && __sync_lock_test_and_set_4( lock, 1 ) == 0 ) break;
    44         }
    45 }
    46 
    47 void spin_unlock( volatile int *lock ) {
    48         __sync_lock_release_4( lock );
    49 }
    50 
    5138//-----------------------------------------------------------------------------
    5239// Kernel storage
    53 struct processorCtx_t {
    54         processor * proc;
    55         coroutine c;
    56 };
    57 
    58 DECL_COROUTINE(processorCtx_t);
    59 
    6040#define KERNEL_STORAGE(T,X) static char X##_storage[sizeof(T)]
    6141
     
    127107        this->name = "Main Thread";
    128108        this->errno_ = 0;
    129         this->state = Inactive;
    130         this->notHalted = true;
     109        this->state = Start;
    131110}
    132111
     
    149128}
    150129
    151 void start(processor * this);
    152 
    153130void ?{}(processor * this) {
    154131        this{ systemCluster };
     
    159136        this->current_coroutine = NULL;
    160137        this->current_thread = NULL;
    161         (&this->lock){};
    162         this->terminated = false;
     138        (&this->terminated){};
     139        this->is_terminated = false;
    163140
    164141        start( this );
     
    169146        this->current_coroutine = NULL;
    170147        this->current_thread = NULL;
    171         (&this->lock){};
    172         this->terminated = false;
     148        (&this->terminated){};
     149        this->is_terminated = false;
    173150
    174151        this->runner = runner;
     
    178155
    179156void ^?{}(processor * this) {
    180         if( ! this->terminated ) {
     157        if( ! this->is_terminated ) {
    181158                LIB_DEBUG_PRINTF("Kernel : core %p signaling termination\n", this);
    182                 this->terminated = true;
    183                 lock( &this->lock );
     159                this->is_terminated = true;
     160                wait( &this->terminated );
    184161        }
    185162}
     
    187164void ?{}(cluster * this) {
    188165        ( &this->ready_queue ){};
    189         lock = 0;
     166        ( &this->lock ){};
    190167}
    191168
     
    194171}
    195172
    196 //-----------------------------------------------------------------------------
    197 // Processor running routines
    198 void main(processorCtx_t *);
    199 thread * nextThread(cluster * this);
    200 void scheduleInternal(processor * this, thread * dst);
    201 void spin(processor * this, unsigned int * spin_count);
    202 void thread_schedule( thread * thrd );
    203 
     173//=============================================================================================
     174// Kernel Scheduling logic
     175//=============================================================================================
    204176//Main of the processor contexts
    205177void main(processorCtx_t * runner) {
     
    207179        LIB_DEBUG_PRINTF("Kernel : core %p starting\n", this);
    208180
    209         fenv_t envp;
    210         fegetenv( &envp );
    211         LIB_DEBUG_PRINTF("Kernel : mxcsr %x\n", envp.__mxcsr);
    212 
    213181        thread * readyThread = NULL;
    214         for( unsigned int spin_count = 0; ! this->terminated; spin_count++ ) {
    215                
     182        for( unsigned int spin_count = 0; ! this->is_terminated; spin_count++ )
     183        {
    216184                readyThread = nextThread( this->cltr );
    217185
    218                 if(readyThread) {
    219                         scheduleInternal(this, readyThread);
     186                if(readyThread)
     187                {
     188                        runThread(this, readyThread);
     189
     190                        //Some actions need to be taken from the kernel
     191                        finishRunning(this);
     192
    220193                        spin_count = 0;
    221                 } else {
     194                }
     195                else
     196                {
    222197                        spin(this, &spin_count);
    223198                }               
     
    225200
    226201        LIB_DEBUG_PRINTF("Kernel : core %p unlocking thread\n", this);
    227         unlock( &this->lock );
     202        signal( &this->terminated );
    228203        LIB_DEBUG_PRINTF("Kernel : core %p terminated\n", this);
    229204}
    230205
    231 //Declarations for scheduleInternal
    232 extern void ThreadCtxSwitch(coroutine * src, coroutine * dst);
    233 
    234 // scheduleInternal runs a thread by context switching
     206// runThread runs a thread by context switching
    235207// from the processor coroutine to the target thread
    236 void scheduleInternal(processor * this, thread * dst) {
    237         this->thread_action = NoAction;
    238 
    239         // coroutine * proc_ctx = get_coroutine(this->ctx);
    240         // coroutine * thrd_ctx = get_coroutine(dst);
    241 
    242         // //Update global state
    243         // this->current_thread = dst;
    244 
    245         // // Context Switch to the thread
    246         // ThreadCtxSwitch(proc_ctx, thrd_ctx);
    247         // // when ThreadCtxSwitch returns we are back in the processor coroutine
    248 
    249         coroutine * proc_ctx = get_coroutine(this->runner);
    250         coroutine * thrd_ctx = get_coroutine(dst);
    251       thrd_ctx->last = proc_ctx;
    252  
    253       // context switch to specified coroutine
    254       // Which is now the current_coroutine
    255       // LIB_DEBUG_PRINTF("Kernel : switching to ctx %p (from %p, current %p)\n", thrd_ctx, proc_ctx, this->current_coroutine);
    256       this->current_thread = dst;
    257       this->current_coroutine = thrd_ctx;
    258       CtxSwitch( proc_ctx->stack.context, thrd_ctx->stack.context );
    259       this->current_coroutine = proc_ctx;
    260       // LIB_DEBUG_PRINTF("Kernel : returned from ctx %p (to %p, current %p)\n", thrd_ctx, proc_ctx, this->current_coroutine);
    261  
    262       // when CtxSwitch returns we are back in the processor coroutine
    263         if(this->thread_action == Reschedule) {
    264                 thread_schedule( dst );
     208void runThread(processor * this, thread * dst) {
     209        coroutine * proc_cor = get_coroutine(this->runner);
     210        coroutine * thrd_cor = get_coroutine(dst);
     211       
     212        //Reset the terminating actions here
     213        this->finish.action_code = No_Action;
     214
     215        //Update global state
     216        this->current_thread = dst;
     217
     218        // Context Switch to the thread
     219        ThreadCtxSwitch(proc_cor, thrd_cor);
     220        // when ThreadCtxSwitch returns we are back in the processor coroutine
     221}
     222
     223// Once a thread has finished running, some of
     224// its final actions must be executed from the kernel
     225void finishRunning(processor * this) {
     226        if( this->finish.action_code == Release ) {
     227                unlock( this->finish.lock );
     228        }
     229        else if( this->finish.action_code == Schedule ) {
     230                ScheduleThread( this->finish.thrd );
     231        }
     232        else if( this->finish.action_code == Release_Schedule ) {
     233                unlock( this->finish.lock );           
     234                ScheduleThread( this->finish.thrd );
     235        }
     236        else {
     237                assert(this->finish.action_code == No_Action);
    265238        }
    266239}
     
    301274        proc_cor_storage.c.state = Active;
    302275      main( &proc_cor_storage );
    303       proc_cor_storage.c.state = Halt;
    304       proc_cor_storage.c.notHalted = false;
     276      proc_cor_storage.c.state = Halted;
    305277
    306278        // Main routine of the core returned, the core is now fully terminated
     
    325297//-----------------------------------------------------------------------------
    326298// Scheduler routines
    327 void thread_schedule( thread * thrd ) {
     299void ScheduleThread( thread * thrd ) {
    328300        assertf( thrd->next == NULL, "Expected null got %p", thrd->next );
    329301       
    330         spin_lock( &lock );
     302        lock( &systemProcessor->cltr->lock );
    331303        append( &systemProcessor->cltr->ready_queue, thrd );
    332         spin_unlock( &lock );
     304        unlock( &systemProcessor->cltr->lock );
    333305}
    334306
    335307thread * nextThread(cluster * this) {
    336         spin_lock( &lock );
     308        lock( &this->lock );
    337309        thread * head = pop_head( &this->ready_queue );
    338         spin_unlock( &lock );
     310        unlock( &this->lock );
    339311        return head;
     312}
     313
     314void ScheduleInternal() {
     315        suspend();
     316}
     317
     318void ScheduleInternal( spinlock * lock ) {
     319        get_this_processor()->finish.action_code = Release;
     320        get_this_processor()->finish.lock = lock;
     321        suspend();
     322}
     323
     324void ScheduleInternal( thread * thrd ) {
     325        get_this_processor()->finish.action_code = Schedule;
     326        get_this_processor()->finish.thrd = thrd;
     327        suspend();
     328}
     329
     330void ScheduleInternal( spinlock * lock, thread * thrd ) {
     331        get_this_processor()->finish.action_code = Release_Schedule;
     332        get_this_processor()->finish.lock = lock;
     333        get_this_processor()->finish.thrd = thrd;
     334        suspend();
    340335}
    341336
     
    363358        // Add the main thread to the ready queue
    364359        // once resume is called on systemProcessor->ctx the mainThread needs to be scheduled like any normal thread
    365         thread_schedule(mainThread);
     360        ScheduleThread(mainThread);
    366361
    367362        //initialize the global state variables
     
    387382        // When its coroutine terminates, it return control to the mainThread
    388383        // which is currently here
    389         systemProcessor->terminated = true;
     384        systemProcessor->is_terminated = true;
    390385        suspend();
    391386
     
    406401//-----------------------------------------------------------------------------
    407402// Locks
    408 void ?{}( simple_lock * this ) {
    409         ( &this->blocked ){};
    410 }
    411 
    412 void ^?{}( simple_lock * this ) {
    413 
    414 }
    415 
    416 void lock( simple_lock * this ) {
     403void ?{}( spinlock * this ) {
     404        this->lock = 0;
     405}
     406void ^?{}( spinlock * this ) {
     407
     408}
     409
     410void lock( spinlock * this ) {
     411        for ( unsigned int i = 1;; i += 1 ) {
     412                if ( this->lock == 0 && __sync_lock_test_and_set_4( &this->lock, 1 ) == 0 ) break;
     413        }
     414}
     415
     416void unlock( spinlock * this ) {
     417        __sync_lock_release_4( &this->lock );
     418}
     419
     420void ?{}( signal_once * this ) {
     421        this->condition = false;
     422}
     423void ^?{}( signal_once * this ) {
     424
     425}
     426
     427void wait( signal_once * this ) {
     428        lock( &this->lock );
     429        if( !this->condition ) {
     430                append( &this->blocked, this_thread() );
     431                ScheduleInternal( &this->lock );
     432                lock( &this->lock );
     433        }
     434        unlock( &this->lock );
     435}
     436
     437void signal( signal_once * this ) {
     438        lock( &this->lock );
    417439        {
    418                 spin_lock( &lock );
    419                 append( &this->blocked, this_thread() );
    420                 spin_unlock( &lock );
    421         }
    422         suspend();
    423 }
    424 
    425 void unlock( simple_lock * this ) {
    426         thread * it;
    427         while( it = pop_head( &this->blocked) ) {
    428                 thread_schedule( it );
    429         }
     440                this->condition = true;
     441
     442                thread * it;
     443                while( it = pop_head( &this->blocked) ) {
     444                        ScheduleThread( it );
     445                }
     446        }
     447        unlock( &this->lock );
    430448}
    431449
  • src/libcfa/concurrency/threads.c

    raa9ee19 r97f65d5  
    1717#include "threads"
    1818
    19 #include "kernel"
     19#include "kernel_private.h"
    2020#include "libhdr.h"
    2121
     
    4444        (&this->c){};
    4545        this->c.name = "Anonymous Coroutine";
    46         (&this->lock){};
     46        (&this->terminated){};
    4747        this->next = NULL;
    4848}
     
    7272//-----------------------------------------------------------------------------
    7373// Starting and stopping threads
    74 extern "C" {
    75       forall(dtype T | is_thread(T))
    76       void CtxInvokeThread(T * this);
    77 }
    78 
    79 extern void thread_schedule( thread * );
    80 
    8174forall( dtype T | is_thread(T) )
    8275void start( T* this ) {
     
    9285        CtxSwitch( thrd_c->last->stack.context, thrd_c->stack.context );
    9386
    94         fenv_t envp;
    95         fegetenv( &envp );
    96         LIB_DEBUG_PRINTF("Thread : mxcsr %x\n", envp.__mxcsr);
    97         LIB_DEBUG_PRINTF("Thread started : %p (t %p, c %p)\n", this, thrd_c, thrd_h);
    98 
    99         thread_schedule(thrd_h);
     87        ScheduleThread(thrd_h);
    10088}
    10189
    10290forall( dtype T | is_thread(T) )
    10391void stop( T* this ) {
    104         thread*  thrd = get_thread(this);
    105         if( thrd->c.notHalted ) {
    106                 lock( &thrd->lock );
    107         }
     92        wait( & get_thread(this)->terminated );
    10893}
    10994
    11095void yield( void ) {
    111         get_this_processor()->thread_action = Reschedule;
    112         suspend();
     96        ScheduleInternal( get_this_processor()->current_thread );
    11397}
    11498
    11599void ThreadCtxSwitch(coroutine* src, coroutine* dst) {
     100        // set state of current coroutine to inactive
     101        src->state = Inactive;
     102        dst->state = Active;
     103
     104        //update the last resumer
    116105        dst->last = src;
    117106
    118         // set state of current coroutine to inactive
    119         src->state = Inactive;
    120 
    121         // set new coroutine that task is executing
     107        // set new coroutine that the processor is executing
     108        // and context switch to it
    122109        get_this_processor()->current_coroutine = dst; 
    123 
    124         // context switch to specified coroutine
    125110        CtxSwitch( src->stack.context, dst->stack.context );
    126         // when CtxSwitch returns we are back in the src coroutine
     111        get_this_processor()->current_coroutine = src; 
    127112
    128113        // set state of new coroutine to active
     114        dst->state = Inactive;
    129115        src->state = Active;
    130116}
     
    134120extern "C" {
    135121        void __thread_signal_termination( thread * this ) {
    136                 this->c.state = Halt;
    137                 this->c.notHalted = false;
    138                 unlock( &this->lock );
     122                this->c.state = Halted;
     123                LIB_DEBUG_PRINTF("Thread end : %p\n", this);
     124                signal( &this->terminated );   
    139125        }
    140126}
Note: See TracChangeset for help on using the changeset viewer.