Changeset bd98b58


Ignore:
Timestamp:
Jan 20, 2017, 4:50:15 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
aaron-thesis, arm-eh, cleanup-dtors, deferred_resn, demangler, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, resolv-new, with_gc
Children:
207c7e1d
Parents:
dcb42b8
Message:

Kernel now uses intrusive lists and blocking locks for ready queue management.
Update the plan for concurrency.

Files:
10 edited

Legend:

Unmodified
Added
Removed
  • doc/proposals/concurrency/thePlan.md

    rdcb42b8 rbd98b58  
    11_Phase 1_ : Prototype
    2 Threads and Processors.
    3 Main needs to call process
     2done - Threads.
     3done - Main thread is a cfa thread.
     4done - SimpleBlockingLock.
     5done - Synchronisation points in thread destructors.
     6Processors & SpinLock.
    47
    58_Phase 2_ : Minimum Viable Product
    6 Main thread is a cfa thread
    7 Basic monitors for synchronisation and minimal lock support.
    8 No internal/external scheduling.
    9 Synchronisation points in thread destructors.
     9Basic monitors for synchronisation (No internal/external scheduling).
     10Non-thrash scheduler.
     11Clusters.
    1012
    1113_Phase 3_ : Kernel features
    1214Threads features ex: detach
    1315Internal scheduling
    14 Clusters
    1516
    1617_Phase 4_ : Monitor features
  • src/examples/thread.c

    rdcb42b8 rbd98b58  
    1010};
    1111
    12 DECL_THREAD(MyThread)
     12// DECL_THREAD(MyThread)
    1313
    1414void ?{}( MyThread * this, unsigned id, unsigned count ) {
     
    1717}
    1818
    19 void main(MyThread* this) {
    20         sout | "Thread" | this->id | " : Suspending" | this->count | "times" | endl;
    21         suspend();
     19// void main(MyThread* this) {
     20//      sout | "Thread" | this->id | " : Suspending" | this->count | "times" | endl;
     21//      yield();
    2222
    23         for(int i = 0; i < this->count; i++) {
    24                 sout | "Thread" | this->id | " : Suspend No." | i + 1 | endl;
    25                 suspend();
    26         }
    27 }
     23//      for(int i = 0; i < this->count; i++) {
     24//              sout | "Thread" | this->id | " : Suspend No." | i + 1 | endl;
     25//              yield();
     26//      }
     27// }
    2828
    2929int main(int argc, char* argv[]) {
    3030
    31         unsigned itterations = 10u;
    32         if(argc == 2) {
    33                 int val = ato(argv[1]);
    34                 assert(val >= 0);
    35                 itterations = val;
    36         }
     31        // unsigned itterations = 10u;
     32        // if(argc == 2) {
     33        //      int val = ato(argv[1]);
     34        //      assert(val >= 0);
     35        //      itterations = val;
     36        // }
    3737
    3838        sout | "User main begin" | endl;
    3939
    40         {
    41                 thread(MyThread) thread1 = { 1u, itterations };
    42                 thread(MyThread) thread2 = { 2u, itterations };
    43         }
     40        // {
     41        //      thread(MyThread) thread1 = { 1u, itterations };
     42        //      thread(MyThread) thread2 = { 2u, itterations };
     43        // }
    4444
    4545        sout | "User main end" | endl;
  • src/libcfa/concurrency/coroutines

    rdcb42b8 rbd98b58  
    6262
    6363// Get current coroutine
    64 extern coroutine * current_coroutine; //PRIVATE, never use directly
    65 static inline coroutine * this_coroutine(void) {
    66         return current_coroutine;
    67 }
     64coroutine * this_coroutine(void);
    6865
    6966// Private wrappers for context switch and stack creation
  • src/libcfa/concurrency/coroutines.c

    rdcb42b8 rbd98b58  
    1414//
    1515
     16#include "coroutines"
     17
    1618extern "C" {
    1719#include <stddef.h>
     
    2325}
    2426
    25 #include "coroutines"
     27#include "kernel"
    2628#include "libhdr.h"
    2729
    2830#define __CFA_INVOKE_PRIVATE__
    2931#include "invoke.h"
     32
     33/*thread_local*/ extern processor * this_processor;
    3034
    3135//-----------------------------------------------------------------------------
     
    3539#define MinStackSize 1000
    3640static size_t pageSize = 0;                             // architecture pagesize HACK, should go in proper runtime singleton
    37 
    38 //Current coroutine
    39 //Will need to be in TLS when multi-threading is added
    40 coroutine* current_coroutine;
    4141
    4242//-----------------------------------------------------------------------------
     
    110110
    111111        // set new coroutine that task is executing
    112         current_coroutine = dst;                       
     112        this_processor->current_coroutine = dst;                       
    113113
    114114        // context switch to specified coroutine
  • src/libcfa/concurrency/invoke.c

    rdcb42b8 rbd98b58  
    1414
    1515extern void __suspend_no_inline__F___1(void);
    16 extern void __scheduler_remove__F_P9sthread_h__1(struct thread_h*);
     16extern void __signal_termination__F_P9sthread_h__1(struct thread_h*);
    1717
    1818void CtxInvokeCoroutine(
     
    5757      main( this );
    5858
    59       cor->state = Halt;
    60       cor->notHalted = false;
    61       __scheduler_remove__F_P9sthread_h__1(thrd);
     59      __signal_termination__F_P9sthread_h__1(thrd);
    6260
    6361      //Final suspend, should never return
  • src/libcfa/concurrency/invoke.h

    rdcb42b8 rbd98b58  
    1111
    1212      #define unlikely(x)    __builtin_expect(!!(x), 0)
     13      #define SCHEDULER_CAPACITY 10
     14
     15      struct simple_thread_list {
     16            struct thread_h * head;
     17            struct thread_h ** tail;
     18      };
     19
     20      #ifdef __CFORALL__
     21      extern "Cforall" {
     22            void ?{}( struct simple_thread_list * );
     23            void append( struct simple_thread_list *, struct thread_h * );
     24            struct thread_h * pop_head( struct simple_thread_list * );
     25      }
     26      #endif
    1327
    1428      struct coStack_t {
     
    3549      };
    3650
     51      struct simple_lock {
     52        struct simple_thread_list blocked;
     53      };
     54
    3755      struct thread_h {
    3856            struct coroutine c;
     57            struct simple_lock lock;
     58            struct thread_h * next;
    3959      };
    4060
  • src/libcfa/concurrency/kernel

    rdcb42b8 rbd98b58  
    2020#include <stdbool.h>
    2121
     22#include "invoke.h"
     23
     24//-----------------------------------------------------------------------------
     25// Cluster
     26struct cluster {
     27        simple_thread_list ready_queue;
     28};
     29
     30void ?{}(cluster * this);
     31void ^?{}(cluster * this);
     32
     33//-----------------------------------------------------------------------------
     34// Processor
    2235struct processor {
    2336        struct processorCtx_t * ctx;
    24         unsigned int thread_index;
    25         unsigned int thread_count;
    26         struct thread_h * threads[10];
     37        cluster * cltr;
     38        coroutine * current_coroutine;
     39        thread_h * current_thread;
    2740        bool terminated;
    2841};
    2942
    30 void ?{}(processor * this);
     43void ?{}(processor * this, cluster * cltr);
    3144void ^?{}(processor * this);
    3245
    33 void scheduler_add( struct thread_h * thrd );
    34 void scheduler_remove( struct thread_h * thrd );
    35 void kernel_run( void );
     46
     47//-----------------------------------------------------------------------------
     48// Locks
     49
     50void ?{}(simple_lock * this);
     51void ^?{}(simple_lock * this);
     52
     53void lock( simple_lock * );
     54void unlock( simple_lock * );
    3655
    3756#endif //KERNEL_H
  • src/libcfa/concurrency/kernel.c

    rdcb42b8 rbd98b58  
    3232#include "invoke.h"
    3333
     34cluster * systemCluster;
    3435processor * systemProcessor;
    3536thread_h * mainThread;
     
    3839void kernel_shutdown(void) __attribute__((destructor(101)));
    3940
    40 void ?{}(processor * this) {
     41void ?{}(processor * this, cluster * cltr) {
    4142        this->ctx = NULL;
    42         this->thread_index = 0;
    43         this->thread_count = 10;
     43        this->cltr = cltr;
    4444        this->terminated = false;
    45 
    46         for(int i = 0; i < 10; i++) {
    47                 this->threads[i] = NULL;
    48         }
    49 
    50         LIB_DEBUG_PRINTF("Processor : ctor for core %p (core spots %d)\n", this, this->thread_count);
    51 }
    52 
    53 void ^?{}(processor * this) {
    54 
     45}
     46
     47void ^?{}(processor * this) {}
     48
     49void ?{}(cluster * this) {
     50        ( &this->ready_queue ){};
     51}
     52
     53void ^?{}(cluster * this) {}
     54
     55//-----------------------------------------------------------------------------
     56// Global state
     57
     58/*thread_local*/ processor * this_processor;
     59
     60coroutine * this_coroutine(void) {
     61        return this_processor->current_coroutine;
     62}
     63
     64thread_h * this_thread(void) {
     65        return this_processor->current_thread;
    5566}
    5667
     
    7788// Processor running routines
    7889void main(processorCtx_t * ctx);
    79 thread_h * nextThread(processor * this);
     90thread_h * nextThread(cluster * this);
    8091void runThread(processor * this, thread_h * dst);
    8192void spin(processor * this, unsigned int * spin_count);
     
    8899        for( unsigned int spin_count = 0; ! this->terminated; spin_count++ ) {
    89100               
    90                 readyThread = nextThread(this);
     101                readyThread = nextThread( this->cltr );
    91102
    92103                if(readyThread) {
     
    101112}
    102113
    103 thread_h * nextThread(processor * this) {
    104         for(int i = 0; i < this->thread_count; i++) {
    105                 this->thread_index = (this->thread_index + 1) % this->thread_count;     
    106                
    107                 thread_h * thrd = this->threads[this->thread_index];
    108                 if(thrd) return thrd;
    109         }
    110 
    111         return NULL;
    112 }
    113 
    114114void runThread(processor * this, thread_h * dst) {
    115115        coroutine * proc_ctx = get_coroutine(this->ctx);
     
    120120        // Which is now the current_coroutine
    121121        // LIB_DEBUG_PRINTF("Kernel : switching to ctx %p (from %p, current %p)\n", thrd_ctx, proc_ctx, current_coroutine);
    122         current_coroutine = thrd_ctx;
     122        this->current_thread = dst;
     123        this->current_coroutine = thrd_ctx;
    123124        CtxSwitch( proc_ctx->stack.context, thrd_ctx->stack.context );
    124         current_coroutine = proc_ctx;
     125        this->current_coroutine = proc_ctx;
    125126        // LIB_DEBUG_PRINTF("Kernel : returned from ctx %p (to %p, current %p)\n", thrd_ctx, proc_ctx, current_coroutine);
    126127
     
    133134
    134135//-----------------------------------------------------------------------------
    135 // Kernel runner (Temporary)
    136 
    137 void scheduler_add( thread_h * thrd ) {
    138         for(int i = 0; i < systemProcessor->thread_count; i++) {
    139                 if(systemProcessor->threads[i] == NULL) {
    140                         systemProcessor->threads[i] = thrd;
    141                         return;
    142                 }
    143         }
    144         assertf(false, "Scheduler full");
    145 }
    146 
    147 void scheduler_remove( thread_h * thrd ) {
    148         for(int i = 0; i < systemProcessor->thread_count; i++) {
    149                 if(systemProcessor->threads[i] == thrd) {
    150                         systemProcessor->threads[i] = NULL;
    151                         return;
    152                 }
    153         }
    154         assertf(false, "Trying to unschedule unkown thread");
     136// Scheduler routines
     137void thread_schedule( thread_h * thrd ) {
     138        assertf( thrd->next == NULL, "Expected null got %p", thrd->next );
     139        append( &systemProcessor->cltr->ready_queue, thrd );
     140}
     141
     142thread_h * nextThread(cluster * this) {
     143        return pop_head( &this->ready_queue );
    155144}
    156145
     
    160149
    161150KERNEL_STORAGE(processorCtx_t, systemProcessorCtx);
     151KERNEL_STORAGE(cluster, systemCluster);
    162152KERNEL_STORAGE(processor, systemProcessor);
    163153KERNEL_STORAGE(thread_h, mainThread);
     
    221211
    222212        mainThread_info_t ctx;
    223         // LIB_DEBUG_PRINTF("Kernel :    base : %p\n", ctx.base );
    224         // LIB_DEBUG_PRINTF("Kernel :     top : %p\n", ctx.top );
    225         // LIB_DEBUG_PRINTF("Kernel :   limit : %p\n", ctx.limit );
    226         // LIB_DEBUG_PRINTF("Kernel :    size : %x\n", ctx.size );
    227         // LIB_DEBUG_PRINTF("Kernel : storage : %p\n", ctx.storage );
    228         // LIB_DEBUG_PRINTF("Kernel : context : %p\n", ctx.context );
    229213
    230214        // Start by initializing the main thread
     
    232216        mainThread{ &ctx };
    233217
    234         // // Initialize the system processor
     218        // Initialize the system cluster
     219        systemCluster = (cluster *)&systemCluster_storage;
     220        systemCluster{};
     221
     222        // Initialize the system processor
    235223        systemProcessor = (processor *)&systemProcessor_storage;
    236         systemProcessor{};
     224        systemProcessor{ systemCluster };
    237225
    238226        // Initialize the system processor ctx
     
    243231        // Add the main thread to the ready queue
    244232        // once resume is called on systemProcessor->ctx the mainThread needs to be scheduled like any normal thread
    245         scheduler_add(mainThread);
     233        thread_schedule(mainThread);
    246234
    247235        //initialize the global state variables
    248         current_coroutine = &mainThread->c;
     236        this_processor = systemProcessor;
     237        this_processor->current_thread = mainThread;
     238        this_processor->current_coroutine = &mainThread->c;
    249239
    250240        // SKULLDUGGERY: Force a context switch to the system processor to set the main thread's context to the current UNIX
     
    285275}
    286276
     277//-----------------------------------------------------------------------------
     278// Locks
     279void ?{}( simple_lock * this ) {
     280        ( &this->blocked ){};
     281}
     282
     283void ^?{}( simple_lock * this ) {
     284
     285}
     286
     287void lock( simple_lock * this ) {
     288        append( &this->blocked, this_thread() );
     289        suspend();
     290}
     291
     292void unlock( simple_lock * this ) {
     293        thread_h * it;
     294        while( it = pop_head( &this->blocked) ) {
     295                thread_schedule( it );
     296        }
     297}
     298
     299//-----------------------------------------------------------------------------
     300// Queues
     301void ?{}( simple_thread_list * this ) {
     302        this->head = NULL;
     303        this->tail = &this->head;
     304}
     305
     306void append( simple_thread_list * this, thread_h * t ) {
     307        assert( t->next == NULL );
     308        *this->tail = t;
     309        this->tail = &t->next;
     310}
     311
     312thread_h * pop_head( simple_thread_list * this ) {
     313        thread_h * head = this->head;
     314        if( head ) {
     315                this->head = head->next;
     316                if( !head->next ) {
     317                        this->tail = &this->head;
     318                }
     319                head->next = NULL;
     320        }       
     321       
     322        return head;
     323}
    287324// Local Variables: //
    288325// mode: c //
  • src/libcfa/concurrency/threads

    rdcb42b8 rbd98b58  
    4545}
    4646
     47thread_h * this_thread(void);
     48
    4749//-----------------------------------------------------------------------------
    4850// Ctors and dtors
     
    6769void ^?{}( thread(T)* this );
    6870
    69 //-----------------------------------------------------------------------------
    70 // PRIVATE exposed because of inline
     71void yield();
    7172
    7273#endif //THREADS_H
  • src/libcfa/concurrency/threads.c

    rdcb42b8 rbd98b58  
    2323#include "invoke.h"
    2424
    25 #include <stdlib>
     25extern "C" {
     26        #include <stddef.h>
     27}
     28
     29/*thread_local*/ extern processor * this_processor;
    2630
    2731//-----------------------------------------------------------------------------
    2832// Forward declarations
    2933forall(otype T | is_thread(T) )
    30 void start( thread(T)* this );
     34void start( T* this );
    3135
    3236forall(otype T | is_thread(T) )
    33 void stop( thread(T)* this );
     37void stop( T* this );
    3438
    3539//-----------------------------------------------------------------------------
     
    3842void ?{}(thread_h* this) {
    3943        (&this->c){};
     44        (&this->lock){};
     45        this->next = NULL;
    4046}
    4147
     
    4753void ?{}( thread(T)* this ) {
    4854        (&this->handle){};
    49         start(this);
     55        start(&this->handle);
    5056}
    5157
     
    5359void ?{}( thread(T)* this, P params ) {
    5460        (&this->handle){ params };
    55         start(this);
     61        start(&this->handle);
    5662}
    5763
    5864forall(otype T | is_thread(T) )
    5965void ^?{}( thread(T)* this ) {
    60         stop(this);
     66        stop(&this->handle);
    6167        ^(&this->handle){};
    6268}
     
    6975}
    7076
     77extern void thread_schedule( thread_h * );
     78
    7179forall(otype T | is_thread(T))
    72 void start( thread(T)* this ) {
    73         T* handle  = &this->handle;
    74         coroutine* thrd_c = get_coroutine(handle);
    75         thread_h*  thrd_h = get_thread   (handle);
     80void start( T* this ) {
     81        coroutine* thrd_c = get_coroutine(this);
     82        thread_h*  thrd_h = get_thread   (this);
    7683        thrd_c->last = this_coroutine();
    77         current_coroutine = thrd_c;
     84        this_processor->current_coroutine = thrd_c;
    7885
    7986        // LIB_DEBUG_PRINTF("Thread start : %p (t %p, c %p)\n", handle, thrd_c, thrd_h);
    8087
    8188        create_stack(&thrd_c->stack, thrd_c->stack.size);
    82         CtxStart(handle, CtxInvokeThread);
     89        CtxStart(this, CtxInvokeThread);
    8390        CtxSwitch( thrd_c->last->stack.context, thrd_c->stack.context );
    8491
    85         scheduler_add(thrd_h);
     92        thread_schedule(thrd_h);
    8693}
    8794
    8895forall(otype T | is_thread(T) )
    89 void stop( thread(T)* this ) {
    90         T* handle  = &this->handle;
    91         thread_h*  thrd_h = get_thread   (handle);
    92         while( thrd_h->c.notHalted ) {
    93                 suspend();
     96void stop( T* this ) {
     97        thread_h*  thrd = get_thread(this);
     98        if( thrd->c.notHalted ) {
     99                lock( &thrd->lock );
    94100        }
     101}
     102
     103void signal_termination( thread_h * this ) {
     104        this->c.state = Halt;
     105      this->c.notHalted = false;
     106        unlock( &this->lock );
     107}
     108
     109void yield( void ) {
     110        thread_schedule( this_thread() );
     111        suspend();
    95112}
    96113
Note: See TracChangeset for help on using the changeset viewer.