Ignore:
Timestamp:
Jun 8, 2022, 4:23:08 PM (2 years ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
55422cf
Parents:
ced5e2a
Message:

added pthread_cond_var

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/locks.cfa

    rced5e2a rae06e0b  
    178178
    179179//-----------------------------------------------------------------------------
    180 // simple_owner_lock
    181 
    182 static inline void lock(simple_owner_lock & this) with(this) {
    183         if (owner == active_thread()) {
    184                 recursion_count++;
    185                 return;
    186         }
    187         lock( lock __cfaabi_dbg_ctx2 );
    188 
    189         if (owner != 0p) {
    190                 insert_last( blocked_threads, *active_thread() );
    191                 unlock( lock );
    192                 park( );
    193                 return;
    194         }
    195         owner = active_thread();
    196         recursion_count = 1;
    197         unlock( lock );
    198 }
    199 
    200 void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
    201         thread$ * t = &try_pop_front( blocked_threads );
    202         owner = t;
    203         recursion_count = ( t ? 1 : 0 );
    204         unpark( t );
    205 }
    206 
    207 static inline void unlock(simple_owner_lock & this) with(this) {
    208         lock( lock __cfaabi_dbg_ctx2 );
    209         /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
    210         /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
    211         // if recursion count is zero release lock and set new owner if one is waiting
    212         recursion_count--;
    213         if ( recursion_count == 0 ) {
    214                 pop_and_set_new_owner( this );
    215         }
    216         unlock( lock );
    217 }
    218 
    219 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
    220         lock( lock __cfaabi_dbg_ctx2 );
    221         // lock held
    222         if ( owner != 0p ) {
    223                 insert_last( blocked_threads, *t );
    224                 unlock( lock );
    225         }
    226         // lock not held
    227         else {
    228                 owner = t;
    229                 recursion_count = 1;
    230                 unpark( t );
    231                 unlock( lock );
    232         }
    233 }
    234 
    235 static inline size_t on_wait(simple_owner_lock & this) with(this) {
    236         lock( lock __cfaabi_dbg_ctx2 );
    237         /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
    238         /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
    239 
    240         size_t ret = recursion_count;
    241 
    242         pop_and_set_new_owner( this );
    243 
    244         unlock( lock );
    245         return ret;
    246 }
    247 
    248 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
    249 
    250 
    251 //-----------------------------------------------------------------------------
    252180// alarm node wrapper
    253181forall(L & | is_blocking_lock(L)) {
     
    291219        // this casts the alarm node to our wrapped type since we used type erasure
    292220        static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); }
     221
     222        struct pthread_alarm_node_wrap {
     223                alarm_node_t alarm_node;
     224                pthread_cond_var(L) * cond;
     225                info_thread(L) * info_thd;
     226        };
     227
     228        void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
     229                this.alarm_node{ callback, alarm, period };
     230                this.cond = c;
     231                this.info_thd = i;
     232        }
     233
     234        void ^?{}( pthread_alarm_node_wrap(L) & this ) { }
     235
     236        static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
     237                // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
     238                lock( cond->lock __cfaabi_dbg_ctx2 );
     239
     240                // this check is necessary to avoid a race condition since this timeout handler
     241                //      may still be called after a thread has been removed from the queue but
     242                //      before the alarm is unregistered
     243                if ( (*info_thd)`isListed ) {   // is thread on queue
     244                        info_thd->signalled = false;
     245                        // remove this thread O(1)
     246                        remove( *info_thd );
     247                        on_notify(*info_thd->lock, info_thd->t);
     248                }
     249                unlock( cond->lock );
     250        }
     251
     252        // this casts the alarm node to our wrapped type since we used type erasure
     253        static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }
    293254}
    294255
     
    460421                on_wakeup(*i.lock, recursion_count);
    461422        }
    462 }
    463 
     423
     424        //-----------------------------------------------------------------------------
     425        // pthread_cond_var
     426
     427        void  ?{}( pthread_cond_var(L) & this ) with(this) {
     428                blocked_threads{};
     429                lock{};
     430        }
     431
     432        void ^?{}( pthread_cond_var(L) & this ) { }
     433
     434        bool notify_one( pthread_cond_var(L) & this ) with(this) {
     435                lock( lock __cfaabi_dbg_ctx2 );
     436                bool ret = ! blocked_threads`isEmpty;
     437                if ( ret ) {
     438                        info_thread(L) & popped = try_pop_front( blocked_threads );
     439                        on_notify(*popped.lock, popped.t);
     440                }
     441                unlock( lock );
     442                return ret;
     443        }
     444
     445        bool notify_all( pthread_cond_var(L) & this ) with(this) {
     446                lock( lock __cfaabi_dbg_ctx2 );
     447                bool ret = ! blocked_threads`isEmpty;
     448                while( ! blocked_threads`isEmpty ) {
     449                        info_thread(L) & popped = try_pop_front( blocked_threads );
     450                        on_notify(*popped.lock, popped.t);
     451                }
     452                unlock( lock );
     453                return ret;
     454        }
     455
     456        uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
     457        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
     458
     459        static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
     460                // add info_thread to waiting queue
     461                insert_last( blocked_threads, *i );
     462                size_t recursion_count = 0;
     463                recursion_count = on_wait( *i->lock );
     464                return recursion_count;
     465        }
     466       
     467        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
     468                lock( lock __cfaabi_dbg_ctx2 );
     469                size_t recursion_count = queue_and_get_recursion(this, &info);
     470                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
     471                register_self( &node_wrap.alarm_node );
     472                unlock( lock );
     473
     474                // blocks here
     475                park();
     476
     477                // unregisters alarm so it doesn't go off if this happens first
     478                unregister_self( &node_wrap.alarm_node );
     479
     480                // resets recursion count here after waking
     481                if (info.lock) on_wakeup(*info.lock, recursion_count);
     482        }
     483
     484        void wait( pthread_cond_var(L) & this, L & l ) with(this) {
     485                wait( this, l, 0 );
     486        }
     487
     488        void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
     489                lock( lock __cfaabi_dbg_ctx2 );
     490                info_thread( L ) i = { active_thread(), info, &l };
     491                size_t recursion_count = queue_and_get_recursion(this, &i);
     492                unlock( lock );
     493                park( );
     494                on_wakeup(*i.lock, recursion_count);
     495        }
     496
     497        #define PTHREAD_WAIT_TIME( u, l, t ) \
     498                info_thread( L ) i = { active_thread(), u, l }; \
     499                queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
     500                return i.signalled;
     501
     502        bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
     503                Duration d = { t };
     504                WAIT_TIME( 0, &l , d )
     505        }
     506       
     507        bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t  ) {
     508                Duration d = { t };
     509                WAIT_TIME( info, &l , d )
     510        }
     511}
    464512//-----------------------------------------------------------------------------
    465513// Semaphore
Note: See TracChangeset for help on using the changeset viewer.