Changeset cb0e6de


Ignore:
Timestamp:
Mar 21, 2017, 2:29:25 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
aaron-thesis, arm-eh, cleanup-dtors, deferred_resn, demangler, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, resolv-new, with_gc
Children:
9c31349
Parents:
e04b636
Message:

Threads now use monitor semantics to wait until completion

Location:
src
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • src/Concurrency/Keywords.cc

    re04b636 rcb0e6de  
    331331                        ),
    332332                        new ListInit(
    333                                 map_range < std::list<Initializer*> > ( args, [](DeclarationWithType * var ){
     333                                map_range < std::list<Initializer*> > ( args, [this](DeclarationWithType * var ){
     334                                        Type * type = var->get_type()->clone();
     335                                        type->set_mutex( false );
    334336                                        return new SingleInit( new UntypedExpr(
    335337                                                new NameExpr( "get_monitor" ),
    336                                                 {  new VariableExpr( var ) }
     338                                                {  new CastExpr( new VariableExpr( var ), type ) }
    337339                                        ) );
    338340                                })
  • src/libcfa/concurrency/invoke.c

    re04b636 rcb0e6de  
    2929
    3030extern void __suspend_internal(void);
    31 extern void __thread_signal_termination(struct thread_desc*);
     31extern void __leave_monitor_desc( struct monitor_desc * this );
    3232
    3333void CtxInvokeCoroutine(
     
    6565      struct thread_desc* thrd = get_thread( this );
    6666      struct coroutine_desc* cor = &thrd->cor;
     67      struct monitor_desc* mon = &thrd->mon;
    6768      cor->state = Active;
    6869
     
    7071      main( this );
    7172
    72       __thread_signal_termination(thrd);
     73      __leave_monitor_desc( mon );
    7374
    7475      //Final suspend, should never return
  • src/libcfa/concurrency/invoke.h

    re04b636 rcb0e6de  
    7979      };
    8080
     81      struct monitor_desc {
     82            struct spinlock lock;
     83            struct thread_desc * owner;
     84            struct simple_thread_list entry_queue;
     85            unsigned int recursion;
     86      };
     87
    8188      struct thread_desc {
    82             struct coroutine_desc cor;            // coroutine body used to store context
     89            struct coroutine_desc cor;          // coroutine body used to store context
     90            struct monitor_desc mon;
    8391            struct signal_once terminated;      // indicate if execuation state is not halted
    8492            struct thread_desc * next;          // instrusive link field for threads
  • src/libcfa/concurrency/monitor

    re04b636 rcb0e6de  
    2121#include "invoke.h"
    2222#include "stdlib"
    23 
    24 struct monitor_desc {
    25         spinlock lock;
    26         thread_desc * owner;
    27         simple_thread_list entry_queue;
    28         unsigned int recursion;
    29 };
    3023
    3124static inline void ?{}(monitor_desc * this) {
  • src/libcfa/concurrency/monitor.c

    re04b636 rcb0e6de  
    1919#include "kernel_private.h"
    2020
    21 void enter(monitor_desc * this) {
    22         lock( &this->lock );
    23         thread_desc * thrd = this_thread();
     21extern "C" {
     22        void __enter_monitor_desc(monitor_desc * this) {
     23                lock( &this->lock );
     24                thread_desc * thrd = this_thread();
    2425
    25         if( !this->owner ) {
    26                 //No one has the monitor, just take it
    27                 this->owner = thrd;
    28                 this->recursion = 1;
    29         }
    30         else if( this->owner == thrd) {
    31                 //We already have the monitor, just not how many times we took it
    32                 assert( this->recursion > 0 );
    33                 this->recursion += 1;
    34         }
    35         else {
    36                 //Some one else has the monitor, wait in line for it
    37                 append( &this->entry_queue, thrd );
    38                 ScheduleInternal( &this->lock );
     26                if( !this->owner ) {
     27                        //No one has the monitor, just take it
     28                        this->owner = thrd;
     29                        this->recursion = 1;
     30                }
     31                else if( this->owner == thrd) {
     32                        //We already have the monitor, just not how many times we took it
     33                        assert( this->recursion > 0 );
     34                        this->recursion += 1;
     35                }
     36                else {
     37                        //Some one else has the monitor, wait in line for it
     38                        append( &this->entry_queue, thrd );
     39                        ScheduleInternal( &this->lock );
    3940
    40                 //ScheduleInternal will unlock spinlock, no need to unlock ourselves
    41                 return;
     41                        //ScheduleInternal will unlock spinlock, no need to unlock ourselves
     42                        return;
     43                }
     44
     45                unlock( &this->lock );
    4246        }
    4347
    44         unlock( &this->lock );
    45 }
     48        void __leave_monitor_desc(monitor_desc * this) {
     49                lock( &this->lock );
    4650
    47 void leave(monitor_desc * this) {
    48         lock( &this->lock );
     51                thread_desc * thrd = this_thread();
     52                assert( thrd == this->owner );
    4953
    50         thread_desc * thrd = this_thread();
    51         assert( thrd == this->owner );
     54                //Leaving a recursion level, decrement the counter
     55                this->recursion -= 1;
    5256
    53         //Leaving a recursion level, decrement the counter
    54         this->recursion -= 1;
     57                //If we left the last level of recursion it means we are changing who owns the monitor
     58                thread_desc * new_owner = 0;
     59                if( this->recursion == 0) {
     60                        //Get the next thread in the list
     61                        new_owner = this->owner = pop_head( &this->entry_queue );
    5562
    56         //If we left the last level of recursion it means we are changing who owns the monitor
    57         thread_desc * new_owner = 0;
    58         if( this->recursion == 0) {
    59                 //Get the next thread in the list
    60                 new_owner = this->owner = pop_head( &this->entry_queue );
     63                        //We are passing the monitor to someone else, which means recursion level is not 0
     64                        this->recursion = new_owner ? 1 : 0;
     65                }       
    6166
    62                 //We are passing the monitor to someone else, which means recursion level is not 0
    63                 this->recursion = new_owner ? 1 : 0;
    64         }       
     67                unlock( &this->lock );
    6568
    66         unlock( &this->lock );
    67 
    68         //If we have a new owner, we need to wake-up the thread
    69         if( new_owner ) {
    70                 ScheduleThread( new_owner );
     69                //If we have a new owner, we need to wake-up the thread
     70                if( new_owner ) {
     71                        ScheduleThread( new_owner );
     72                }
    7173        }
    7274}
     
    7476void enter(monitor_desc ** monitors, int count) {
    7577        for(int i = 0; i < count; i++) {
    76                 enter( monitors[i] );
     78                __enter_monitor_desc( monitors[i] );
    7779        }
    7880}
     
    8082void leave(monitor_desc ** monitors, int count) {
    8183        for(int i = count - 1; i >= 0; i--) {
    82                 leave( monitors[i] );
     84                __leave_monitor_desc( monitors[i] );
    8385        }
    8486}
  • src/libcfa/concurrency/thread

    re04b636 rcb0e6de  
    2222
    2323#include "coroutine"
     24#include "monitor"
    2425
    2526//-----------------------------------------------------------------------------
     
    2829// Anything that is resumed is a coroutine.
    2930trait is_thread(dtype T) {
    30       void ^?{}(T* this);
     31      void ^?{}(T* mutex this);
    3132      void main(T* this);
    3233      thread_desc* get_thread(T* this);
     
    4041}
    4142
    42 static inline coroutine_desc* get_coroutine(thread_desc* this) {
     43forall( dtype T | is_thread(T) )
     44static inline monitor_desc* get_monitor(T * this) {
     45        return &get_thread(this)->mon;
     46}
     47
     48static inline coroutine_desc* get_coroutine(thread_desc * this) {
    4349        return &this->cor;
     50}
     51
     52static inline monitor_desc* get_monitor(thread_desc * this) {
     53        return &this->mon;
    4454}
    4555
  • src/libcfa/concurrency/thread.c

    re04b636 rcb0e6de  
    4444        (&this->cor){};
    4545        this->cor.name = "Anonymous Coroutine";
     46        this->mon.owner = this;
     47        this->mon.recursion = 1;
    4648        (&this->terminated){};
    4749        this->next = NULL;
     
    9092forall( dtype T | is_thread(T) )
    9193void stop( T* this ) {
    92         wait( & get_thread(this)->terminated );
     94        // wait( & get_thread(this)->terminated );     
    9395}
    9496
  • src/tests/monitor.c

    re04b636 rcb0e6de  
    3636
    3737void ?{}( MyThread * this ) {}
     38void ^?{}( MyThread * mutex this ) {}
    3839
    3940void main( MyThread* this ) {
    40         for(int i = 0; i < 1000000; i++) {
     41        for(int i = 0; i < 1_000_000; i++) {
    4142                increment( &global );
    4243        }
  • src/tests/multi-monitor.c

    re04b636 rcb0e6de  
    3131}
    3232
     33void ^?{}( MyThread * mutex this ) {}
     34
    3335void main( MyThread* this ) {
    3436        for(int i = 0; i < 1000000; i++) {
  • src/tests/thread.c

    re04b636 rcb0e6de  
    1212void ?{}( First * this, signal_once* lock ) { this->lock = lock; }
    1313void ?{}( Second * this, signal_once* lock ) { this->lock = lock; }
     14
     15void ^?{}( First  * mutex this ) {}
     16void ^?{}( Second * mutex this ) {}
    1417
    1518void main(First* this) {
Note: See TracChangeset for help on using the changeset viewer.