Changeset ae06e0b


Ignore:
Timestamp:
Jun 8, 2022, 4:23:08 PM (2 years ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
55422cf
Parents:
ced5e2a
Message:

added pthread_cond_var

Location:
libcfa/src/concurrency
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/locks.cfa

    rced5e2a rae06e0b  
    178178
    179179//-----------------------------------------------------------------------------
    180 // simple_owner_lock
    181 
    182 static inline void lock(simple_owner_lock & this) with(this) {
    183         if (owner == active_thread()) {
    184                 recursion_count++;
    185                 return;
    186         }
    187         lock( lock __cfaabi_dbg_ctx2 );
    188 
    189         if (owner != 0p) {
    190                 insert_last( blocked_threads, *active_thread() );
    191                 unlock( lock );
    192                 park( );
    193                 return;
    194         }
    195         owner = active_thread();
    196         recursion_count = 1;
    197         unlock( lock );
    198 }
    199 
    200 void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
    201         thread$ * t = &try_pop_front( blocked_threads );
    202         owner = t;
    203         recursion_count = ( t ? 1 : 0 );
    204         unpark( t );
    205 }
    206 
    207 static inline void unlock(simple_owner_lock & this) with(this) {
    208         lock( lock __cfaabi_dbg_ctx2 );
    209         /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
    210         /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
    211         // if recursion count is zero release lock and set new owner if one is waiting
    212         recursion_count--;
    213         if ( recursion_count == 0 ) {
    214                 pop_and_set_new_owner( this );
    215         }
    216         unlock( lock );
    217 }
    218 
    219 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
    220         lock( lock __cfaabi_dbg_ctx2 );
    221         // lock held
    222         if ( owner != 0p ) {
    223                 insert_last( blocked_threads, *t );
    224                 unlock( lock );
    225         }
    226         // lock not held
    227         else {
    228                 owner = t;
    229                 recursion_count = 1;
    230                 unpark( t );
    231                 unlock( lock );
    232         }
    233 }
    234 
    235 static inline size_t on_wait(simple_owner_lock & this) with(this) {
    236         lock( lock __cfaabi_dbg_ctx2 );
    237         /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
    238         /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
    239 
    240         size_t ret = recursion_count;
    241 
    242         pop_and_set_new_owner( this );
    243 
    244         unlock( lock );
    245         return ret;
    246 }
    247 
    248 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
    249 
    250 
    251 //-----------------------------------------------------------------------------
    252180// alarm node wrapper
    253181forall(L & | is_blocking_lock(L)) {
     
    291219        // this casts the alarm node to our wrapped type since we used type erasure
    292220        static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); }
     221
     222        struct pthread_alarm_node_wrap {
     223                alarm_node_t alarm_node;
     224                pthread_cond_var(L) * cond;
     225                info_thread(L) * info_thd;
     226        };
     227
     228        void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
     229                this.alarm_node{ callback, alarm, period };
     230                this.cond = c;
     231                this.info_thd = i;
     232        }
     233
     234        void ^?{}( pthread_alarm_node_wrap(L) & this ) { }
     235
     236        static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
     237                // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
     238                lock( cond->lock __cfaabi_dbg_ctx2 );
     239
     240                // this check is necessary to avoid a race condition since this timeout handler
     241                //      may still be called after a thread has been removed from the queue but
     242                //      before the alarm is unregistered
     243                if ( (*info_thd)`isListed ) {   // is thread on queue
     244                        info_thd->signalled = false;
     245                        // remove this thread O(1)
     246                        remove( *info_thd );
     247                        on_notify(*info_thd->lock, info_thd->t);
     248                }
     249                unlock( cond->lock );
     250        }
     251
     252        // this casts the alarm node to our wrapped type since we used type erasure
     253        static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }
    293254}
    294255
     
    460421                on_wakeup(*i.lock, recursion_count);
    461422        }
    462 }
    463 
     423
     424        //-----------------------------------------------------------------------------
     425        // pthread_cond_var
     426
     427        void  ?{}( pthread_cond_var(L) & this ) with(this) {
     428                blocked_threads{};
     429                lock{};
     430        }
     431
     432        void ^?{}( pthread_cond_var(L) & this ) { }
     433
     434        bool notify_one( pthread_cond_var(L) & this ) with(this) {
     435                lock( lock __cfaabi_dbg_ctx2 );
     436                bool ret = ! blocked_threads`isEmpty;
     437                if ( ret ) {
     438                        info_thread(L) & popped = try_pop_front( blocked_threads );
     439                        on_notify(*popped.lock, popped.t);
     440                }
     441                unlock( lock );
     442                return ret;
     443        }
     444
     445        bool notify_all( pthread_cond_var(L) & this ) with(this) {
     446                lock( lock __cfaabi_dbg_ctx2 );
     447                bool ret = ! blocked_threads`isEmpty;
     448                while( ! blocked_threads`isEmpty ) {
     449                        info_thread(L) & popped = try_pop_front( blocked_threads );
     450                        on_notify(*popped.lock, popped.t);
     451                }
     452                unlock( lock );
     453                return ret;
     454        }
     455
     456        uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
     457        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
     458
     459        static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
     460                // add info_thread to waiting queue
     461                insert_last( blocked_threads, *i );
     462                size_t recursion_count = 0;
     463                recursion_count = on_wait( *i->lock );
     464                return recursion_count;
     465        }
     466       
     467        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
     468                lock( lock __cfaabi_dbg_ctx2 );
     469                size_t recursion_count = queue_and_get_recursion(this, &info);
     470                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
     471                register_self( &node_wrap.alarm_node );
     472                unlock( lock );
     473
     474                // blocks here
     475                park();
     476
     477                // unregisters alarm so it doesn't go off if this happens first
     478                unregister_self( &node_wrap.alarm_node );
     479
     480                // resets recursion count here after waking
     481                if (info.lock) on_wakeup(*info.lock, recursion_count);
     482        }
     483
     484        void wait( pthread_cond_var(L) & this, L & l ) with(this) {
     485                wait( this, l, 0 );
     486        }
     487
     488        void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
     489                lock( lock __cfaabi_dbg_ctx2 );
     490                info_thread( L ) i = { active_thread(), info, &l };
     491                size_t recursion_count = queue_and_get_recursion(this, &i);
     492                unlock( lock );
     493                park( );
     494                on_wakeup(*i.lock, recursion_count);
     495        }
     496
     497        #define PTHREAD_WAIT_TIME( u, l, t ) \
     498                info_thread( L ) i = { active_thread(), u, l }; \
     499                queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
     500                return i.signalled;
     501
     502        bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
     503                Duration d = { t };
     504                WAIT_TIME( 0, &l , d )
     505        }
     506       
     507        bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t  ) {
     508                Duration d = { t };
     509                WAIT_TIME( info, &l , d )
     510        }
     511}
    464512//-----------------------------------------------------------------------------
    465513// Semaphore
  • libcfa/src/concurrency/locks.hfa

    rced5e2a rae06e0b  
    366366static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;
    367367
     368static inline void lock(simple_owner_lock & this) with(this) {
     369        if (owner == active_thread()) {
     370                recursion_count++;
     371                return;
     372        }
     373        lock( lock __cfaabi_dbg_ctx2 );
     374
     375        if (owner != 0p) {
     376                insert_last( blocked_threads, *active_thread() );
     377                unlock( lock );
     378                park( );
     379                return;
     380        }
     381        owner = active_thread();
     382        recursion_count = 1;
     383        unlock( lock );
     384}
     385
     386// TODO: fix duplicate def issue and bring this back
     387// void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
     388        // thread$ * t = &try_pop_front( blocked_threads );
     389        // owner = t;
     390        // recursion_count = ( t ? 1 : 0 );
     391        // unpark( t );
     392// }
     393
     394static inline void unlock(simple_owner_lock & this) with(this) {
     395        lock( lock __cfaabi_dbg_ctx2 );
     396        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     397        /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
     398        // if recursion count is zero release lock and set new owner if one is waiting
     399        recursion_count--;
     400        if ( recursion_count == 0 ) {
     401                // pop_and_set_new_owner( this );
     402                thread$ * t = &try_pop_front( blocked_threads );
     403                owner = t;
     404                recursion_count = ( t ? 1 : 0 );
     405                unpark( t );
     406        }
     407        unlock( lock );
     408}
     409
     410static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
     411        lock( lock __cfaabi_dbg_ctx2 );
     412        // lock held
     413        if ( owner != 0p ) {
     414                insert_last( blocked_threads, *t );
     415                unlock( lock );
     416        }
     417        // lock not held
     418        else {
     419                owner = t;
     420                recursion_count = 1;
     421                unpark( t );
     422                unlock( lock );
     423        }
     424}
     425
     426static inline size_t on_wait(simple_owner_lock & this) with(this) {
     427        lock( lock __cfaabi_dbg_ctx2 );
     428        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     429        /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
     430
     431        size_t ret = recursion_count;
     432
     433        // pop_and_set_new_owner( this );
     434
     435        thread$ * t = &try_pop_front( blocked_threads );
     436        owner = t;
     437        recursion_count = ( t ? 1 : 0 );
     438        unpark( t );
     439
     440        unlock( lock );
     441        return ret;
     442}
     443
     444static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     445
    368446//-----------------------------------------------------------------------------
    369447// Spin Queue Lock
     
    606684        // - signalling without holding branded lock is UNSAFE!
    607685        // - only allows usage of one lock, cond var is branded after usage
     686
    608687        struct fast_cond_var {
    609688                // List of blocked threads
    610689                dlist( info_thread(L) ) blocked_threads;
    611 
    612690                #ifdef __CFA_DEBUG__
    613691                L * lock_used;
     
    615693        };
    616694
    617 
    618695        void  ?{}( fast_cond_var(L) & this );
    619696        void ^?{}( fast_cond_var(L) & this );
     
    623700
    624701        uintptr_t front( fast_cond_var(L) & this );
    625 
    626702        bool empty  ( fast_cond_var(L) & this );
    627703
    628704        void wait( fast_cond_var(L) & this, L & l );
    629705        void wait( fast_cond_var(L) & this, L & l, uintptr_t info );
    630 }
     706
     707
     708        //-----------------------------------------------------------------------------
     709        // pthread_cond_var
     710        //
     711        // - cond var with minimal footprint
     712        // - supports operations needed for phthread cond
     713
     714        struct pthread_cond_var {
     715                dlist( info_thread(L) ) blocked_threads;
     716                __spinlock_t lock;
     717        };
     718
     719        void  ?{}( pthread_cond_var(L) & this );
     720        void ^?{}( pthread_cond_var(L) & this );
     721
     722        bool notify_one( pthread_cond_var(L) & this );
     723        bool notify_all( pthread_cond_var(L) & this );
     724
     725        uintptr_t front( pthread_cond_var(L) & this );
     726        bool empty ( pthread_cond_var(L) & this );
     727
     728        void wait( pthread_cond_var(L) & this, L & l );
     729        void wait( pthread_cond_var(L) & this, L & l, uintptr_t info );
     730        bool wait( pthread_cond_var(L) & this, L & l, timespec t );
     731        bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t );
     732}
Note: See TracChangeset for help on using the changeset viewer.