Ignore:
Timestamp:
May 1, 2017, 1:40:13 PM (7 years ago)
Author:
Rob Schluntz <rschlunt@…>
Branches:
ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
Children:
5544465, ed8a0d2
Parents:
12d3187 (diff), 13e2c54 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:/u/cforall/software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/libcfa/concurrency/monitor.c

    r12d3187 r2055098  
    2020#include "libhdr.h"
    2121
    22 void set_owner( monitor_desc * this, thread_desc * owner ) {
    23         //Pass the monitor appropriately
    24         this->owner = owner;
    25 
    26         //We are passing the monitor to someone else, which means recursion level is not 0
    27         this->recursion = owner ? 1 : 0;
    28 }
     22//-----------------------------------------------------------------------------
     23// Forward declarations
     24static inline void set_owner( monitor_desc * this, thread_desc * owner );
     25static inline thread_desc * next_thread( monitor_desc * this );
     26
     27static inline void lock_all( spinlock ** locks, unsigned short count );
     28static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
     29static inline void unlock_all( spinlock ** locks, unsigned short count );
     30static inline void unlock_all( monitor_desc ** locks, unsigned short count );
     31
     32static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
     33static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );
     34
     35static inline thread_desc * check_condition( __condition_criterion_t * );
     36static inline void brand_condition( condition * );
     37static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );
     38
     39//-----------------------------------------------------------------------------
     40// Enter/Leave routines
     41
    2942
    3043extern "C" {
    31         void __enter_monitor_desc(monitor_desc * this, monitor_desc * leader) {
     44        void __enter_monitor_desc(monitor_desc * this) {
    3245                lock( &this->lock );
    3346                thread_desc * thrd = this_thread();
    3447
    35                 // //Update the stack owner
    36                 // this->stack_owner = leader;
    37 
    38                 LIB_DEBUG_PRINT_SAFE("Entering %p (o: %p, r: %i)\n", this, this->owner, this->recursion);
     48                LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
    3949
    4050                if( !this->owner ) {
     
    6171
    6272        // leave pseudo code :
    63         //      decrement level
    64         //      leve == 0 ?
    65         //              no : done
    66         //              yes :
    67         //                      signal stack empty ?
    68         //                              has leader :
    69         //                                      bulk acquiring means we don't own the signal stack
    70         //                                      ignore it but don't release the monitor
    71         //                              yes :
    72         //                                      next in entry queue is new owner
    73         //                              no :
    74         //                                      top of the signal stack is the owner
    75         //                                      context switch to him right away
    76         //
    77         void __leave_monitor_desc(monitor_desc * this, monitor_desc * leader) {
     73        //      TODO
     74        void __leave_monitor_desc(monitor_desc * this) {
    7875                lock( &this->lock );
    7976
    80                 LIB_DEBUG_PRINT_SAFE("Leaving %p (o: %p, r: %i)\n", this, this->owner, this->recursion);
    81 
    8277                thread_desc * thrd = this_thread();
    83                 assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", this->owner, thrd, this->recursion );
     78
     79                LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
     80                assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );
    8481
    8582                //Leaving a recursion level, decrement the counter
     
    8986                //it means we don't need to do anything
    9087                if( this->recursion != 0) {
    91                         // this->stack_owner = leader;
    9288                        unlock( &this->lock );
    9389                        return;
    9490                }
    95                        
    96                 // //If we don't own the signal stack then just leave it to the owner
    97                 // if( this->stack_owner ) {
    98                 //      this->stack_owner = leader;
    99                 //      unlock( &this->lock );
    100                 //      return;
    101                 // }
    102 
    103                 //We are the stack owner and have left the last recursion level.
    104                 //We are in charge of passing the monitor
    105                 thread_desc * new_owner = 0;
    106 
    107                 //Check the signaller stack
    108                 new_owner = pop( &this->signal_stack );
    109                 if( new_owner ) {
    110                         //The signaller stack is not empty,
    111                         //transfer control immediately
    112                         set_owner( this, new_owner );
    113                         // this->stack_owner = leader;
    114                         ScheduleInternal( &this->lock, new_owner );
    115                         return;
    116                 }
    117                
    118                 // No signaller thread
    119                 // Get the next thread in the entry_queue
    120                 new_owner = pop_head( &this->entry_queue );
    121                 set_owner( this, new_owner );
    122 
    123                 // //Update the stack owner
    124                 // this->stack_owner = leader;
     91
     92                thread_desc * new_owner = next_thread( this );
    12593
    12694                //We can now let other threads in safely
     
    133101
    134102static inline void enter(monitor_desc ** monitors, int count) {
    135         __enter_monitor_desc( monitors[0], NULL );
    136         for(int i = 1; i < count; i++) {
    137                 __enter_monitor_desc( monitors[i], monitors[0] );
     103        for(int i = 0; i < count; i++) {
     104                __enter_monitor_desc( monitors[i] );
    138105        }
    139106}
    140107
    141108static inline void leave(monitor_desc ** monitors, int count) {
    142         __leave_monitor_desc( monitors[0], NULL );
    143         for(int i = count - 1; i >= 1; i--) {
    144                 __leave_monitor_desc( monitors[i], monitors[0] );
     109        for(int i = count - 1; i >= 0; i--) {
     110                __leave_monitor_desc( monitors[i] );
    145111        }
    146112}
     
    169135// Internal scheduling
    170136void wait( condition * this ) {
    171         assertf(false, "NO SUPPORTED");
    172         // LIB_DEBUG_FPRINTF("Waiting\n");
    173         thread_desc * this_thrd = this_thread();
    174 
    175         if( !this->monitors ) {
    176                 this->monitors = this_thrd->current_monitors;
    177                 this->monitor_count = this_thrd->current_monitor_count;
    178         }
     137        LIB_DEBUG_PRINT_SAFE("Waiting\n");
     138
     139        brand_condition( this );
     140
     141        //Check that everything is as expected
     142        assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
     143        assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );
    179144
    180145        unsigned short count = this->monitor_count;
    181 
    182         //Check that everything is as expected
    183         assert( this->monitors != NULL );
    184         assert( this->monitor_count != 0 );
    185 
    186146        unsigned int recursions[ count ];               //Save the current recursion levels to restore them later
    187147        spinlock *   locks     [ count ];               //We need to pass-in an array of locks to ScheduleInternal
    188148
    189         // LIB_DEBUG_FPRINTF("Getting ready to wait\n");
    190 
    191         //Loop on all the monitors and release the owner
    192         for( unsigned int i = 0; i < count; i++ ) {
    193                 monitor_desc * cur = this->monitors[i];
    194 
    195                 assert( cur );
    196 
    197                 // LIB_DEBUG_FPRINTF("cur %p lock %p\n", cur, &cur->lock);
    198 
    199                 //Store the locks for later
    200                 locks[i] = &cur->lock;
    201 
    202                 //Protect the monitors
    203                 lock( locks[i] );
    204                 {               
    205                         //Save the recursion levels
    206                         recursions[i] = cur->recursion;
    207 
    208                         //Release the owner
    209                         cur->recursion = 0;
    210                         cur->owner = NULL;
    211                 }
    212                 //Release the monitor
    213                 unlock( locks[i] );
    214         }
    215 
    216         // LIB_DEBUG_FPRINTF("Waiting now\n");
    217 
    218         //Everything is ready to go to sleep
    219         ScheduleInternal( locks, count );
     149        LIB_DEBUG_PRINT_SAFE("count %i\n", count);
     150
     151        __condition_node_t waiter;
     152        waiter.waiting_thread = this_thread();
     153        waiter.count = count;
     154        waiter.next = NULL;
     155
     156        __condition_criterion_t criteria[count];
     157        for(int i = 0; i < count; i++) {
     158                criteria[i].ready  = false;
     159                criteria[i].target = this->monitors[i];
     160                criteria[i].owner  = &waiter;
     161                criteria[i].next   = NULL;
     162                LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
     163        }
     164
     165        waiter.criteria = criteria;
     166        append( &this->blocked, &waiter );
     167
     168        lock_all( this->monitors, locks, count );
     169        save_recursion( this->monitors, recursions, count );
     170        //DON'T unlock, ask the kernel to do it
     171
     172        //Find the next thread(s) to run
     173        unsigned short thread_count = count;
     174        thread_desc * threads[ count ];
     175
     176        for( int i = 0; i < count; i++) {
     177                thread_desc * new_owner = next_thread( this->monitors[i] );
     178                thread_count = insert_unique( threads, i, new_owner );
     179        }
     180
     181        LIB_DEBUG_PRINT_SAFE("Will unblock: ");
     182        for(int i = 0; i < thread_count; i++) {
     183                LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
     184        }
     185        LIB_DEBUG_PRINT_SAFE("\n");
     186
     187        // Everything is ready to go to sleep
     188        ScheduleInternal( locks, count, threads, thread_count );
    220189
    221190
     
    224193
    225194        //We are back, restore the owners and recursions
    226         for( unsigned int i = 0; i < count; i++ ) {
    227                 monitor_desc * cur = this->monitors[i];
    228 
    229                 //Protect the monitors
    230                 lock( locks[i] );
    231                 {
    232                         //Release the owner
    233                         cur->owner = this_thrd;
    234                         cur->recursion = recursions[i];
    235                 }
    236                 //Release the monitor
    237                 unlock( locks[i] );
    238         }
    239 }
    240 
    241 static void __signal_internal( condition * this ) {
    242         assertf(false, "NO SUPPORTED");
    243         if( !this->blocked.head ) return;
     195        lock_all( locks, count );
     196        restore_recursion( this->monitors, recursions, count );
     197        unlock_all( locks, count );
     198}
     199
     200void signal( condition * this ) {
     201        if( !this->blocked.head ) {
     202                LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
     203                return;
     204        }
    244205
    245206        //Check that everything is as expected
    246207        assert( this->monitors );
    247208        assert( this->monitor_count != 0 );
     209
     210        unsigned short count = this->monitor_count;
    248211       
    249212        LIB_DEBUG_DO(
    250                 if ( this->monitors != this_thread()->current_monitors ) {
    251                         abortf( "Signal on condition %p made outside of the correct monitor(s)", this );
     213                thread_desc * this_thrd = this_thread();
     214                if ( this->monitor_count != this_thrd->current_monitor_count ) {
     215                        abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
    252216                } // if
     217
     218                for(int i = 0; i < this->monitor_count; i++) {
     219                        if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
     220                                abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->current_monitors[i] );
     221                        } // if
     222                }
    253223        );
    254224
    255         monitor_desc * owner = this->monitors[0];
    256         lock( &owner->lock );
    257         {
    258                 thread_desc * unblock = pop_head( &this->blocked );
    259                 push( &owner->signal_stack, unblock );
    260         }
    261         unlock( &owner->lock );
    262 }
    263 
    264 void signal( condition * this ) {
    265         __signal_internal( this );
    266 }
     225        lock_all( this->monitors, NULL, count );
     226        LIB_DEBUG_PRINT_SAFE("Signalling");
     227
     228        __condition_node_t * node = pop_head( &this->blocked );
     229        for(int i = 0; i < count; i++) {
     230                __condition_criterion_t * crit = &node->criteria[i];
     231                LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
     232                assert( !crit->ready );
     233                push( &crit->target->signal_stack, crit );
     234        }
     235
     236        LIB_DEBUG_PRINT_SAFE("\n");
     237
     238        unlock_all( this->monitors, count );
     239}
     240
     241//-----------------------------------------------------------------------------
     242// Utilities
     243
     244static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
     245        //Pass the monitor appropriately
     246        this->owner = owner;
     247
     248        //We are passing the monitor to someone else, which means recursion level is not 0
     249        this->recursion = owner ? 1 : 0;
     250}
     251
     252static inline thread_desc * next_thread( monitor_desc * this ) {
     253        //Check the signaller stack
     254        __condition_criterion_t * urgent = pop( &this->signal_stack );
     255        if( urgent ) {
     256                //The signaller stack is not empty,
     257                //regardless of if we are ready to baton pass,
     258                //we need to set the monitor as in use
     259                set_owner( this,  urgent->owner->waiting_thread );
     260
     261                return check_condition( urgent );
     262        }
     263
     264        // No signaller thread
     265        // Get the next thread in the entry_queue
     266        thread_desc * new_owner = pop_head( &this->entry_queue );
     267        set_owner( this, new_owner );
     268
     269        return new_owner;
     270}
     271
     272static inline void lock_all( spinlock ** locks, unsigned short count ) {
     273        for( int i = 0; i < count; i++ ) {
     274                lock( locks[i] );
     275        }
     276}
     277
     278static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
     279        for( int i = 0; i < count; i++ ) {
     280                spinlock * l = &source[i]->lock;
     281                lock( l );
     282                if(locks) locks[i] = l;
     283        }
     284}
     285
     286static inline void unlock_all( spinlock ** locks, unsigned short count ) {
     287        for( int i = 0; i < count; i++ ) {
     288                unlock( locks[i] );
     289        }
     290}
     291
     292static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
     293        for( int i = 0; i < count; i++ ) {
     294                unlock( &locks[i]->lock );
     295        }
     296}
     297
     298
     299static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
     300        for( int i = 0; i < count; i++ ) {
     301                recursions[i] = ctx[i]->recursion;
     302        }
     303}
     304
     305static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
     306        for( int i = 0; i < count; i++ ) {
     307                ctx[i]->recursion = recursions[i];
     308        }
     309}
     310
     311// Function has 2 different behavior
     312// 1 - Marks a monitors as being ready to run
     313// 2 - Checks if all the monitors are ready to run
     314//     if so return the thread to run
     315static inline thread_desc * check_condition( __condition_criterion_t * target ) {
     316        __condition_node_t * node = target->owner;
     317        unsigned short count = node->count;
     318        __condition_criterion_t * criteria = node->criteria;
     319
     320        bool ready2run = true;
     321
     322        for(    int i = 0; i < count; i++ ) {
     323                LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
     324                if( &criteria[i] == target ) {
     325                        criteria[i].ready = true;
     326                        LIB_DEBUG_PRINT_SAFE( "True\n" );
     327                }
     328
     329                ready2run = criteria[i].ready && ready2run;
     330        }
     331
     332        LIB_DEBUG_PRINT_SAFE( "Runing %i\n", ready2run );
     333        return ready2run ? node->waiting_thread : NULL;
     334}
     335
     336static inline void brand_condition( condition * this ) {
     337        thread_desc * thrd = this_thread();
     338        if( !this->monitors ) {
     339                LIB_DEBUG_PRINT_SAFE("Branding\n");
     340                assertf( thrd->current_monitors != NULL, "No current monitor to brand condition", thrd->current_monitors );
     341                this->monitors = thrd->current_monitors;
     342                this->monitor_count = thrd->current_monitor_count;
     343        }
     344}
     345
     346static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
     347        for(int i = 0; i < end; i++) {
     348                if( thrds[i] == val ) return end;
     349        }
     350
     351        thrds[end] = val;
     352        return end + 1;
     353}
     354
     355void ?{}( __condition_blocked_queue_t * this ) {
     356        this->head = NULL;
     357        this->tail = &this->head;
     358}
     359
     360void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
     361        assert(this->tail != NULL);
     362        *this->tail = c;
     363        this->tail = &c->next;
     364}
     365
     366__condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
     367        __condition_node_t * head = this->head;
     368        if( head ) {
     369                this->head = head->next;
     370                if( !head->next ) {
     371                        this->tail = &this->head;
     372                }
     373                head->next = NULL;
     374        }
     375        return head;
     376}
Note: See TracChangeset for help on using the changeset viewer.