Ignore:
Timestamp:
May 29, 2023, 11:44:29 AM (2 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT
Children:
fa2c005
Parents:
3a513d89 (diff), 2b78949 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into ADT

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/locks.cfa

    r3a513d89 r044ae62  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // locks.hfa -- LIBCFATHREAD
     7// locks.cfa -- LIBCFATHREAD
    88// Runtime locks that used with the runtime thread system.
    99//
     
    7979        // lock is held by some other thread
    8080        if ( owner != 0p && owner != thrd ) {
    81                 insert_last( blocked_threads, *thrd );
     81        select_node node;
     82                insert_last( blocked_threads, node );
    8283                wait_count++;
    8384                unlock( lock );
    8485                park( );
    85         }
    86         // multi acquisition lock is held by current thread
    87         else if ( owner == thrd && multi_acquisition ) {
     86        return;
     87        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
    8888                recursion_count++;
    89                 unlock( lock );
    90         }
    91         // lock isn't held
    92         else {
     89        } else {  // lock isn't held
    9390                owner = thrd;
    9491                recursion_count = 1;
    95                 unlock( lock );
    96         }
     92        }
     93    unlock( lock );
    9794}
    9895
     
    117114}
    118115
    119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
    120         thread$ * t = &try_pop_front( blocked_threads );
    121         owner = t;
    122         recursion_count = ( t ? 1 : 0 );
    123         if ( t ) wait_count--;
    124         unpark( t );
     116static inline void pop_node( blocking_lock & this ) with( this ) {
     117    __handle_waituntil_OR( blocked_threads );
     118    select_node * node = &try_pop_front( blocked_threads );
     119    if ( node ) {
     120        wait_count--;
     121        owner = node->blocked_thread;
     122        recursion_count = 1;
     123        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     124        wake_one( blocked_threads, *node );
     125    } else {
     126        owner = 0p;
     127        recursion_count = 0;
     128    }
    125129}
    126130
     
    134138        recursion_count--;
    135139        if ( recursion_count == 0 ) {
    136                 pop_and_set_new_owner( this );
     140                pop_node( this );
    137141        }
    138142        unlock( lock );
     
    147151        // lock held
    148152        if ( owner != 0p ) {
    149                 insert_last( blocked_threads, *t );
     153                insert_last( blocked_threads, *(select_node *)t->link_node );
    150154                wait_count++;
    151                 unlock( lock );
    152155        }
    153156        // lock not held
     
    156159                recursion_count = 1;
    157160                unpark( t );
    158                 unlock( lock );
    159         }
    160 }
    161 
    162 size_t on_wait( blocking_lock & this ) with( this ) {
     161        }
     162    unlock( lock );
     163}
     164
     165size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
    163166        lock( lock __cfaabi_dbg_ctx2 );
    164167        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    167170        size_t ret = recursion_count;
    168171
    169         pop_and_set_new_owner( this );
     172        pop_node( this );
     173
     174    select_node node;
     175    active_thread()->link_node = (void *)&node;
    170176        unlock( lock );
     177
     178    pre_park_then_park( pp_fn, pp_datum );
     179
    171180        return ret;
    172181}
     
    175184        recursion_count = recursion;
    176185}
     186
     187// waituntil() support
     188bool register_select( blocking_lock & this, select_node & node ) with(this) {
     189    lock( lock __cfaabi_dbg_ctx2 );
     190        thread$ * thrd = active_thread();
     191
     192        // single acquisition lock is held by current thread
     193        /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );
     194
     195    if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case
     196        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     197           unlock( lock );
     198           return false;
     199        }
     200    }
     201
     202        // lock is held by some other thread
     203        if ( owner != 0p && owner != thrd ) {
     204                insert_last( blocked_threads, node );
     205                wait_count++;
     206                unlock( lock );
     207        return false;
     208        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
     209                recursion_count++;
     210        } else {  // lock isn't held
     211                owner = thrd;
     212                recursion_count = 1;
     213        }
     214
     215    if ( node.park_counter ) __make_select_node_available( node );
     216    unlock( lock );
     217    return true;
     218}
     219
     220bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
     221    lock( lock __cfaabi_dbg_ctx2 );
     222    if ( node`isListed ) {
     223        remove( node );
     224        wait_count--;
     225        unlock( lock );
     226        return false;
     227    }
     228   
     229    if ( owner == active_thread() ) {
     230        /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
     231        // if recursion count is zero release lock and set new owner if one is waiting
     232        recursion_count--;
     233        if ( recursion_count == 0 ) {
     234            pop_node( this );
     235        }
     236    }
     237        unlock( lock );
     238    return false;
     239}
     240
     241bool on_selected( blocking_lock & this, select_node & node ) { return true; }
    177242
    178243//-----------------------------------------------------------------------------
     
    311376        int counter( condition_variable(L) & this ) with(this) { return count; }
    312377
    313         static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {
     378        static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
    314379                // add info_thread to waiting queue
    315380                insert_last( blocked_threads, *i );
    316381                count++;
    317                 size_t recursion_count = 0;
    318                 if (i->lock) {
    319                         // if lock was passed get recursion count to reset to after waking thread
    320                         recursion_count = on_wait( *i->lock );
    321                 }
    322                 return recursion_count;
    323         }
     382        }
     383
     384    static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
     385        size_t recursion_count = 0;
     386                if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread
     387                        recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
     388                else
     389            pre_park_then_park( pp_fn, pp_datum );
     390        return recursion_count;
     391    }
     392    static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); }
    324393
    325394        // helper for wait()'s' with no timeout
    326395        static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
    327396                lock( lock __cfaabi_dbg_ctx2 );
    328                 size_t recursion_count = queue_and_get_recursion(this, &i);
     397        enqueue_thread( this, &i );
    329398                unlock( lock );
    330399
    331400                // blocks here
    332                 park( );
     401        size_t recursion_count = block_and_get_recursion( i );
    333402
    334403                // resets recursion count here after waking
    335                 if (i.lock) on_wakeup(*i.lock, recursion_count);
     404                if ( i.lock ) on_wakeup( *i.lock, recursion_count );
    336405        }
    337406
     
    340409                queue_info_thread( this, i );
    341410
     411    static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); }
     412
    342413        // helper for wait()'s' with a timeout
    343414        static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    344415                lock( lock __cfaabi_dbg_ctx2 );
    345                 size_t recursion_count = queue_and_get_recursion(this, &info);
     416        enqueue_thread( this, &info );
    346417                alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    347418                unlock( lock );
    348419
    349                 // registers alarm outside cond lock to avoid deadlock
    350                 register_self( &node_wrap.alarm_node );
    351 
    352                 // blocks here
    353                 park();
     420                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
     421        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     422                // park();
    354423
    355424                // unregisters alarm so it doesn't go off if this happens first
     
    357426
    358427                // resets recursion count here after waking
    359                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     428                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    360429        }
    361430
     
    417486                info_thread( L ) i = { active_thread(), info, &l };
    418487                insert_last( blocked_threads, i );
    419                 size_t recursion_count = on_wait( *i.lock );
    420                 park( );
     488                size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here
     489                // park( );
    421490                on_wakeup(*i.lock, recursion_count);
    422491        }
     
    459528        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
    460529
    461         static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
    462                 // add info_thread to waiting queue
    463                 insert_last( blocked_threads, *i );
    464                 size_t recursion_count = 0;
    465                 recursion_count = on_wait( *i->lock );
    466                 return recursion_count;
    467         }
    468        
    469530        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    470531                lock( lock __cfaabi_dbg_ctx2 );
    471                 size_t recursion_count = queue_and_get_recursion(this, &info);
     532        insert_last( blocked_threads, info );
    472533                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    473534                unlock( lock );
    474535
    475                 // registers alarm outside cond lock to avoid deadlock
    476                 register_self( &node_wrap.alarm_node );
    477 
    478                 // blocks here
    479                 park();
    480 
    481                 // unregisters alarm so it doesn't go off if this happens first
     536                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
     537        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     538
     539                // unregisters alarm so it doesn't go off if signal happens first
    482540                unregister_self( &node_wrap.alarm_node );
    483541
    484542                // resets recursion count here after waking
    485                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     543                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    486544        }
    487545
     
    493551                lock( lock __cfaabi_dbg_ctx2 );
    494552                info_thread( L ) i = { active_thread(), info, &l };
    495                 size_t recursion_count = queue_and_get_recursion(this, &i);
    496                 unlock( lock );
    497                 park( );
    498                 on_wakeup(*i.lock, recursion_count);
     553        insert_last( blocked_threads, i );
     554                unlock( lock );
     555
     556        // blocks here
     557                size_t recursion_count = block_and_get_recursion( i );
     558
     559                on_wakeup( *i.lock, recursion_count );
    499560        }
    500561
     
    584645        return thrd != 0p;
    585646}
     647
Note: See TracChangeset for help on using the changeset viewer.