Changeset beeff61e for libcfa/src


Timestamp:
May 1, 2023, 4:00:06 PM
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
73bf7ddc
Parents:
bb7422a
Message:

Some cleanup and a bunch of changes to support the waituntil statement.
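For context, the waituntil statement selects over several blocking operations at once. A minimal sketch of the intended usage (hypothetical channels A and B; the clause syntax is the one the chan_read/chan_write operators below are written for):

    channel( int ) A{ 4 }, B{ 4 };   // hypothetical buffered channels
    int i;
    // block until a value can be read from A or 42 can be written to B
    waituntil( i << A )     { sout | "read" | i; }
    or waituntil( 42 >> B ) { sout | "wrote 42"; }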

Location:
libcfa/src/concurrency
Files:
8 edited

  • libcfa/src/concurrency/channel.hfa

    rbb7422a rbeeff61e  
    44#include <list.hfa>
    55#include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
     6#include "select.hfa"
    307
    318// returns true if woken due to shutdown
    329// blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
     10static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
     11    select_node sn{ active_thread(), elem_ptr };
     12    insert_last( queue, sn );
    3613    unlock( lock );
    3714    park();
    38     return w.elem == 0p;
     15    return sn.extra == 0p;
     16}
     17
     18// Waituntil support (un)register_select helper routine
     19// Sets select node avail if not special OR case and then unlocks
     20static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
     21    if ( node.park_counter ) __make_select_node_available( node );
     22    unlock( mutex_lock );
    3923}
    4024
     
    5943    size_t size, front, back, count;
    6044    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
     45    dlist( select_node ) prods, cons; // lists of blocked threads
    6246    go_mutex mutex_lock;            // MX lock
    6347    bool closed;                    // indicates channel close/open
     
    7054    size = _size;
    7155    front = back = count = 0;
    72     buffer = aalloc( size );
     56    if ( size != 0 ) buffer = aalloc( size );
    7357    prods{};
    7458    cons{};
     
    8771    #endif
    8872    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
     73    if ( size != 0 ) delete( buffer );
    9074}
    9175static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     
    10286    // flush waiting consumers and producers
    10387    while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
     88        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     89            break;  // if __handle_waituntil_OR returns false cons is empty so break
     90        cons`first.extra = 0p;
    10591        wake_one( cons );
    10692    }
    10793    while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
     94        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     95            break;  // if __handle_waituntil_OR returns false prods is empty so break
     96        prods`first.extra = 0p;
    10997        wake_one( prods );
    11098    }
     
    114102static inline bool is_closed( channel(T) & chan ) with(chan) { return closed; }
    115103
     104// used to hand an element to a blocked consumer and signal it
     105static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     106    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     107    wake_one( cons );
     108}
     109
     110// used to hand an element to a blocked producer and signal it
     111static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     112    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     113    wake_one( prods );
     114}
     115
    116116static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117117    lock( mutex_lock );
    118118    while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
     119        __cons_handoff( chan, elem );
    121120    }
    122121    unlock( mutex_lock );
     
    125124// handles buffer insert
    126125static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    127     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     126    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    128127    count += 1;
    129128    back++;
     
    131130}
    132131
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141132// needed to avoid an extra copy in closed case
    142133static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     
    145136    operations++;
    146137    #endif
     138
     139    ConsEmpty: if ( !cons`isEmpty ) {
     140        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     141        __cons_handoff( chan, elem );
     142        unlock( mutex_lock );
     143        return true;
     144    }
     145
    147146    if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
     147
     148    __buf_insert( chan, elem );
    149149    unlock( mutex_lock );
    150150    return true;
     
    157157// handles closed case of insert routine
    158158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
     159    channel_closed except{ &channel_closed_vt, &elem, &chan };
    160160    throwResume except; // throw closed resumption
    161161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    182182    }
    183183
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
     184    // buffer count must be zero if cons are blocked (also handles zero-size case)
     185    ConsEmpty: if ( !cons`isEmpty ) {
     186        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     187        __cons_handoff( chan, elem );
     188        unlock( mutex_lock );
     189        return;
    190190    }
    191191
     
    202202    } // if
    203203
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     204    __buf_insert( chan, elem );
     205    unlock( mutex_lock );
     206}
     207
     208// does the buffer remove and potentially does waiting producer work
     209static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     210    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    216211    count -= 1;
    217212    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223213    if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     214        if ( !__handle_waituntil_OR( prods ) ) return;
     215        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    225216        wake_one( prods );
    226217    }
     
    233224    operations++;
    234225    #endif
     226
     227    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     228        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     229        __prods_handoff( chan, retval );
     230        unlock( mutex_lock );
     231        return true;
     232    }
     233
    235234    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     235
    236236    __do_remove( chan, retval );
    237237    unlock( mutex_lock );
     
    244244static inline [T, bool] try_remove( channel(T) & chan ) {
    245245    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     246    bool success = __internal_try_remove( chan, retval );
     247    return [ retval, success ];
     248}
     249
     250static inline T try_remove( channel(T) & chan ) {
    250251    T retval;
    251252    __internal_try_remove( chan, retval );
     
    255256// handles closed case of insert routine
    256257static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
     258    channel_closed except{ &channel_closed_vt, 0p, &chan };
    258259    throwResume except; // throw resumption
    259260    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    279280
    280281    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     282    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     283        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     284        __prods_handoff( chan, retval );
    284285        unlock( mutex_lock );
    285286        return retval;
     
    287288
    288289    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
     290    if ( count == 0 ) {
    290291        #ifdef CHAN_STATS
    291292        blocks++;
     
    299300    // Remove from buffer
    300301    __do_remove( chan, retval );
    301 
    302302    unlock( mutex_lock );
    303303    return retval;
    304304}
     305
     306///////////////////////////////////////////////////////////////////////////////////////////
     307// The following is support for waituntil (select) statements
     308///////////////////////////////////////////////////////////////////////////////////////////
     309static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     310    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
     311    lock( mutex_lock );
     312    if ( node`isListed ) { // op wasn't performed
     313        #ifdef CHAN_STATS
     314        operations--;
     315        #endif
     316        remove( node );
     317        unlock( mutex_lock );
     318        return false;
     319    }
     320    unlock( mutex_lock );
     321
      322    // only return true when not the special OR case, not the exceptional case, and status is SAT
     323    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     324}
     325
     326// type used by select statement to capture a chan read as the selected operation
     327struct chan_read {
     328    channel(T) & chan;
     329    T & ret;
     330};
     331
     332static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
     333    &cr.chan = &chan;
     334    &cr.ret = &ret;
     335}
     336static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
     337
     338static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     339    __closed_remove( chan, ret );
      340    // if we get here then the remove succeeded
     341    __make_select_node_available( node );
     342}
     343
     344static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     345    // mutex(sout) sout | "register_read";
     346    lock( mutex_lock );
     347    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     348
     349    #ifdef CHAN_STATS
     350    if ( !closed ) operations++;
     351    #endif
     352
     353    // check if we can complete operation. If so race to establish winner in special OR case
     354    if ( !node.park_counter && ( count != 0 || !prods`isEmpty || unlikely(closed) ) ) {
     355        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     356           unlock( mutex_lock );
     357           return false;
     358        }
     359    }
     360
     361    if ( unlikely(closed) ) {
     362        unlock( mutex_lock );
     363        __handle_select_closed_read( this, node );
     364        return true;
     365    }
     366
     367    // have to check for the zero size channel case
     368    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     369        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     370        __prods_handoff( chan, ret );
     371        __set_avail_then_unlock( node, mutex_lock );
     372        return true;
     373    }
     374
     375    // wait if buffer is empty, work will be completed by someone else
     376    if ( count == 0 ) {
     377        #ifdef CHAN_STATS
     378        blocks++;
     379        #endif
     380       
     381        insert_last( cons, node );
     382        unlock( mutex_lock );
     383        return false;
     384    }
     385
     386    // Remove from buffer
     387    __do_remove( chan, ret );
     388    __set_avail_then_unlock( node, mutex_lock );
     389    return true;
     390}
     391static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     392static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     393    if ( node.extra == 0p ) // check if woken up due to closed channel
     394        __closed_remove( chan, ret );
     395    // This is only reachable if not closed or closed exception was handled
     396    return true;
     397}
     398
     399// type used by select statement to capture a chan write as the selected operation
     400struct chan_write {
     401    channel(T) & chan;
     402    T elem;
     403};
     404
     405static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
     406    &cw.chan = &chan;
     407    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     408}
     409static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
     410
     411static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     412    __closed_insert( chan, elem );
     413    // if we get here then the insert succeeded
     414    __make_select_node_available( node );
     415}
     416
     417static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     418    // mutex(sout) sout | "register_write";
     419    lock( mutex_lock );
     420    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     421
     422    #ifdef CHAN_STATS
     423    if ( !closed ) operations++;
     424    #endif
     425
     426    // check if we can complete operation. If so race to establish winner in special OR case
     427    if ( !node.park_counter && ( count != size || !cons`isEmpty || unlikely(closed) ) ) {
     428        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     429           unlock( mutex_lock );
     430           return false;
     431        }
     432    }
     433
      434    // handle the closed-channel case
     435    if ( unlikely(closed) ) {
     436        unlock( mutex_lock );
     437        __handle_select_closed_write( this, node );
     438        return true;
     439    }
     440
     441    // handle blocked consumer case via handoff (buffer is implicitly empty)
     442    ConsEmpty: if ( !cons`isEmpty ) {
     443        if ( !__handle_waituntil_OR( cons ) ) {
     444            // mutex(sout) sout | "empty";
     445            break ConsEmpty;
     446        }
     447        // mutex(sout) sout | "signal";
     448        __cons_handoff( chan, elem );
     449        __set_avail_then_unlock( node, mutex_lock );
     450        return true;
     451    }
     452
     453    // insert node in list if buffer is full, work will be completed by someone else
     454    if ( count == size ) {
     455        #ifdef CHAN_STATS
     456        blocks++;
     457        #endif
     458
     459        insert_last( prods, node );
     460        unlock( mutex_lock );
     461        return false;
     462    } // if
     463
      464    // otherwise carry out the write via a normal buffer insert
     465    __buf_insert( chan, elem );
     466    __set_avail_then_unlock( node, mutex_lock );
     467    return true;
     468}
     469static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     470
     471static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     472    if ( node.extra == 0p ) // check if woken up due to closed channel
     473        __closed_insert( chan, elem );
     474
     475    // This is only reachable if not closed or closed exception was handled
     476    return true;
     477}
     478
     479
    305480} // forall( T )
     481
     482
     483
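Taken together, register_select/unregister_select/on_selected implement the select-node protocol for channels. Very roughly — a hand-written sketch, not the code the waituntil statement actually generates — a single read clause drives these hooks like this:

    select_node node;
    chan_read( int ) cr = ret << chan;       // build the clause object
    if ( register_select( cr, node ) ) {     // op completed immediately (or channel closed)
        on_selected( cr, node );
    } else {
        park();                              // woken by a producer handoff or by close
        unregister_select( cr, node );       // remove node if it is still queued
        on_selected( cr, node );             // node.extra == 0p means woken by close
    }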
  • libcfa/src/concurrency/future.hfa

    rbb7422a rbeeff61e  
    1919#include "monitor.hfa"
    2020#include "select.hfa"
     21#include "locks.hfa"
    2122
    2223//----------------------------------------------------------------------------
     
    2627//  future_t is lockfree and uses atomics which aren't needed given we use locks here
    2728forall( T ) {
    28     // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
     29    // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
    2930
    3031    // temporary enum replacement
     
    4445    };
    4546
    46     // C_TODO: perhaps allow exceptions to be inserted like uC++?
    47 
    4847        static inline {
    4948
     
    8281        void _internal_flush( future(T) & this ) with(this) {
    8382            while( ! waiters`isEmpty ) {
     83                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     84                    break; // if handle_OR returns false then waiters is empty so break
    8485                select_node &s = try_pop_front( waiters );
    8586
    86                 if ( s.race_flag == 0p )
     87                if ( s.clause_status == 0p )
    8788                    // poke in result so that woken threads do not need to reacquire any locks
    88                     // *(((future_node(T) &)s).my_result) = result;
    8989                    copy_T( result, *(((future_node(T) &)s).my_result) );
    90                 else if ( !install_select_winner( s, &this ) ) continue;
     90                else if ( !__make_select_node_available( s ) ) continue;
    9191               
    9292                // only unpark if future is not selected
     
    9797
    9898                // Fulfil the future, returns whether or not someone was unblocked
    99                 bool fulfil( future(T) & this, T & val ) with(this) {
     99                bool fulfil( future(T) & this, T val ) with(this) {
    100100            lock( lock );
    101101            if( state != FUTURE_EMPTY )
     
    153153        }
    154154
    155         void * register_select( future(T) & this, select_node & s ) with(this) {
    156             lock( lock );
    157 
    158             // future not ready -> insert select node and return 0p
     155        bool register_select( future(T) & this, select_node & s ) with(this) {
     156            lock( lock );
     157
     158            // check if we can complete operation. If so race to establish winner in special OR case
     159            if ( !s.park_counter && state != FUTURE_EMPTY ) {
     160                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     161                    unlock( lock );
     162                    return false;
     163                }
     164            }
     165
     166            // future not ready -> insert select node and return
    159167            if( state == FUTURE_EMPTY ) {
    160168                insert_last( waiters, s );
    161169                unlock( lock );
    162                 return 0p;
    163             }
    164 
    165             // future ready and we won race to install it as the select winner return 1p
    166             if ( install_select_winner( s, &this ) ) {
    167                 unlock( lock );
    168                 return 1p;
    169             }
    170 
    171             unlock( lock );
    172             // future ready and we lost race to install it as the select winner
    173             return 2p;
    174         }
    175 
    176         void unregister_select( future(T) & this, select_node & s ) with(this) {
     170                return false;
     171            }
     172
     173            __make_select_node_available( s );
     174            unlock( lock );
     175            return true;
     176        }
     177
     178        bool unregister_select( future(T) & this, select_node & s ) with(this) {
     179            if ( ! s`isListed ) return false;
    177180            lock( lock );
    178181            if ( s`isListed ) remove( s );
    179182            unlock( lock );
     183            return false;
    180184        }
    181185               
     186        bool on_selected( future(T) & this, select_node & node ) { return true; }
    182187        }
    183188}
     
    186191// These futures below do not support select statements so they may not be as useful as 'future'
    187192//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    188 //  since it uses raw atomics and no locks afaik
     193//  since it uses raw atomics and no locks
    189194//
    190195// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
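Since future now exposes the same register_select/unregister_select/on_selected hooks, a future can appear as a waituntil clause next to channel operations. A hedged sketch (f, ch, and i are hypothetical; get is the existing future accessor):

    future( int ) f;
    channel( int ) ch{ 4 };
    int i;
    // some other thread eventually calls: fulfil( f, 42 );
    waituntil( f )          { sout | get( f ); }   // runs once f is fulfilled
    or waituntil( i << ch ) { sout | i; }          // unless a channel read wins first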
  • libcfa/src/concurrency/invoke.h

    rbb7422a rbeeff61e  
    217217                struct __thread_user_link cltr_link;
    218218
    219                 // used to point to this thd's current clh node
    220                 volatile bool * clh_node;
    221 
    222219                struct processor * last_proc;
     220
     221        // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes
     222        // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock
     223        void * link_node;
    223224
    224225                PRNG_STATE_T random_state;                                              // fast random numbers
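The new link_node field supports the wait-morphing used in locks.cfa below: the blocking thread stack-allocates its intrusive node, publishes it through link_node before releasing the lock, and the lock the thread is morphed onto recovers the node in on_notify. A sketch assembled from the changes below:

    // in on_wait( some_lock ), before unlocking and parking:
    select_node node;                              // lives on the waiter's stack
    active_thread()->link_node = (void *)&node;    // publish before unlock/park
    // in on_notify( target_lock, t ), when the target lock is held:
    insert_last( blocked_threads, *(select_node *)t->link_node );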
  • libcfa/src/concurrency/locks.cfa

    rbb7422a rbeeff61e  
    7979        // lock is held by some other thread
    8080        if ( owner != 0p && owner != thrd ) {
    81                 insert_last( blocked_threads, *thrd );
     81        select_node node;
     82                insert_last( blocked_threads, node );
    8283                wait_count++;
    8384                unlock( lock );
    8485                park( );
    85         }
    86         // multi acquisition lock is held by current thread
    87         else if ( owner == thrd && multi_acquisition ) {
     86        return;
     87        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
    8888                recursion_count++;
    89                 unlock( lock );
    90         }
    91         // lock isn't held
    92         else {
     89        } else {  // lock isn't held
    9390                owner = thrd;
    9491                recursion_count = 1;
    95                 unlock( lock );
    96         }
     92        }
     93    unlock( lock );
    9794}
    9895
     
    117114}
    118115
    119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
    120         thread$ * t = &try_pop_front( blocked_threads );
    121         owner = t;
    122         recursion_count = ( t ? 1 : 0 );
    123         if ( t ) wait_count--;
    124         unpark( t );
     116// static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
     117//      thread$ * t = &try_pop_front( blocked_threads );
     118//      owner = t;
     119//      recursion_count = ( t ? 1 : 0 );
     120//      if ( t ) wait_count--;
     121//      unpark( t );
     122// }
     123
     124static inline void pop_node( blocking_lock & this ) with( this ) {
     125    __handle_waituntil_OR( blocked_threads );
     126    select_node * node = &try_pop_front( blocked_threads );
     127    if ( node ) {
     128        wait_count--;
     129        owner = node->blocked_thread;
     130        recursion_count = 1;
     131        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     132        wake_one( blocked_threads, *node );
     133    } else {
     134        owner = 0p;
     135        recursion_count = 0;
     136    }
    125137}
    126138
     
    134146        recursion_count--;
    135147        if ( recursion_count == 0 ) {
    136                 pop_and_set_new_owner( this );
     148                pop_node( this );
    137149        }
    138150        unlock( lock );
     
    147159        // lock held
    148160        if ( owner != 0p ) {
    149                 insert_last( blocked_threads, *t );
     161                insert_last( blocked_threads, *(select_node *)t->link_node );
    150162                wait_count++;
    151                 unlock( lock );
    152163        }
    153164        // lock not held
     
    156167                recursion_count = 1;
    157168                unpark( t );
    158                 unlock( lock );
    159         }
     169        }
     170    unlock( lock );
    160171}
    161172
     
    167178        size_t ret = recursion_count;
    168179
    169         pop_and_set_new_owner( this );
     180        pop_node( this );
     181
     182    select_node node;
     183    active_thread()->link_node = (void *)&node;
    170184        unlock( lock );
     185
     186    park();
     187
    171188        return ret;
    172189}
     
    175192        recursion_count = recursion;
    176193}
     194
     195// waituntil() support
     196bool register_select( blocking_lock & this, select_node & node ) with(this) {
     197    lock( lock __cfaabi_dbg_ctx2 );
     198        thread$ * thrd = active_thread();
     199
     200        // single acquisition lock is held by current thread
     201        /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );
     202
     203    if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case
     204        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     205           unlock( lock );
     206           return false;
     207        }
     208    }
     209
     210        // lock is held by some other thread
     211        if ( owner != 0p && owner != thrd ) {
     212                insert_last( blocked_threads, node );
     213                wait_count++;
     214                unlock( lock );
     215        return false;
     216        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
     217                recursion_count++;
     218        } else {  // lock isn't held
     219                owner = thrd;
     220                recursion_count = 1;
     221        }
     222
     223    if ( node.park_counter ) __make_select_node_available( node );
     224    unlock( lock );
     225    return true;
     226}
     227
     228bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
     229    lock( lock __cfaabi_dbg_ctx2 );
     230    if ( node`isListed ) {
     231        remove( node );
     232        wait_count--;
     233        unlock( lock );
     234        return false;
     235    }
     236   
     237    if ( owner == active_thread() ) {
     238        /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
     239        // if recursion count is zero release lock and set new owner if one is waiting
     240        recursion_count--;
     241        if ( recursion_count == 0 ) {
     242            pop_node( this );
     243        }
     244    }
     245        unlock( lock );
     246    return false;
     247}
     248
     249bool on_selected( blocking_lock & this, select_node & node ) { return true; }
    177250
    178251//-----------------------------------------------------------------------------
     
    311384        int counter( condition_variable(L) & this ) with(this) { return count; }
    312385
    313         static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {
     386        static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
    314387                // add info_thread to waiting queue
    315388                insert_last( blocked_threads, *i );
    316389                count++;
    317                 size_t recursion_count = 0;
    318                 if (i->lock) {
     390                // size_t recursion_count = 0;
     391                // if (i->lock) {
     392                //      // if lock was passed get recursion count to reset to after waking thread
     393                //      recursion_count = on_wait( *i->lock );
     394                // }
     395                // return recursion_count;
     396        }
     397
     398    static size_t block_and_get_recursion( info_thread(L) & i ) {
     399        size_t recursion_count = 0;
     400                if ( i.lock ) {
    319401                        // if lock was passed get recursion count to reset to after waking thread
    320                         recursion_count = on_wait( *i->lock );
    321                 }
    322                 return recursion_count;
    323         }
     402                        recursion_count = on_wait( *i.lock ); // this call blocks
     403                } else park( );
     404        return recursion_count;
     405    }
    324406
    325407        // helper for wait()'s' with no timeout
    326408        static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
    327409                lock( lock __cfaabi_dbg_ctx2 );
    328                 size_t recursion_count = queue_and_get_recursion(this, &i);
     410        enqueue_thread( this, &i );
     411                // size_t recursion_count = queue_and_get_recursion( this, &i );
    329412                unlock( lock );
    330413
    331414                // blocks here
    332                 park( );
     415        size_t recursion_count = block_and_get_recursion( i );
     416                // park( );
    333417
    334418                // resets recursion count here after waking
    335                 if (i.lock) on_wakeup(*i.lock, recursion_count);
     419                if ( i.lock ) on_wakeup( *i.lock, recursion_count );
    336420        }
    337421
     
    343427        static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    344428                lock( lock __cfaabi_dbg_ctx2 );
    345                 size_t recursion_count = queue_and_get_recursion(this, &info);
     429        enqueue_thread( this, &info );
     430                // size_t recursion_count = queue_and_get_recursion( this, &info );
    346431                alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    347432                unlock( lock );
     
    351436
    352437                // blocks here
    353                 park();
     438        size_t recursion_count = block_and_get_recursion( info );
     439                // park();
    354440
    355441                // unregisters alarm so it doesn't go off if this happens first
     
    357443
    358444                // resets recursion count here after waking
    359                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     445                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    360446        }
    361447
     
    417503                info_thread( L ) i = { active_thread(), info, &l };
    418504                insert_last( blocked_threads, i );
    419                 size_t recursion_count = on_wait( *i.lock );
    420                 park( );
     505                size_t recursion_count = on_wait( *i.lock ); // blocks here
     506                // park( );
    421507                on_wakeup(*i.lock, recursion_count);
    422508        }
     
    459545        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
    460546
    461         static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
    462                 // add info_thread to waiting queue
    463                 insert_last( blocked_threads, *i );
    464                 size_t recursion_count = 0;
    465                 recursion_count = on_wait( *i->lock );
    466                 return recursion_count;
    467         }
     547        // static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
     548        //      // add info_thread to waiting queue
     549        //      insert_last( blocked_threads, *i );
     550        //      size_t recursion_count = 0;
     551        //      recursion_count = on_wait( *i->lock );
     552        //      return recursion_count;
     553        // }
     554
    468555       
    469556        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    470557                lock( lock __cfaabi_dbg_ctx2 );
    471                 size_t recursion_count = queue_and_get_recursion(this, &info);
     558                // size_t recursion_count = queue_and_get_recursion(this, &info);
     559        insert_last( blocked_threads, info );
    472560                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    473561                unlock( lock );
     
    477565
    478566                // blocks here
    479                 park();
     567        size_t recursion_count = block_and_get_recursion( info );
     568                // park();
    480569
    481570                // unregisters alarm so it doesn't go off if this happens first
     
    483572
    484573                // resets recursion count here after waking
    485                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     574                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    486575        }
    487576
     
    493582                lock( lock __cfaabi_dbg_ctx2 );
    494583                info_thread( L ) i = { active_thread(), info, &l };
    495                 size_t recursion_count = queue_and_get_recursion(this, &i);
    496                 unlock( lock );
    497                 park( );
    498                 on_wakeup(*i.lock, recursion_count);
     584        insert_last( blocked_threads, i );
     585                // size_t recursion_count = queue_and_get_recursion( this, &i );
     586                unlock( lock );
     587
     588        // blocks here
     589                size_t recursion_count = block_and_get_recursion( i );
     590                // park();
     591                on_wakeup( *i.lock, recursion_count );
    499592        }
    500593
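The recurring change in this file is the on_wait contract: on_wait now both releases the lock and blocks (park moved inside it), so condition-variable code enqueues first and then blocks through block_and_get_recursion. The user-facing API should be unchanged; a usage sketch, assuming the existing wait and notify_one routines:

    owner_lock l;
    condition_variable( owner_lock ) cv;
    // waiter:
    lock( l );
    wait( cv, l );      // enqueues, then on_wait( l ) releases l and parks
    unlock( l );
    // signaller:
    lock( l );
    notify_one( cv );   // wait-morphs the waiter onto l's blocked list
    unlock( l );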
  • libcfa/src/concurrency/locks.hfa

    rbb7422a rbeeff61e  
    3030#include "time.hfa"
    3131
     32#include "select.hfa"
     33
    3234#include <fstream.hfa>
    3335
     
    7072static inline void   on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    7173static inline void   on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     74static inline bool   register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     75static inline bool   unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     76static inline bool   on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
    7277
    7378//----------
     
    8489static inline void   on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    8590static inline void   on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     91static inline bool   register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     92static inline bool   unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     93static inline bool   on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
    8694
    8795//-----------------------------------------------------------------------------
     
    180188
    181189// if this is called recursively IT WILL DEADLOCK!!!!!
    182 static inline void lock(futex_mutex & this) with(this) {
     190static inline void lock( futex_mutex & this ) with(this) {
    183191        int state;
    184192
     
    190198                for (int i = 0; i < spin; i++) Pause();
    191199        }
    192 
    193         // // no contention try to acquire
    194         // if (internal_try_lock(this, state)) return;
    195200       
    196201        // if not in contended state, set to be in contended state
     
    213218
    214219static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); }
    215 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;}
     220static inline size_t on_wait( futex_mutex & f ) { unlock(f); park(); return 0; }
    216221
    217222// to set recursion count after getting signalled;
     
    244249
    245250// if this is called recursively IT WILL DEADLOCK!!!!!
    246 static inline void lock(go_mutex & this) with(this) {
     251static inline void lock( go_mutex & this ) with( this ) {
    247252        int state, init_state;
    248253
     
    255260            while( !val ) { // lock unlocked
    256261                state = 0;
    257                 if (internal_try_lock(this, state, init_state)) return;
     262                if ( internal_try_lock( this, state, init_state ) ) return;
    258263            }
    259264            for (int i = 0; i < 30; i++) Pause();
     
    262267        while( !val ) { // lock unlocked
    263268            state = 0;
    264             if (internal_try_lock(this, state, init_state)) return;
     269            if ( internal_try_lock( this, state, init_state ) ) return;
    265270        }
    266271        sched_yield();
    267272       
    268273        // if not in contended state, set to be in contended state
    269         state = internal_exchange(this, 2);
     274        state = internal_exchange( this, 2 );
    270275        if ( !state ) return; // state == 0
    271276        init_state = 2;
    272         futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
     277        futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK
    273278    }
    274279}
     
    276281static inline void unlock( go_mutex & this ) with(this) {
    277282        // if uncontended do atomic unlock and then return
    278     if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     283    if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return;
    279284       
    280285        // otherwise threads are blocked so we must wake one
    281         futex((int *)&val, FUTEX_WAKE, 1);
    282 }
    283 
    284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
    285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
     286        futex( (int *)&val, FUTEX_WAKE, 1 );
     287}
     288
     289static inline void on_notify( go_mutex & f, thread$ * t){ unpark( t ); }
     290static inline size_t on_wait( go_mutex & f ) { unlock( f ); park(); return 0; }
    286291static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    287 
    288 //-----------------------------------------------------------------------------
    289 // CLH Spinlock
    290 // - No recursive acquisition
    291 // - Needs to be released by owner
    292 
    293 struct clh_lock {
    294         volatile bool * volatile tail;
    295     volatile bool * volatile head;
    296 };
    297 
    298 static inline void  ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }
    299 static inline void ^?{}( clh_lock & this ) { free(this.tail); }
    300 
    301 static inline void lock(clh_lock & l) {
    302         thread$ * curr_thd = active_thread();
    303         *(curr_thd->clh_node) = false;
    304         volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);
    305         while(!__atomic_load_n(prev, __ATOMIC_SEQ_CST)) Pause();
    306     __atomic_store_n((bool **)(&l.head), (bool *)curr_thd->clh_node, __ATOMIC_SEQ_CST);
    307     curr_thd->clh_node = prev;
    308 }
    309 
    310 static inline void unlock(clh_lock & l) {
    311         __atomic_store_n((bool *)(l.head), true, __ATOMIC_SEQ_CST);
    312 }
    313 
    314 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
    315 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    316 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
    317292
    318293//-----------------------------------------------------------------------------
     
    337312static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    338313
    339 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
     314static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) {
    340315        return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    341316}
    342317
    343 static inline bool try_lock(exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
    344 
    345 static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
    346         return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
    347 }
    348 
    349 static inline bool block(exp_backoff_then_block_lock & this) with(this) {
     318static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); }
     319
     320static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) {
     321        return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE );
     322}
     323
     324static inline bool block( exp_backoff_then_block_lock & this ) with(this) {
    350325    lock( spinlock __cfaabi_dbg_ctx2 );
    351326    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
     
    359334}
    360335
    361 static inline void lock(exp_backoff_then_block_lock & this) with(this) {
     336static inline void lock( exp_backoff_then_block_lock & this ) with(this) {
    362337        size_t compare_val = 0;
    363338        int spin = 4;
     
    378353}
    379354
    380 static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
     355static inline void unlock( exp_backoff_then_block_lock & this ) with(this) {
    381356    if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    382357    lock( spinlock __cfaabi_dbg_ctx2 );
     
    386361}
    387362
    388 static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    389 static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; }
    390 static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
     363static inline void on_notify( exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark( t ); }
     364static inline size_t on_wait( exp_backoff_then_block_lock & this ) { unlock( this ); park(); return 0; }
     365static inline void on_wakeup( exp_backoff_then_block_lock & this, size_t recursion ) { lock( this ); }
    391366
    392367//-----------------------------------------------------------------------------
     
    418393
    419394// if this is called recursively IT WILL DEADLOCK!!!!!
    420 static inline void lock(fast_block_lock & this) with(this) {
     395static inline void lock( fast_block_lock & this ) with(this) {
    421396        lock( lock __cfaabi_dbg_ctx2 );
    422397        if ( held ) {
     
    430405}
    431406
    432 static inline void unlock(fast_block_lock & this) with(this) {
     407static inline void unlock( fast_block_lock & this ) with(this) {
    433408        lock( lock __cfaabi_dbg_ctx2 );
    434409        /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this );
     
    439414}
    440415
    441 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
     416static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {
    442417    lock( lock __cfaabi_dbg_ctx2 );
    443418    insert_last( blocked_threads, *t );
    444419    unlock( lock );
    445420}
    446 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    447 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
     421static inline size_t on_wait( fast_block_lock & this) { unlock(this); park(); return 0; }
     422static inline void on_wakeup( fast_block_lock & this, size_t recursion ) { }
    448423
    449424//-----------------------------------------------------------------------------
     
    456431struct simple_owner_lock {
    457432        // List of blocked threads
    458         dlist( thread$ ) blocked_threads;
     433        dlist( select_node ) blocked_threads;
    459434
    460435        // Spin lock used for mutual exclusion
     
    477452static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;
    478453
    479 static inline void lock(simple_owner_lock & this) with(this) {
    480         if (owner == active_thread()) {
     454static inline void lock( simple_owner_lock & this ) with(this) {
     455        if ( owner == active_thread() ) {
    481456                recursion_count++;
    482457                return;
     
    484459        lock( lock __cfaabi_dbg_ctx2 );
    485460
    486         if (owner != 0p) {
    487                 insert_last( blocked_threads, *active_thread() );
     461        if ( owner != 0p ) {
     462        select_node node;
     463                insert_last( blocked_threads, node );
    488464                unlock( lock );
    489465                park( );
     
    495471}
    496472
    497 // TODO: fix duplicate def issue and bring this back
    498 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
    499         // thread$ * t = &try_pop_front( blocked_threads );
    500         // owner = t;
    501         // recursion_count = ( t ? 1 : 0 );
    502         // unpark( t );
    503 // }
    504 
    505 static inline void unlock(simple_owner_lock & this) with(this) {
     473static inline void pop_node( simple_owner_lock & this ) with(this) {
     474    __handle_waituntil_OR( blocked_threads );
     475    select_node * node = &try_pop_front( blocked_threads );
     476    if ( node ) {
     477        owner = node->blocked_thread;
     478        recursion_count = 1;
     479        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     480        wake_one( blocked_threads, *node );
     481    } else {
     482        owner = 0p;
     483        recursion_count = 0;
     484    }
     485}
     486
     487static inline void unlock( simple_owner_lock & this ) with(this) {
    506488        lock( lock __cfaabi_dbg_ctx2 );
    507489        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    510492        recursion_count--;
    511493        if ( recursion_count == 0 ) {
    512                 // pop_and_set_new_owner( this );
    513                 thread$ * t = &try_pop_front( blocked_threads );
    514                 owner = t;
    515                 recursion_count = ( t ? 1 : 0 );
    516                 unpark( t );
     494                pop_node( this );
    517495        }
    518496        unlock( lock );
    519497}
    520498
    521 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
     499static inline void on_notify(simple_owner_lock & this, thread$ * t ) with(this) {
    522500        lock( lock __cfaabi_dbg_ctx2 );
    523501        // lock held
    524502        if ( owner != 0p ) {
    525                 insert_last( blocked_threads, *t );
     503                insert_last( blocked_threads, *(select_node *)t->link_node );
    526504        }
    527505        // lock not held
     
    534512}
    535513
    536 static inline size_t on_wait(simple_owner_lock & this) with(this) {
     514static inline size_t on_wait( simple_owner_lock & this ) with(this) {
    537515        lock( lock __cfaabi_dbg_ctx2 );
    538516        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    541519        size_t ret = recursion_count;
    542520
    543         // pop_and_set_new_owner( this );
    544 
    545         thread$ * t = &try_pop_front( blocked_threads );
    546         owner = t;
    547         recursion_count = ( t ? 1 : 0 );
    548         unpark( t );
    549 
     521        pop_node( this );
     522
     523    select_node node;
     524    active_thread()->link_node = (void *)&node;
    550525        unlock( lock );
     526    park();
     527
    551528        return ret;
    552529}
    553530
    554 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     531static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     532
     533// waituntil() support
     534static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) {
     535    lock( lock __cfaabi_dbg_ctx2 );
     536
     537    // check if we can complete operation. If so race to establish winner in special OR case
     538    if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) {
     539        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     540           unlock( lock );
     541           return false;
     542        }
     543    }
     544
     545    if ( owner == active_thread() ) {
     546                recursion_count++;
     547        if ( node.park_counter ) __make_select_node_available( node );
     548        unlock( lock );
     549                return true;
     550        }
     551
     552    if ( owner != 0p ) {
     553                insert_last( blocked_threads, node );
     554                unlock( lock );
     555                return false;
     556        }
     557   
     558        owner = active_thread();
     559        recursion_count = 1;
     560
     561    if ( node.park_counter ) __make_select_node_available( node );
     562    unlock( lock );
     563    return true;
     564}
     565
     566static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) {
     567    lock( lock __cfaabi_dbg_ctx2 );
     568    if ( node`isListed ) {
     569        remove( node );
     570        unlock( lock );
     571        return false;
     572    }
     573
     574    if ( owner == active_thread() ) {
     575        recursion_count--;
     576        if ( recursion_count == 0 ) {
     577            pop_node( this );
     578        }
     579    }
     580    unlock( lock );
     581    return false;
     582}
     583
     584static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; }
     585
    555586
    556587//-----------------------------------------------------------------------------
     
    578609
    579610// if this is called recursively IT WILL DEADLOCK!
    580 static inline void lock(spin_queue_lock & this) with(this) {
     611static inline void lock( spin_queue_lock & this ) with(this) {
    581612        mcs_spin_node node;
    582613        lock( lock, node );
     
    586617}
    587618
    588 static inline void unlock(spin_queue_lock & this) with(this) {
     619static inline void unlock( spin_queue_lock & this ) with(this) {
    589620        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    590621}
    591622
    592 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
     623static inline void on_notify( spin_queue_lock & this, struct thread$ * t ) {
    593624        unpark(t);
    594625}
    595 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    596 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); }
     626static inline size_t on_wait( spin_queue_lock & this ) { unlock( this ); park(); return 0; }
     627static inline void on_wakeup( spin_queue_lock & this, size_t recursion ) { lock( this ); }
    597628
    598629
     
    621652
    622653// if this is called recursively IT WILL DEADLOCK!!!!!
    623 static inline void lock(mcs_block_spin_lock & this) with(this) {
     654static inline void lock( mcs_block_spin_lock & this ) with(this) {
    624655        mcs_node node;
    625656        lock( lock, node );
     
    633664}
    634665
    635 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    636 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    637 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {lock(this); }
     666static inline void on_notify( mcs_block_spin_lock & this, struct thread$ * t ) { unpark( t ); }
     667static inline size_t on_wait( mcs_block_spin_lock & this) { unlock( this ); park(); return 0; }
      668static inline void on_wakeup( mcs_block_spin_lock & this, size_t recursion ) { lock( this ); }
    638669
    639670//-----------------------------------------------------------------------------
     
    661692
    662693// if this is called recursively IT WILL DEADLOCK!!!!!
    663 static inline void lock(block_spin_lock & this) with(this) {
     694static inline void lock( block_spin_lock & this ) with(this) {
    664695        lock( lock );
    665696        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    668699}
    669700
    670 static inline void unlock(block_spin_lock & this) with(this) {
     701static inline void unlock( block_spin_lock & this ) with(this) {
    671702        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    672703}
    673704
    674 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
     705static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    675706        // first we acquire internal fast_block_lock
    676707        lock( lock __cfaabi_dbg_ctx2 );
     
    686717        unpark(t);
    687718}
    688 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
    689 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
     719static inline size_t on_wait( block_spin_lock & this ) { unlock( this ); park(); return 0; }
     720static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {
    690721        // now we acquire the entire block_spin_lock upon waking up
    691722        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    714745forall(L & | is_blocking_lock(L)) {
    715746        struct info_thread;
    716 
    717         // // for use by sequence
    718         // info_thread(L) *& Back( info_thread(L) * this );
    719         // info_thread(L) *& Next( info_thread(L) * this );
    720747}
    721748
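
The waituntil hooks added to simple_owner_lock above follow a shape the other selectable primitives in this changeset repeat: register_select either satisfies the clause immediately (marking the node available when a park counter is present) or queues the select_node, and unregister_select withdraws a node that is still queued. A minimal sketch of that shape for a hypothetical toy_lock, using only routines visible in this changeset; the special OR-case race that simple_owner_lock handles first is elided:

    // minimal sketch of a selectable toy primitive; toy_lock and its fields are
    // hypothetical and only illustrate the register/unregister shape used above
    struct toy_lock {
        __spinlock_t lock;                   // internal lock guarding the fields (assumed)
        bool held;
        dlist( select_node ) waiters;        // queued select_nodes
    };

    static inline bool register_select( toy_lock & this, select_node & node ) with(this) {
        lock( lock __cfaabi_dbg_ctx2 );
        if ( !held ) {                       // fast path: clause satisfiable immediately
            held = true;
            if ( node.park_counter ) __make_select_node_available( node );
            unlock( lock );
            return true;
        }
        insert_last( waiters, node );        // slow path: queue node; caller may park
        unlock( lock );
        return false;
    }

    static inline bool unregister_select( toy_lock & this, select_node & node ) with(this) {
        lock( lock __cfaabi_dbg_ctx2 );
        if ( node`isListed ) remove( node ); // still queued: lost the race, withdraw
        unlock( lock );
        return false;
    }

    static inline bool on_selected( toy_lock & this, select_node & node ) { return true; }

Returning false from unregister_select mirrors simple_owner_lock above: withdrawal never requires the clause statement to run for this primitive.
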
  • libcfa/src/concurrency/mutex_stmt.hfa

    rbb7422a rbeeff61e  
    1515};
    1616
    17 
    1817struct __mutex_stmt_lock_guard {
    1918    void ** lockarr;
     
    3029
    3130forall(L & | is_lock(L)) {
    32 
    33     struct scoped_lock {
    34         L * internal_lock;
    35     };
    36 
    37     static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) {
    38         this.internal_lock = &internal_lock;
    39         lock(internal_lock);
    40     }
    41    
    42     static inline void ^?{}( scoped_lock(L) & this ) with(this) {
    43         unlock(*internal_lock);
    44     }
    45 
    46     static inline void * __get_mutexstmt_lock_ptr( L & this ) {
    47         return &this;
    48     }
    49 
    50     static inline L __get_mutexstmt_lock_type( L & this );
    51 
    52     static inline L __get_mutexstmt_lock_type( L * this );
     31    static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; }
     32    static inline L __get_mutexstmt_lock_type( L & this ) {}
     33    static inline L __get_mutexstmt_lock_type( L * this ) {}
    5334}
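
With scoped_lock removed, the remaining helpers serve only the mutex statement: __get_mutexstmt_lock_ptr erases the lock type behind a void *, and the two empty __get_mutexstmt_lock_type overloads are presumably never executed, existing so the compiler can resolve a lock type from either a reference or a pointer. A rough, hypothetical sketch of the kind of expansion these helpers support; the actual code is compiler-generated and the guard's constructor shape is assumed here:

    single_acquisition_lock a, b;        // assumed CFA lock types satisfying is_lock

    // user code:
    //     mutex( a, b ) { /* critical section */ }
    // conceptually expands to something like:
    {
        void * locks[] = { __get_mutexstmt_lock_ptr( a ), __get_mutexstmt_lock_ptr( b ) };
        __mutex_stmt_lock_guard guard = { locks, 2 };  // assumed ctor: orders and acquires the locks
        /* critical section */
    } // guard destructor releases the locks on scope exit
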
  • libcfa/src/concurrency/select.hfa

    rbb7422a rbeeff61e  
    22
    33#include "containers/list.hfa"
    4 #include <stdint.h>
    5 #include <kernel.hfa>
    6 #include <locks.hfa>
     4#include "stdint.h"
     5#include "kernel.hfa"
    76
     7struct select_node;
     8
     9// node status
     10static const unsigned long int __SELECT_UNSAT = 0;
     11static const unsigned long int __SELECT_SAT = 1;
     12static const unsigned long int __SELECT_RUN = 2;
     13
     14static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
     15static inline void __CFA_maybe_park( int * park_counter ) {
     16    if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
     17        park();
     18}
     19
     20// node used for coordinating waituntil synchronization
    821struct select_node {
     22    int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
      23    unsigned long int * clause_status;  // must point at a pointer-sized location; if this is 0p the node is not part of a waituntil
     24
     25    void * extra;                       // used to store arbitrary data needed by some primitives
     26
    927    thread$ * blocked_thread;
    10     void ** race_flag;
    1128    inline dlink(select_node);
    1229};
    1330P9_EMBEDDED( select_node, dlink(select_node) )
    1431
    15 void ?{}( select_node & this ) {
    16     this.blocked_thread = 0p;
    17     this.race_flag = 0p;
     32static inline void ?{}( select_node & this ) {
     33    this.blocked_thread = active_thread();
     34    this.clause_status = 0p;
     35    this.park_counter = 0p;
     36    this.extra = 0p;
    1837}
    1938
    20 void ?{}( select_node & this, thread$ * blocked_thread ) {
     39static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    2140    this.blocked_thread = blocked_thread;
    22     this.race_flag = 0p;
     41    this.clause_status = 0p;
     42    this.park_counter = 0p;
     43    this.extra = 0p;
    2344}
    2445
    25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) {
     46static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    2647    this.blocked_thread = blocked_thread;
    27     this.race_flag = race_flag;
     48    this.clause_status = 0p;
     49    this.park_counter = 0p;
     50    this.extra = extra;
    2851}
    2952
    30 void ^?{}( select_node & this ) {}
     53static inline void ^?{}( select_node & this ) {}
    3154
     55static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
    3256
    3357//-----------------------------------------------------------------------------
    3458// is_selectable
    35 trait is_selectable(T & | sized(T)) {
    36     // For registering a select on a selectable concurrency primitive
    37     // return 0p if primitive not accessible yet
    38     // return 1p if primitive gets acquired
    39     // return 2p if primitive is accessible but some other primitive won the race
    40     // C_TODO: add enum for return values
    41     void * register_select( T &, select_node & );
     59forall(T & | sized(T))
     60trait is_selectable {
     61    // For registering a select stmt on a selectable concurrency primitive
      62    // Returns a bool indicating whether the operation is already satisfied (SAT)
     63    bool register_select( T &, select_node & );
    4264
    43     void unregister_select( T &, select_node &  );
     65    // For unregistering a select stmt on a selectable concurrency primitive
      66    // If true is returned, the corresponding code block is run (only in the non-special OR case and only if the node status is not RUN)
     67    bool unregister_select( T &, select_node &  );
     68
     69    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
     70    //    passed as an arg to this routine
      71    // If on_selected returns false, the statement is not run; if it returns true, it is run.
     72    bool on_selected( T &, select_node & );
    4473};
    4574
    46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
    47     // temporary needed for atomic instruction
    48     void * cmp_flag = 0p;
    49    
    50     // if we dont win the selector race we need to potentially
    51     //   ignore this node and move to the next one so we return accordingly
    52     if ( *race_flag != 0p ||
    53         !__atomic_compare_exchange_n(
    54             race_flag,
    55             &cmp_flag,
    56             primitive_ptr,
    57             false,
    58             __ATOMIC_SEQ_CST,
    59             __ATOMIC_SEQ_CST
    60         )
    61     ) return false; // lost race and some other node triggered select
    62     return true; // won race so this node is what the select proceeds with
      75// used by compiler-generated code to attempt to establish an else clause as the winner in the special OR-case race
     76static inline bool __select_node_else_race( select_node & this ) with( this ) {
     77    unsigned long int cmp_status = __SELECT_UNSAT;
     78    return *clause_status == 0
     79            && __atomic_compare_exchange_n( clause_status, &cmp_status, 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
    6380}
     81
      82// when a primitive becomes available, it calls the following routine on its node to update the select state:
      83// returns true if the waker should unpark the thread
     84static inline bool __make_select_node_available( select_node & this ) with( this ) {
     85    unsigned long int cmp_status = __SELECT_UNSAT;
     86
     87    if( !park_counter )
     88        return *clause_status == 0
     89            && __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); // OR specific case where race was won
     90
     91    return *clause_status == 0
     92        && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
     93        && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
     94}
     95
     96// Handles the special OR case of the waituntil statement
     97// Since only one select node can win in the OR case, we need to race to set the node available BEFORE
      98//    performing the operation; if we lose the race, the operation must not be performed, since its effect would be lost
     99// Returns true if execution can continue normally and false if the queue has now been drained
     100static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
     101    if ( queue`isEmpty ) return false;
     102    if ( queue`first.clause_status && !queue`first.park_counter ) {
     103        while ( !queue`isEmpty ) {
     104            // if node not a special OR case or if we win the special OR case race break
     105            if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) { return true; }
     106            // otherwise we lost the special OR race so discard node
     107            try_pop_front( queue );
     108        }
     109        return false;
     110    }
     111    return true;
     112}
     113
     114// wake one thread from the list
     115static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
     116    if ( !popped.clause_status                              // normal case, node is not a select node
      117        || ( popped.clause_status && !popped.park_counter ) // special OR-case select node: unpark it without calling __make_select_node_available
     118        || __make_select_node_available( popped ) )         // check if popped link belongs to a selecting thread
     119        unpark( popped.blocked_thread );
     120}
     121
     122static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
     123
     124static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
     125    this.blocked_thread = active_thread();
     126    this.clause_status = clause_status;
     127    this.park_counter = park_counter;
     128}
     129
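
Taken together, these helpers form a per-clause state machine: clause_status moves from __SELECT_UNSAT to __SELECT_SAT (or, in the special OR case, to the address of the winning node), while park_counter arbitrates parking, with the waiter decrementing it in __CFA_maybe_park and the waker incrementing it in __make_select_node_available so exactly one side observes the handoff. A simplified single-clause sketch against simple_owner_lock from this changeset; real waituntil code generation registers every clause, handles the else race, and runs only the satisfied clause, all elided here:

    simple_owner_lock l;
    unsigned long int status = __SELECT_UNSAT;  // shared clause status word
    int parks = 0;                              // park counter for the selecting thread

    select_node node;
    setup_clause( node, &status, &parks );      // binds node to active_thread()

    if ( !register_select( l, node ) )          // lock unavailable: node is queued on it
        __CFA_maybe_park( &parks );             // park unless a waker already incremented

    if ( on_selected( l, node ) ) {             // always true for simple_owner_lock
        // clause statement runs here, with the lock held
    }
    unregister_select( l, node );               // drops ownership, or dequeues if it raced
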
  • libcfa/src/concurrency/thread.cfa

    rbb7422a rbeeff61e  
    5353        preferred = ready_queue_new_preferred();
    5454        last_proc = 0p;
     55    link_node = 0p;
    5556        PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() );
    5657        #if defined( __CFA_WITH_VERIFY__ )
     
    5960        #endif
    6061
    61         clh_node = malloc( );
    62         *clh_node = false;
    63 
    6462        doregister(curr_cluster, this);
    6563        monitors{ &self_mon_p, 1, (fptr_t)0 };
     
    7068                canary = 0xDEADDEADDEADDEADp;
    7169        #endif
    72         free(clh_node);
    7370        unregister(curr_cluster, this);
    7471        ^self_cor{};