Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/libcfa/concurrency/monitor.c

    r9c59cd4 rcd348e7  
    2020#include "libhdr.h"
    2121
    22 //-----------------------------------------------------------------------------
    23 // Forward declarations
    24 static inline void set_owner( monitor_desc * this, thread_desc * owner );
    25 static inline thread_desc * next_thread( monitor_desc * this );
    26 
    27 static inline void lock_all( spinlock ** locks, unsigned short count );
    28 static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
    29 static inline void unlock_all( spinlock ** locks, unsigned short count );
    30 static inline void unlock_all( monitor_desc ** locks, unsigned short count );
    31 
    32 static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
    33 static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );
    34 
    35 static inline thread_desc * check_condition( __condition_criterion_t * );
    36 static inline void brand_condition( condition * );
    37 static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );
    38 
    39 //-----------------------------------------------------------------------------
    40 // Enter/Leave routines
    41 
     22void set_owner( monitor_desc * this, thread_desc * owner ) {
     23        //Pass the monitor appropriately
     24        this->owner = owner;
     25
     26        //We are passing the monitor to someone else, which means recursion level is not 0
     27        this->recursion = owner ? 1 : 0;
     28}
    4229
    4330extern "C" {
    44         void __enter_monitor_desc(monitor_desc * this) {
     31        void __enter_monitor_desc(monitor_desc * this, monitor_desc * leader) {
    4532                lock( &this->lock );
    4633                thread_desc * thrd = this_thread();
    4734
    48                 LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
     35                // //Update the stack owner
     36                // this->stack_owner = leader;
     37
     38                LIB_DEBUG_PRINT_SAFE("Entering %p (o: %p, r: %i)\n", this, this->owner, this->recursion);
    4939
    5040                if( !this->owner ) {
     
    7161
    7262        // leave pseudo code :
    73         //      TODO
    74         void __leave_monitor_desc(monitor_desc * this) {
     63        //      decrement level
     64        //      leve == 0 ?
     65        //              no : done
     66        //              yes :
     67        //                      signal stack empty ?
     68        //                              has leader :
     69        //                                      bulk acquiring means we don't own the signal stack
     70        //                                      ignore it but don't release the monitor
     71        //                              yes :
     72        //                                      next in entry queue is new owner
     73        //                              no :
     74        //                                      top of the signal stack is the owner
     75        //                                      context switch to him right away
     76        //
     77        void __leave_monitor_desc(monitor_desc * this, monitor_desc * leader) {
    7578                lock( &this->lock );
    7679
     80                LIB_DEBUG_PRINT_SAFE("Leaving %p (o: %p, r: %i)\n", this, this->owner, this->recursion);
     81
    7782                thread_desc * thrd = this_thread();
    78 
    79                 LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
    80                 assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );
     83                assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", this->owner, thrd, this->recursion );
    8184
    8285                //Leaving a recursion level, decrement the counter
     
    8689                //it means we don't need to do anything
    8790                if( this->recursion != 0) {
     91                        // this->stack_owner = leader;
    8892                        unlock( &this->lock );
    8993                        return;
    9094                }
    91 
    92                 thread_desc * new_owner = next_thread( this );
     95                       
     96                // //If we don't own the signal stack then just leave it to the owner
     97                // if( this->stack_owner ) {
     98                //      this->stack_owner = leader;
     99                //      unlock( &this->lock );
     100                //      return;
     101                // }
     102
     103                //We are the stack owner and have left the last recursion level.
     104                //We are in charge of passing the monitor
     105                thread_desc * new_owner = 0;
     106
     107                //Check the signaller stack
     108                new_owner = pop( &this->signal_stack );
     109                if( new_owner ) {
     110                        //The signaller stack is not empty,
     111                        //transfer control immediately
     112                        set_owner( this, new_owner );
     113                        // this->stack_owner = leader;
     114                        ScheduleInternal( &this->lock, new_owner );
     115                        return;
     116                }
     117               
     118                // No signaller thread
     119                // Get the next thread in the entry_queue
     120                new_owner = pop_head( &this->entry_queue );
     121                set_owner( this, new_owner );
     122
     123                // //Update the stack owner
     124                // this->stack_owner = leader;
    93125
    94126                //We can now let other threads in safely
     
    101133
    102134static inline void enter(monitor_desc ** monitors, int count) {
    103         for(int i = 0; i < count; i++) {
    104                 __enter_monitor_desc( monitors[i] );
     135        __enter_monitor_desc( monitors[0], NULL );
     136        for(int i = 1; i < count; i++) {
     137                __enter_monitor_desc( monitors[i], monitors[0] );
    105138        }
    106139}
    107140
    108141static inline void leave(monitor_desc ** monitors, int count) {
    109         for(int i = count - 1; i >= 0; i--) {
    110                 __leave_monitor_desc( monitors[i] );
     142        __leave_monitor_desc( monitors[0], NULL );
     143        for(int i = count - 1; i >= 1; i--) {
     144                __leave_monitor_desc( monitors[i], monitors[0] );
    111145        }
    112146}
     
    135169// Internal scheduling
    136170void wait( condition * this ) {
    137         LIB_DEBUG_PRINT_SAFE("Waiting\n");
    138 
    139         brand_condition( this );
     171        assertf(false, "NO SUPPORTED");
     172        // LIB_DEBUG_FPRINTF("Waiting\n");
     173        thread_desc * this_thrd = this_thread();
     174
     175        if( !this->monitors ) {
     176                this->monitors = this_thrd->current_monitors;
     177                this->monitor_count = this_thrd->current_monitor_count;
     178        }
     179
     180        unsigned short count = this->monitor_count;
    140181
    141182        //Check that everything is as expected
    142         assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
    143         assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );
    144 
    145         unsigned short count = this->monitor_count;
     183        assert( this->monitors != NULL );
     184        assert( this->monitor_count != 0 );
     185
    146186        unsigned int recursions[ count ];               //Save the current recursion levels to restore them later
    147187        spinlock *   locks     [ count ];               //We need to pass-in an array of locks to ScheduleInternal
    148188
    149         LIB_DEBUG_PRINT_SAFE("count %i\n", count);
    150 
    151         __condition_node_t waiter;
    152         waiter.waiting_thread = this_thread();
    153         waiter.count = count;
    154         waiter.next = NULL;
    155 
    156         __condition_criterion_t criteria[count];
    157         for(int i = 0; i < count; i++) {
    158                 criteria[i].ready  = false;
    159                 criteria[i].target = this->monitors[i];
    160                 criteria[i].owner  = &waiter;
    161                 criteria[i].next   = NULL;
    162                 LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
    163         }
    164 
    165         waiter.criteria = criteria;
    166         append( &this->blocked, &waiter );
    167 
    168         lock_all( this->monitors, locks, count );
    169         save_recursion( this->monitors, recursions, count );
    170         //DON'T unlock, ask the kernel to do it
    171 
    172         //Find the next thread(s) to run
    173         unsigned short thread_count = count;
    174         thread_desc * threads[ count ];
    175 
    176         for( int i = 0; i < count; i++) {
    177                 thread_desc * new_owner = next_thread( this->monitors[i] );
    178                 thread_count = insert_unique( threads, i, new_owner );
    179         }
    180 
    181         LIB_DEBUG_PRINT_SAFE("Will unblock: ");
    182         for(int i = 0; i < thread_count; i++) {
    183                 LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
    184         }
    185         LIB_DEBUG_PRINT_SAFE("\n");
    186 
    187         // Everything is ready to go to sleep
    188         ScheduleInternal( locks, count, threads, thread_count );
     189        // LIB_DEBUG_FPRINTF("Getting ready to wait\n");
     190
     191        //Loop on all the monitors and release the owner
     192        for( unsigned int i = 0; i < count; i++ ) {
     193                monitor_desc * cur = this->monitors[i];
     194
     195                assert( cur );
     196
     197                // LIB_DEBUG_FPRINTF("cur %p lock %p\n", cur, &cur->lock);
     198
     199                //Store the locks for later
     200                locks[i] = &cur->lock;
     201
     202                //Protect the monitors
     203                lock( locks[i] );
     204                {               
     205                        //Save the recursion levels
     206                        recursions[i] = cur->recursion;
     207
     208                        //Release the owner
     209                        cur->recursion = 0;
     210                        cur->owner = NULL;
     211                }
     212                //Release the monitor
     213                unlock( locks[i] );
     214        }
     215
     216        // LIB_DEBUG_FPRINTF("Waiting now\n");
     217
     218        //Everything is ready to go to sleep
     219        ScheduleInternal( locks, count );
    189220
    190221
     
    193224
    194225        //We are back, restore the owners and recursions
    195         lock_all( locks, count );
    196         restore_recursion( this->monitors, recursions, count );
    197         unlock_all( locks, count );
    198 }
    199 
    200 void signal( condition * this ) {
    201         if( !this->blocked.head ) {
    202                 LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
    203                 return;
    204         }
     226        for( unsigned int i = 0; i < count; i++ ) {
     227                monitor_desc * cur = this->monitors[i];
     228
     229                //Protect the monitors
     230                lock( locks[i] );
     231                {
     232                        //Release the owner
     233                        cur->owner = this_thrd;
     234                        cur->recursion = recursions[i];
     235                }
     236                //Release the monitor
     237                unlock( locks[i] );
     238        }
     239}
     240
     241static void __signal_internal( condition * this ) {
     242        assertf(false, "NO SUPPORTED");
     243        if( !this->blocked.head ) return;
    205244
    206245        //Check that everything is as expected
    207246        assert( this->monitors );
    208247        assert( this->monitor_count != 0 );
    209 
    210         unsigned short count = this->monitor_count;
    211248       
    212249        LIB_DEBUG_DO(
    213                 thread_desc * this_thrd = this_thread();
    214                 if ( this->monitor_count != this_thrd->current_monitor_count ) {
    215                         abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
     250                if ( this->monitors != this_thread()->current_monitors ) {
     251                        abortf( "Signal on condition %p made outside of the correct monitor(s)", this );
    216252                } // if
    217 
    218                 for(int i = 0; i < this->monitor_count; i++) {
    219                         if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
    220                                 abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->current_monitors[i] );
    221                         } // if
    222                 }
    223253        );
    224254
    225         lock_all( this->monitors, NULL, count );
    226         LIB_DEBUG_PRINT_SAFE("Signalling");
    227 
    228         __condition_node_t * node = pop_head( &this->blocked );
    229         for(int i = 0; i < count; i++) {
    230                 __condition_criterion_t * crit = &node->criteria[i];
    231                 LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
    232                 assert( !crit->ready );
    233                 push( &crit->target->signal_stack, crit );
    234         }
    235 
    236         LIB_DEBUG_PRINT_SAFE("\n");
    237 
    238         unlock_all( this->monitors, count );
    239 }
    240 
    241 //-----------------------------------------------------------------------------
    242 // Utilities
    243 
    244 static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
    245         //Pass the monitor appropriately
    246         this->owner = owner;
    247 
    248         //We are passing the monitor to someone else, which means recursion level is not 0
    249         this->recursion = owner ? 1 : 0;
    250 }
    251 
    252 static inline thread_desc * next_thread( monitor_desc * this ) {
    253         //Check the signaller stack
    254         __condition_criterion_t * urgent = pop( &this->signal_stack );
    255         if( urgent ) {
    256                 //The signaller stack is not empty,
    257                 //regardless of if we are ready to baton pass,
    258                 //we need to set the monitor as in use
    259                 set_owner( this,  urgent->owner->waiting_thread );
    260 
    261                 return check_condition( urgent );
    262         }
    263 
    264         // No signaller thread
    265         // Get the next thread in the entry_queue
    266         thread_desc * new_owner = pop_head( &this->entry_queue );
    267         set_owner( this, new_owner );
    268 
    269         return new_owner;
    270 }
    271 
    272 static inline void lock_all( spinlock ** locks, unsigned short count ) {
    273         for( int i = 0; i < count; i++ ) {
    274                 lock( locks[i] );
    275         }
    276 }
    277 
    278 static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
    279         for( int i = 0; i < count; i++ ) {
    280                 spinlock * l = &source[i]->lock;
    281                 lock( l );
    282                 if(locks) locks[i] = l;
    283         }
    284 }
    285 
    286 static inline void unlock_all( spinlock ** locks, unsigned short count ) {
    287         for( int i = 0; i < count; i++ ) {
    288                 unlock( locks[i] );
    289         }
    290 }
    291 
    292 static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
    293         for( int i = 0; i < count; i++ ) {
    294                 unlock( &locks[i]->lock );
    295         }
    296 }
    297 
    298 
    299 static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
    300         for( int i = 0; i < count; i++ ) {
    301                 recursions[i] = ctx[i]->recursion;
    302         }
    303 }
    304 
    305 static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
    306         for( int i = 0; i < count; i++ ) {
    307                 ctx[i]->recursion = recursions[i];
    308         }
    309 }
    310 
    311 // Function has 2 different behavior
    312 // 1 - Marks a monitors as being ready to run
    313 // 2 - Checks if all the monitors are ready to run
    314 //     if so return the thread to run
    315 static inline thread_desc * check_condition( __condition_criterion_t * target ) {
    316         __condition_node_t * node = target->owner;
    317         unsigned short count = node->count;
    318         __condition_criterion_t * criteria = node->criteria;
    319 
    320         bool ready2run = true;
    321 
    322         for(    int i = 0; i < count; i++ ) {
    323                 LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
    324                 if( &criteria[i] == target ) {
    325                         criteria[i].ready = true;
    326                         LIB_DEBUG_PRINT_SAFE( "True\n" );
    327                 }
    328 
    329                 ready2run = criteria[i].ready && ready2run;
    330         }
    331 
    332         LIB_DEBUG_PRINT_SAFE( "Runing %i\n", ready2run );
    333         return ready2run ? node->waiting_thread : NULL;
    334 }
    335 
    336 static inline void brand_condition( condition * this ) {
    337         thread_desc * thrd = this_thread();
    338         if( !this->monitors ) {
    339                 LIB_DEBUG_PRINT_SAFE("Branding\n");
    340                 assertf( thrd->current_monitors != NULL, "No current monitor to brand condition", thrd->current_monitors );
    341                 this->monitors = thrd->current_monitors;
    342                 this->monitor_count = thrd->current_monitor_count;
    343         }
    344 }
    345 
    346 static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
    347         for(int i = 0; i < end; i++) {
    348                 if( thrds[i] == val ) return end;
    349         }
    350 
    351         thrds[end] = val;
    352         return end + 1;
    353 }
    354 
    355 void ?{}( __condition_blocked_queue_t * this ) {
    356         this->head = NULL;
    357         this->tail = &this->head;
    358 }
    359 
    360 void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
    361         assert(this->tail != NULL);
    362         *this->tail = c;
    363         this->tail = &c->next;
    364 }
    365 
    366 __condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
    367         __condition_node_t * head = this->head;
    368         if( head ) {
    369                 this->head = head->next;
    370                 if( !head->next ) {
    371                         this->tail = &this->head;
    372                 }
    373                 head->next = NULL;
    374         }
    375         return head;
    376 }
     255        monitor_desc * owner = this->monitors[0];
     256        lock( &owner->lock );
     257        {
     258                thread_desc * unblock = pop_head( &this->blocked );
     259                push( &owner->signal_stack, unblock );
     260        }
     261        unlock( &owner->lock );
     262}
     263
     264void signal( condition * this ) {
     265        __signal_internal( this );
     266}
Note: See TracChangeset for help on using the changeset viewer.