Changeset b18830e for src


Ignore:
Timestamp:
Sep 20, 2017, 2:07:57 PM (7 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
Children:
0895cba
Parents:
f980549
Message:

Refactoring monitor code in prevision for proper waitfor support

  • added monitor group struct
  • else and timeout now return negative results
Location:
src
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • src/Concurrency/Waitfor.cc

    rf980549 rb18830e  
    484484                                new CaseStmt(
    485485                                        noLabels,
    486                                         new ConstantExpr( Constant::from_ulong( i++ ) ),
     486                                        new ConstantExpr( Constant::from_int( -2 ) ),
    487487                                        {
    488488                                                waitfor->timeout.statement,
     
    501501                                new CaseStmt(
    502502                                        noLabels,
    503                                         new ConstantExpr( Constant::from_ulong( i++ ) ),
     503                                        new ConstantExpr( Constant::from_int( -1 ) ),
    504504                                        {
    505505                                                waitfor->orelse.statement,
  • src/libcfa/concurrency/invoke.h

    rf980549 rb18830e  
    9494            unsigned short acceptable_count;          // number of acceptable functions
    9595            short accepted_index;                     // the index of the accepted function, -1 if none
    96        };
     96      };
     97
     98      struct __monitor_group {
     99            struct monitor_desc ** list;              // currently held monitors
     100            short                  size;              // number of currently held monitors
     101            fptr_t                 func;              // last function that acquired monitors
     102      };
    97103
    98104      struct thread_desc {
    99105            // Core threading fields
    100             struct coroutine_desc cor;                // coroutine body used to store context
    101             struct monitor_desc mon;                  // monitor body used for mutual exclusion
     106            struct coroutine_desc  self_cor;          // coroutine body used to store context
     107            struct monitor_desc    self_mon;          // monitor body used for mutual exclusion
     108            struct monitor_desc *  self_mon_p;        // pointer to monitor with sufficient lifetime for current monitors
     109            struct __monitor_group monitors;          // monitors currently held by this thread
    102110
    103111            // Link lists fields
    104112            struct thread_desc * next;                // instrusive link field for threads
    105113
    106             // Current status related to monitors
    107             struct monitor_desc ** current_monitors;  // currently held monitors
    108             unsigned short current_monitor_count;     // number of currently held monitors
    109             fptr_t current_monitor_func;              // last function that acquired monitors
     114
    110115     };
     116
     117     #ifdef __CFORALL__
     118     extern "Cforall" {
     119            static inline monitor_desc * ?[?]( const __monitor_group & this, ptrdiff_t index ) {
     120                  return this.list[index];
     121            }
     122
     123            static inline bool ?==?( const __monitor_group & lhs, const __monitor_group & rhs ) {
     124                  if( lhs.size != rhs.size ) return false;
     125                  if( lhs.func != rhs.func ) return false;
     126
     127                  // Check that all the monitors match
     128                  for( int i = 0; i < lhs.size; i++ ) {
     129                        // If not a match, check next function
     130                        if( lhs[i] != rhs[i] ) return false;
     131                  }
     132
     133                  return true;
     134            }
     135      }
     136      #endif
    111137
    112138#endif //_INVOKE_H_
  • src/libcfa/concurrency/kernel.c

    rf980549 rb18830e  
    106106
    107107void ?{}( thread_desc & this, current_stack_info_t * info) {
    108         (this.cor){ info };
     108        (this.self_cor){ info };
    109109}
    110110
     
    115115void ?{}(processorCtx_t & this, processor * proc) {
    116116        (this.__cor){ "Processor" };
    117         this.__cor.starter = &mainThread->cor;
     117        this.__cor.starter = &mainThread->self_cor;
    118118        this.proc = proc;
    119119        proc->runner = &this;
     
    328328        // if( !thrd ) return;
    329329        verify( thrd );
    330         verify( thrd->cor.state != Halted );
     330        verify( thrd->self_cor.state != Halted );
    331331
    332332        verify( disable_preempt_count > 0 );
     
    373373        assert(thrd);
    374374        disable_interrupts();
    375         assert( thrd->cor.state != Halted );
     375        assert( thrd->self_cor.state != Halted );
    376376        this_processor->finish.action_code = Schedule;
    377377        this_processor->finish.thrd = thrd;
     
    466466        this_processor = mainProcessor;
    467467        this_thread = mainThread;
    468         this_coroutine = &mainThread->cor;
     468        this_coroutine = &mainThread->self_cor;
    469469
    470470        // Enable preemption
     
    547547        thread_desc * thrd = kernel_data;
    548548
    549         int len = snprintf( abort_text, abort_text_size, "Error occurred while executing task %.256s (%p)", thrd->cor.name, thrd );
     549        int len = snprintf( abort_text, abort_text_size, "Error occurred while executing task %.256s (%p)", thrd->self_cor.name, thrd );
    550550        __lib_debug_write( STDERR_FILENO, abort_text, len );
    551551
  • src/libcfa/concurrency/monitor

    rf980549 rb18830e  
    105105
    106106struct __acceptable_t {
    107         fptr_t func;
    108         unsigned short count;
    109         monitor_desc ** monitors;
     107        __monitor_group monitors;
    110108        bool is_dtor;
    111109};
  • src/libcfa/concurrency/monitor.c

    rf980549 rb18830e  
    2525static inline void set_owner( monitor_desc * this, thread_desc * owner );
    2626static inline thread_desc * next_thread( monitor_desc * this );
    27 static inline int is_accepted( thread_desc * owner, monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)() );
     27static inline int is_accepted( thread_desc * owner, monitor_desc * this, const __monitor_group & monitors );
    2828
    2929static inline void lock_all( spinlock ** locks, unsigned short count );
     
    4242static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );
    4343
    44 static inline thread_desc * search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count );
     44static inline [thread_desc *, int] search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count );
     45
     46static inline short count_max( short acc_count, __acceptable_t * acceptables );
     47static inline short aggregate( monitor_desc ** storage, short count, __acceptable_t * acceptables );
     48static inline void  set_mask ( monitor_desc ** storage, short count, __acceptable_t * acceptables, short acc_count );
    4549
    4650//-----------------------------------------------------------------------------
     
    6872extern "C" {
    6973        // Enter single monitor
    70         static void __enter_monitor_desc( monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)() ) {
     74        static void __enter_monitor_desc( const __monitor_group & group ) {
     75                monitor_desc * this = group.list[0];
     76
    7177                // Lock the monitor spinlock, lock_yield to reduce contention
    7278                lock_yield( &this->lock DEBUG_CTX2 );
     
    8995                        LIB_DEBUG_PRINT_SAFE("Kernel :  mon already owned \n");
    9096                }
    91                 else if( (this->accepted_index = is_accepted( thrd, this, group, group_cnt, func)) >= 0 ) {
     97                else if( (this->accepted_index = is_accepted( thrd, this, group)) >= 0 ) {
    9298                        // Some one was waiting for us, enter
    9399                        set_owner( this, thrd );
     
    146152        // Should never return
    147153        void __leave_thread_monitor( thread_desc * thrd ) {
    148                 monitor_desc * this = &thrd->mon;
     154                monitor_desc * this = &thrd->self_mon;
    149155
    150156                // Lock the monitor now
     
    153159                disable_interrupts();
    154160
    155                 thrd->cor.state = Halted;
     161                thrd->self_cor.state = Halted;
    156162
    157163                verifyf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );
     
    178184// Enter multiple monitor
    179185// relies on the monitor array being sorted
    180 static inline void enter(monitor_desc ** monitors, int count, void (*func)() ) {
    181         for(int i = 0; i < count; i++) {
    182                 __enter_monitor_desc( monitors[i], monitors, count, func );
     186static inline void enter( __monitor_group monitors ) {
     187        for(int i = 0; i < monitors.size; i++) {
     188                __enter_monitor_desc( monitors );
    183189        }
    184190}
     
    203209
    204210        // Save previous thread context
    205         this.prev_mntrs = this_thread->current_monitors;
    206         this.prev_count = this_thread->current_monitor_count;
    207         this.prev_func  = this_thread->current_monitor_func;
     211        this.prev_mntrs = this_thread->monitors.list;
     212        this.prev_count = this_thread->monitors.size;
     213        this.prev_func  = this_thread->monitors.func;
    208214
    209215        // Update thread context (needed for conditions)
    210         this_thread->current_monitors      = m;
    211         this_thread->current_monitor_count = count;
    212         this_thread->current_monitor_func = func;
     216        this_thread->monitors.list = m;
     217        this_thread->monitors.size = count;
     218        this_thread->monitors.func = func;
    213219
    214220        // Enter the monitors in order
    215         enter( this.m, this.count, func );
     221        __monitor_group group = {this.m, this.count, func};
     222        enter( group );
    216223}
    217224
     
    223230
    224231        // Restore thread context
    225         this_thread->current_monitors      = this.prev_mntrs;
    226         this_thread->current_monitor_count = this.prev_count;
    227         this_thread->current_monitor_func = this.prev_func;
     232        this_thread->monitors.list = this.prev_mntrs;
     233        this_thread->monitors.size = this.prev_count;
     234        this_thread->monitors.func = this.prev_func;
    228235}
    229236
     
    315322        LIB_DEBUG_DO(
    316323                thread_desc * this_thrd = this_thread;
    317                 if ( this->monitor_count != this_thrd->current_monitor_count ) {
    318                         abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
     324                if ( this->monitor_count != this_thrd->monitors.size ) {
     325                        abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->monitors.size );
    319326                }
    320327
    321328                for(int i = 0; i < this->monitor_count; i++) {
    322                         if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
    323                                 abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->current_monitors[i] );
     329                        if ( this->monitors[i] != this_thrd->monitors.list[i] ) {
     330                                abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->monitors.list[i] );
    324331                        }
    325332                }
     
    397404
    398405//-----------------------------------------------------------------------------
    399 // Internal scheduling
    400 int __waitfor_internal( unsigned short acc_count, __acceptable_t * acceptables ) {
    401         thread_desc * thrd = this_thread;
     406// External scheduling
     407// cases to handle :
     408//      - target already there :
     409//              block and wake
     410//      - dtor already there
     411//              put thread on signaller stack
     412//      - non-blocking
     413//              return else
     414//      - timeout
     415//              return timeout
     416//      - block
     417//              setup mask
     418//              block
     419int __waitfor_internal( unsigned short acc_count, __acceptable_t * acceptables, int duration ) {
     420        // This statment doesn't have a contiguous list of monitors...
     421        // Create one!
     422        short max = count_max( acc_count, acceptables );
     423        monitor_desc * mon_storage[max];
     424        short actual_count = aggregate( mon_storage, acc_count, acceptables );
    402425
    403426        // Create storage for monitor context
    404         monitor_ctx( acceptables->monitors, acceptables->count );
     427        monitor_ctx( mon_storage, actual_count );
    405428
    406429        // Lock all monitors (aggregates the lock them as well)
    407430        lock_all( monitors, locks, count );
    408431
    409         // Create the node specific to this wait operation
    410         wait_ctx_primed( thrd, 0 );
    411 
    412         // Check if the entry queue
    413         thread_desc * next = search_entry_queue( acceptables, acc_count, monitors, count );
    414 
    415         LIB_DEBUG_PRINT_SAFE("Owner(s) :");
    416         for(int i = 0; i < count; i++) {
    417                 LIB_DEBUG_PRINT_SAFE(" %p", monitors[i]->owner );
    418         }
    419         LIB_DEBUG_PRINT_SAFE("\n");
    420 
    421         LIB_DEBUG_PRINT_SAFE("Passing mon to %p\n", next);
    422 
    423         if( !next ) {
    424                 // Update acceptables on the current monitors
    425                 for(int i = 0; i < count; i++) {
    426                         monitors[i]->acceptables = acceptables;
    427                         monitors[i]->acceptable_count = acc_count;
    428                 }
    429         }
    430         else {
    431                 for(int i = 0; i < count; i++) {
    432                         set_owner( monitors[i], next );
    433                 }
    434         }
     432        {
     433                // Check if the entry queue
     434                thread_desc * next;
     435                int index;
     436                [next, index] = search_entry_queue( acceptables, acc_count, monitors, count );
     437
     438                if( next ) {
     439                        if( acceptables[index].is_dtor ) {
     440                                #warning case not implemented
     441                        }
     442                        else {
     443                                save_recursion( monitors, recursions, count );
     444
     445                                // Everything is ready to go to sleep
     446                                BlockInternal( locks, count, &next, 1 );
     447
     448
     449                                //WE WOKE UP
     450
     451
     452                                //We are back, restore the owners and recursions
     453                                lock_all( locks, count );
     454                                restore_recursion( monitors, recursions, count );
     455                                unlock_all( locks, count );
     456                        }
     457
     458                        return index;
     459                }
     460        }
     461
     462
     463        if( duration == 0 ) return -1;
     464
     465        set_mask( monitors, count, acceptables, acc_count );
     466
     467        verifyf( duration < 0, "Timeout on waitfor statments not supported yet.");
    435468
    436469
     
    439472
    440473        // Everything is ready to go to sleep
    441         BlockInternal( locks, count, &next, next ? 1 : 0 );
     474        BlockInternal( locks, count );
    442475
    443476
     
    485518}
    486519
    487 static inline int is_accepted( thread_desc * owner, monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)() ) {
    488         __acceptable_t* accs = this->acceptables; // Optim
    489         int acc_cnt = this->acceptable_count;
    490 
    491         // Check if there are any acceptable functions
    492         if( !accs ) return -1;
    493 
    494         // If this isn't the first monitor to test this, there is no reason to repeat the test.
    495         if( this != group[0] ) return group[0]->accepted_index;
    496 
    497         // For all acceptable functions check if this is the current function.
    498         OUT_LOOP:
    499         for( int i = 0; i < acc_cnt; i++ ) {
    500                 __acceptable_t * acc = &accs[i];
    501 
    502                 // if function matches, check the monitors
    503                 if( acc->func == func ) {
    504 
    505                         // If the group count is different then it can't be a match
    506                         if( acc->count != group_cnt ) return -1;
    507 
    508                         // Check that all the monitors match
    509                         for( int j = 0; j < group_cnt; j++ ) {
    510                                 // If not a match, check next function
    511                                 if( acc->monitors[j] != group[j] ) continue OUT_LOOP;
    512                         }
    513 
    514                         // It's a complete match, accept the call
    515                         return i;
    516                 }
    517         }
    518 
    519         // No function matched
    520         return -1;
    521 }
    522 
    523520static inline void init( int count, monitor_desc ** monitors, __condition_node_t * waiter, __condition_criterion_t * criteria ) {
    524521        for(int i = 0; i < count; i++) {
     
    607604        if( !this->monitors ) {
    608605                // LIB_DEBUG_PRINT_SAFE("Branding\n");
    609                 assertf( thrd->current_monitors != NULL, "No current monitor to brand condition %p", thrd->current_monitors );
    610                 this->monitor_count = thrd->current_monitor_count;
     606                assertf( thrd->monitors.list != NULL, "No current monitor to brand condition %p", thrd->monitors.list );
     607                this->monitor_count = thrd->monitors.size;
    611608
    612609                this->monitors = malloc( this->monitor_count * sizeof( *this->monitors ) );
    613610                for( int i = 0; i < this->monitor_count; i++ ) {
    614                         this->monitors[i] = thrd->current_monitors[i];
     611                        this->monitors[i] = thrd->monitors.list[i];
    615612                }
    616613        }
     
    628625}
    629626
    630 
    631 static inline bool match( __acceptable_t * acc, thread_desc * thrd ) {
    632         verify( thrd );
    633         verify( acc );
    634         if( acc->func != thrd->current_monitor_func ) return false;
    635 
    636         return true;
    637 }
    638 
    639 static inline thread_desc * search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ) {
     627static inline int is_accepted( thread_desc * owner, monitor_desc * this, const __monitor_group & group ) {
     628        __acceptable_t* accs = this->acceptables; // Optim
     629        int acc_cnt = this->acceptable_count;
     630
     631        // Check if there are any acceptable functions
     632        if( !accs ) return -1;
     633
     634        // If this isn't the first monitor to test this, there is no reason to repeat the test.
     635        if( this != group[0] ) return group[0]->accepted_index;
     636
     637        // For all acceptable functions check if this is the current function.
     638        for( int i = 0; i < acc_cnt; i++ ) {
     639                __acceptable_t * acc = &accs[i];
     640
     641                if( acc->monitors == group ) return i;
     642        }
     643
     644        // No function matched
     645        return -1;
     646}
     647
     648static inline [thread_desc *, int] search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ) {
    640649
    641650        __thread_queue_t * entry_queue = &monitors[0]->entry_queue;
     
    647656        {
    648657                // For each acceptable check if it matches
     658                int i;
    649659                __acceptable_t * acc_end = acceptables + acc_count;
    650                 for( __acceptable_t * acc_it = acceptables; acc_it != acc_end; acc_it++ ) {
     660                for( __acceptable_t * acc_it = acceptables; acc_it != acc_end; acc_it++, i++ ) {
    651661                        // Check if we have a match
    652                         if( match( acc_it, *thrd_it ) ) {
     662                        if( acc_it->monitors == (*thrd_it)->monitors ) {
    653663
    654664                                // If we have a match return it
    655665                                // after removeing it from the entry queue
    656                                 return remove( entry_queue, thrd_it );
     666                                return [remove( entry_queue, thrd_it ), i];
    657667                        }
    658668                }
    659669        }
    660670
    661         return NULL;
    662 }
     671        return [0, -1];
     672}
     673
     674static inline short count_max( short acc_count, __acceptable_t * acceptables ) {
     675        short max = 0;
     676        for( int i = 0; i < acc_count; i++ ) {
     677                max += acceptables[i].monitors.size;
     678        }
     679        return max;
     680}
     681
     682static inline short aggregate( monitor_desc ** storage, short count, __acceptable_t * acceptables ) {
     683        #warning function not implemented
     684        return 0;
     685}
     686
     687static inline void set_mask( monitor_desc ** storage, short count, __acceptable_t * acceptables, short acc_count ) {
     688        for(int i = 0; i < count; i++) {
     689                storage[i]->acceptables      = acceptables;
     690                storage[i]->acceptable_count = acc_count;
     691                storage[i]->accepted_index   = -1;
     692        }
     693}
     694
     695
    663696void ?{}( __condition_blocked_queue_t & this ) {
    664697        this.head = NULL;
  • src/libcfa/concurrency/thread

    rf980549 rb18830e  
    3636forall( dtype T | is_thread(T) )
    3737static inline coroutine_desc* get_coroutine(T & this) {
    38         return &get_thread(this)->cor;
     38        return &get_thread(this)->self_cor;
    3939}
    4040
    4141forall( dtype T | is_thread(T) )
    4242static inline monitor_desc* get_monitor(T & this) {
    43         return &get_thread(this)->mon;
     43        return &get_thread(this)->self_mon;
    4444}
    4545
    4646static inline coroutine_desc* get_coroutine(thread_desc * this) {
    47         return &this->cor;
     47        return &this->self_cor;
    4848}
    4949
    5050static inline monitor_desc* get_monitor(thread_desc * this) {
    51         return &this->mon;
     51        return &this->self_mon;
    5252}
    5353
  • src/libcfa/concurrency/thread.c

    rf980549 rb18830e  
    3333
    3434void ?{}(thread_desc& this) {
    35         (this.cor){};
    36         this.cor.name = "Anonymous Coroutine";
    37         this.mon.owner = &this;
    38         this.mon.recursion = 1;
     35        (this.self_cor){};
     36        this.self_cor.name = "Anonymous Coroutine";
     37        this.self_mon.owner = &this;
     38        this.self_mon.recursion = 1;
     39        this.self_mon_p = &this.self_mon;
    3940        this.next = NULL;
    4041
    41         this.current_monitors      = &this.mon;
    42         this.current_monitor_count = 1;
     42        (this.monitors){ &this.self_mon_p, 1, (fptr_t)0 };
    4343}
    4444
    4545void ^?{}(thread_desc& this) {
    46         ^(this.cor){};
     46        ^(this.self_cor){};
    4747}
    4848
Note: See TracChangeset for help on using the changeset viewer.