Changeset 3982384 for libcfa


Ignore:
Timestamp:
May 17, 2023, 1:35:09 AM (2 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT, master
Children:
f11010e
Parents:
6e4c44d (diff), 8db4708 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src
Files:
1 added
12 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/Makefile.am

    r6e4c44d r3982384  
    115115        concurrency/kernel/fwd.hfa \
    116116        concurrency/mutex_stmt.hfa \
    117     concurrency/select.hfa \
    118117    concurrency/channel.hfa \
    119118    concurrency/actor.hfa
     
    128127        concurrency/monitor.hfa \
    129128        concurrency/mutex.hfa \
     129    concurrency/select.hfa \
    130130        concurrency/thread.hfa
    131131
  • libcfa/src/bits/weakso_locks.cfa

    r6e4c44d r3982384  
    1515// Update Count     :
    1616//
    17 
    1817#include "bits/weakso_locks.hfa"
    19 
    2018#pragma GCC visibility push(default)
    2119
     
    2725void unlock( blocking_lock & ) {}
    2826void on_notify( blocking_lock &, struct thread$ * ) {}
    29 size_t on_wait( blocking_lock & ) { return 0; }
     27size_t on_wait( blocking_lock &, void (*pp_fn)( void * ), void * pp_datum ) { return 0; }
    3028void on_wakeup( blocking_lock &, size_t ) {}
    3129size_t wait_count( blocking_lock & ) { return 0; }
     30bool register_select( blocking_lock & this, select_node & node ) { return false; }
     31bool unregister_select( blocking_lock & this, select_node & node ) { return false; }
     32bool on_selected( blocking_lock & this, select_node & node ) { return true; }
     33
  • libcfa/src/bits/weakso_locks.hfa

    r6e4c44d r3982384  
    2323#include "containers/list.hfa"
    2424
    25 struct thread$;
     25struct select_node;
    2626
    2727//-----------------------------------------------------------------------------
     
    3232
    3333        // List of blocked threads
    34         dlist( thread$ ) blocked_threads;
     34        dlist( select_node ) blocked_threads;
    3535
    3636        // Count of current blocked threads
     
    5757void unlock( blocking_lock & this ) OPTIONAL_THREAD;
    5858void on_notify( blocking_lock & this, struct thread$ * t ) OPTIONAL_THREAD;
    59 size_t on_wait( blocking_lock & this ) OPTIONAL_THREAD;
     59size_t on_wait( blocking_lock & this, void (*pp_fn)( void * ), void * pp_datum ) OPTIONAL_THREAD;
    6060void on_wakeup( blocking_lock & this, size_t ) OPTIONAL_THREAD;
    6161size_t wait_count( blocking_lock & this ) OPTIONAL_THREAD;
     62bool register_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD;
     63bool unregister_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD;
     64bool on_selected( blocking_lock & this, select_node & node ) OPTIONAL_THREAD;
    6265
    6366//----------
     
    7275static inline bool   try_lock ( multiple_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); }
    7376static inline void   unlock   ( multiple_acquisition_lock & this ) { unlock  ( (blocking_lock &)this ); }
    74 static inline size_t on_wait  ( multiple_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     77static inline size_t on_wait  ( multiple_acquisition_lock & this, void (*pp_fn)( void * ), void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    7578static inline void   on_wakeup( multiple_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    7679static inline void   on_notify( multiple_acquisition_lock & this, struct thread$ * t ){ on_notify( (blocking_lock &)this, t ); }
     80static inline bool   register_select( multiple_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     81static inline bool   unregister_select( multiple_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     82static inline bool   on_selected( multiple_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
  • libcfa/src/concurrency/channel.hfa

    r6e4c44d r3982384  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// channel.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2022
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include <locks.hfa>
    420#include <list.hfa>
    5 #include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
     21#include "select.hfa"
    3022
    3123// returns true if woken due to shutdown
    3224// blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
     25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
     26    select_node sn{ active_thread(), elem_ptr };
     27    insert_last( queue, sn );
    3628    unlock( lock );
    3729    park();
    38     return w.elem == 0p;
     30    return sn.extra == 0p;
     31}
     32
     33// Waituntil support (un)register_select helper routine
     34// Sets select node avail if not special OR case and then unlocks
     35static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
     36    if ( node.park_counter ) __make_select_node_available( node );
     37    unlock( mutex_lock );
    3938}
    4039
     
    5958    size_t size, front, back, count;
    6059    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
     60    dlist( select_node ) prods, cons; // lists of blocked threads
     61    go_mutex mutex_lock;              // MX lock
     62    bool closed;                      // indicates channel close/open
    6463    #ifdef CHAN_STATS
    6564    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     
    7069    size = _size;
    7170    front = back = count = 0;
    72     buffer = aalloc( size );
     71    if ( size != 0 ) buffer = aalloc( size );
    7372    prods{};
    7473    cons{};
     
    8786    #endif
    8887    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
    91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     88    if ( size != 0 ) delete( buffer );
     89}
     90static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
     91static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
    9392static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    9493static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     
    102101    // flush waiting consumers and producers
    103102    while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
     103        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     104            break;  // if __handle_waituntil_OR returns false cons is empty so break
     105        cons`first.extra = 0p;
    105106        wake_one( cons );
    106107    }
    107108    while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
     109        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     110            break;  // if __handle_waituntil_OR returns false prods is empty so break
     111        prods`first.extra = 0p;
    109112        wake_one( prods );
    110113    }
     
    114117static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115118
     119// used to hand an element to a blocked consumer and signal it
     120static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     121    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     122    wake_one( cons );
     123}
     124
     125// used to hand an element to a blocked producer and signal it
     126static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     127    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     128    wake_one( prods );
     129}
     130
    116131static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117132    lock( mutex_lock );
    118133    while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
     134        __cons_handoff( chan, elem );
    121135    }
    122136    unlock( mutex_lock );
     
    125139// handles buffer insert
    126140static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    127     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     141    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    128142    count += 1;
    129143    back++;
     
    131145}
    132146
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141147// needed to avoid an extra copy in closed case
    142148static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     
    145151    operations++;
    146152    #endif
     153
     154    ConsEmpty: if ( !cons`isEmpty ) {
     155        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     156        __cons_handoff( chan, elem );
     157        unlock( mutex_lock );
     158        return true;
     159    }
     160
    147161    if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
     162
     163    __buf_insert( chan, elem );
    149164    unlock( mutex_lock );
    150165    return true;
     
    157172// handles closed case of insert routine
    158173static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
     174    channel_closed except{ &channel_closed_vt, &elem, &chan };
    160175    throwResume except; // throw closed resumption
    161176    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    182197    }
    183198
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
     199    // buffer count must be zero if cons are blocked (also handles zero-size case)
     200    ConsEmpty: if ( !cons`isEmpty ) {
     201        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     202        __cons_handoff( chan, elem );
     203        unlock( mutex_lock );
     204        return;
    190205    }
    191206
     
    202217    } // if
    203218
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     219    __buf_insert( chan, elem );
     220    unlock( mutex_lock );
     221}
     222
     223// does the buffer remove and potentially does waiting producer work
     224static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     225    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    216226    count -= 1;
    217227    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223228    if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     229        if ( !__handle_waituntil_OR( prods ) ) return;
     230        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    225231        wake_one( prods );
    226232    }
     
    233239    operations++;
    234240    #endif
     241
     242    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     243        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     244        __prods_handoff( chan, retval );
     245        unlock( mutex_lock );
     246        return true;
     247    }
     248
    235249    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     250
    236251    __do_remove( chan, retval );
    237252    unlock( mutex_lock );
     
    244259static inline [T, bool] try_remove( channel(T) & chan ) {
    245260    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     261    bool success = __internal_try_remove( chan, retval );
     262    return [ retval, success ];
     263}
     264
     265static inline T try_remove( channel(T) & chan ) {
    250266    T retval;
    251267    __internal_try_remove( chan, retval );
     
    255271// handles closed case of insert routine
    256272static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
     273    channel_closed except{ &channel_closed_vt, 0p, &chan };
    258274    throwResume except; // throw resumption
    259275    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    279295
    280296    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     297    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     298        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     299        __prods_handoff( chan, retval );
    284300        unlock( mutex_lock );
    285301        return retval;
     
    287303
    288304    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
     305    if ( count == 0 ) {
    290306        #ifdef CHAN_STATS
    291307        blocks++;
     
    299315    // Remove from buffer
    300316    __do_remove( chan, retval );
    301 
    302317    unlock( mutex_lock );
    303318    return retval;
    304319}
     320
     321///////////////////////////////////////////////////////////////////////////////////////////
     322// The following is support for waituntil (select) statements
     323///////////////////////////////////////////////////////////////////////////////////////////
     324static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     325    // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case C_TODO: try adding this back
     326    lock( mutex_lock );
     327    if ( node`isListed ) { // op wasn't performed
     328        #ifdef CHAN_STATS
     329        operations--;
     330        #endif
     331        remove( node );
     332        unlock( mutex_lock );
     333        return false;
     334    }
     335    unlock( mutex_lock );
     336
    // only return true when not special OR case, not exceptional case and status is SAT
     338    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     339}
     340
     341// type used by select statement to capture a chan read as the selected operation
     342struct chan_read {
     343    T & ret;
     344    channel(T) & chan;
     345};
     346
     347static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
     348    &cr.chan = &chan;
     349    &cr.ret = &ret;
     350}
     351static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
     352
     353static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     354    __closed_remove( chan, ret );
     355    // if we get here then the insert succeeded
     356    __make_select_node_available( node );
     357}
     358
     359static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     360    lock( mutex_lock );
     361    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     362
     363    #ifdef CHAN_STATS
     364    if ( !closed ) operations++;
     365    #endif
     366
     367    if ( !node.park_counter ) {
     368        // are we special case OR and front of cons is also special case OR
     369        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
     370            if ( !__make_select_node_pending( node ) ) {
     371                unlock( mutex_lock );
     372                return false;
     373            }
     374           
     375            if ( __handle_waituntil_OR( prods ) ) {
     376                __prods_handoff( chan, ret );
                __make_select_node_sat( node ); // need to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     378                unlock( mutex_lock );
     379                return true;
     380            }
     381            __make_select_node_unsat( node );
     382        }
     383        // check if we can complete operation. If so race to establish winner in special OR case
     384        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
     385            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     386                unlock( mutex_lock );
     387                return false;
     388            }
     389        }
     390    }
     391
     392    if ( unlikely(closed) ) {
     393        unlock( mutex_lock );
     394        __handle_select_closed_read( this, node );
     395        return true;
     396    }
     397
     398    // have to check for the zero size channel case
     399    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     400        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     401        __prods_handoff( chan, ret );
     402        __set_avail_then_unlock( node, mutex_lock );
     403        return true;
     404    }
     405
     406    // wait if buffer is empty, work will be completed by someone else
     407    if ( count == 0 ) {
     408        #ifdef CHAN_STATS
     409        blocks++;
     410        #endif
     411       
     412        insert_last( cons, node );
     413        unlock( mutex_lock );
     414        return false;
     415    }
     416
     417    // Remove from buffer
     418    __do_remove( chan, ret );
     419    __set_avail_then_unlock( node, mutex_lock );
     420    return true;
     421}
     422static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     423static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     424    if ( node.extra == 0p ) // check if woken up due to closed channel
     425        __closed_remove( chan, ret );
     426    // This is only reachable if not closed or closed exception was handled
     427    return true;
     428}
     429
     430// type used by select statement to capture a chan write as the selected operation
     431struct chan_write {
     432    T elem;
     433    channel(T) & chan;
     434};
     435
     436static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
     437    &cw.chan = &chan;
     438    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     439}
     440static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
     441
     442static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     443    __closed_insert( chan, elem );
     444    // if we get here then the insert succeeded
     445    __make_select_node_available( node );
     446}
     447
     448static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     449    lock( mutex_lock );
     450    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     451
     452    #ifdef CHAN_STATS
     453    if ( !closed ) operations++;
     454    #endif
     455
     456    // special OR case handling
     457    if ( !node.park_counter ) {
     458        // are we special case OR and front of cons is also special case OR
     459        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
     460            if ( !__make_select_node_pending( node ) ) {
     461                unlock( mutex_lock );
     462                return false;
     463            }
     464           
     465            if ( __handle_waituntil_OR( cons ) ) {
     466                __cons_handoff( chan, elem );
                __make_select_node_sat( node ); // need to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     468                unlock( mutex_lock );
     469                return true;
     470            }
     471            __make_select_node_unsat( node );
     472        }
     473        // check if we can complete operation. If so race to establish winner in special OR case
     474        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
     475            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     476                unlock( mutex_lock );
     477                return false;
     478            }
     479        }
     480    }
     481
     482    // if closed handle
     483    if ( unlikely(closed) ) {
     484        unlock( mutex_lock );
     485        __handle_select_closed_write( this, node );
     486        return true;
     487    }
     488
     489    // handle blocked consumer case via handoff (buffer is implicitly empty)
     490    ConsEmpty: if ( !cons`isEmpty ) {
     491        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     492        __cons_handoff( chan, elem );
     493        __set_avail_then_unlock( node, mutex_lock );
     494        return true;
     495    }
     496
     497    // insert node in list if buffer is full, work will be completed by someone else
     498    if ( count == size ) {
     499        #ifdef CHAN_STATS
     500        blocks++;
     501        #endif
     502
     503        insert_last( prods, node );
     504        unlock( mutex_lock );
     505        return false;
     506    } // if
     507
    // otherwise carry out write via normal insert
     509    __buf_insert( chan, elem );
     510    __set_avail_then_unlock( node, mutex_lock );
     511    return true;
     512}
     513static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     514
     515static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     516    if ( node.extra == 0p ) // check if woken up due to closed channel
     517        __closed_insert( chan, elem );
     518
     519    // This is only reachable if not closed or closed exception was handled
     520    return true;
     521}
     522
    305523} // forall( T )
     524
     525
  • libcfa/src/concurrency/future.hfa

    r6e4c44d r3982384  
    1919#include "monitor.hfa"
    2020#include "select.hfa"
     21#include "locks.hfa"
    2122
    2223//----------------------------------------------------------------------------
     
    2627//  future_t is lockfree and uses atomics which aren't needed given we use locks here
    2728forall( T ) {
    28     // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
     29    // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
    2930
    3031    // temporary enum replacement
     
    4445    };
    4546
    46     // C_TODO: perhaps allow exceptions to be inserted like uC++?
    47 
    4847        static inline {
    4948
     
    5352        }
    5453
    55         void ?{}(future(T) & this) {
     54        void ?{}( future(T) & this ) {
    5655                        this.waiters{};
    5756            this.state = FUTURE_EMPTY;
     
    6059
    6160                // Reset future back to original state
    62                 void reset(future(T) & this) with(this)
     61                void reset( future(T) & this ) with(this)
    6362        {
    6463            lock( lock );
     
    8281        void _internal_flush( future(T) & this ) with(this) {
    8382            while( ! waiters`isEmpty ) {
     83                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     84                    break; // if handle_OR returns false then waiters is empty so break
    8485                select_node &s = try_pop_front( waiters );
    8586
    86                 if ( s.race_flag == 0p )
    87                     // poke in result so that woken threads do not need to reacquire any locks
    88                     // *(((future_node(T) &)s).my_result) = result;
     87                if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
    8988                    copy_T( result, *(((future_node(T) &)s).my_result) );
    90                 else if ( !install_select_winner( s, &this ) ) continue;
    9189               
    92                 // only unpark if future is not selected
    93                 // or if it is selected we only unpark if we win the race
    94                 unpark( s.blocked_thread );
     90                wake_one( waiters, s );
    9591            }
    9692        }
    9793
    9894                // Fulfil the future, returns whether or not someone was unblocked
    99                 bool fulfil( future(T) & this, T & val ) with(this) {
     95                bool fulfil( future(T) & this, T val ) with(this) {
    10096            lock( lock );
    10197            if( state != FUTURE_EMPTY )
     
    153149        }
    154150
    155         void * register_select( future(T) & this, select_node & s ) with(this) {
    156             lock( lock );
    157 
    158             // future not ready -> insert select node and return 0p
     151        bool register_select( future(T) & this, select_node & s ) with(this) {
     152            lock( lock );
     153
     154            // check if we can complete operation. If so race to establish winner in special OR case
     155            if ( !s.park_counter && state != FUTURE_EMPTY ) {
     156                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     157                    unlock( lock );
     158                    return false;
     159                }
     160            }
     161
     162            // future not ready -> insert select node and return
    159163            if( state == FUTURE_EMPTY ) {
    160164                insert_last( waiters, s );
    161165                unlock( lock );
    162                 return 0p;
    163             }
    164 
    165             // future ready and we won race to install it as the select winner return 1p
    166             if ( install_select_winner( s, &this ) ) {
    167                 unlock( lock );
    168                 return 1p;
    169             }
    170 
    171             unlock( lock );
    172             // future ready and we lost race to install it as the select winner
    173             return 2p;
    174         }
    175 
    176         void unregister_select( future(T) & this, select_node & s ) with(this) {
     166                return false;
     167            }
     168
     169            __make_select_node_available( s );
     170            unlock( lock );
     171            return true;
     172        }
     173
     174        bool unregister_select( future(T) & this, select_node & s ) with(this) {
     175            if ( ! s`isListed ) return false;
    177176            lock( lock );
    178177            if ( s`isListed ) remove( s );
    179178            unlock( lock );
     179            return false;
    180180        }
    181181               
     182        bool on_selected( future(T) & this, select_node & node ) { return true; }
    182183        }
    183184}
     
    186187// These futures below do not support select statements so they may not be as useful as 'future'
    187188//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    188 //  since it uses raw atomics and no locks afaik
     189//  since it uses raw atomics and no locks
    189190//
    190191// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
  • libcfa/src/concurrency/invoke.h

    r6e4c44d r3982384  
    217217                struct __thread_user_link cltr_link;
    218218
    219                 // used to point to this thd's current clh node
    220                 volatile bool * clh_node;
    221 
    222219                struct processor * last_proc;
     220
     221        // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes
     222        // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock
     223        void * link_node;
    223224
    224225                PRNG_STATE_T random_state;                                              // fast random numbers
  • libcfa/src/concurrency/locks.cfa

    r6e4c44d r3982384  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // locks.hfa -- LIBCFATHREAD
     7// locks.cfa -- LIBCFATHREAD
    88// Runtime locks that used with the runtime thread system.
    99//
     
    7979        // lock is held by some other thread
    8080        if ( owner != 0p && owner != thrd ) {
    81                 insert_last( blocked_threads, *thrd );
     81        select_node node;
     82                insert_last( blocked_threads, node );
    8283                wait_count++;
    8384                unlock( lock );
    8485                park( );
    85         }
    86         // multi acquisition lock is held by current thread
    87         else if ( owner == thrd && multi_acquisition ) {
     86        return;
     87        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
    8888                recursion_count++;
    89                 unlock( lock );
    90         }
    91         // lock isn't held
    92         else {
     89        } else {  // lock isn't held
    9390                owner = thrd;
    9491                recursion_count = 1;
    95                 unlock( lock );
    96         }
     92        }
     93    unlock( lock );
    9794}
    9895
     
    117114}
    118115
    119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
    120         thread$ * t = &try_pop_front( blocked_threads );
    121         owner = t;
    122         recursion_count = ( t ? 1 : 0 );
    123         if ( t ) wait_count--;
    124         unpark( t );
     116static inline void pop_node( blocking_lock & this ) with( this ) {
     117    __handle_waituntil_OR( blocked_threads );
     118    select_node * node = &try_pop_front( blocked_threads );
     119    if ( node ) {
     120        wait_count--;
     121        owner = node->blocked_thread;
     122        recursion_count = 1;
     123        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     124        wake_one( blocked_threads, *node );
     125    } else {
     126        owner = 0p;
     127        recursion_count = 0;
     128    }
    125129}
    126130
     
    134138        recursion_count--;
    135139        if ( recursion_count == 0 ) {
    136                 pop_and_set_new_owner( this );
     140                pop_node( this );
    137141        }
    138142        unlock( lock );
     
    147151        // lock held
    148152        if ( owner != 0p ) {
    149                 insert_last( blocked_threads, *t );
     153                insert_last( blocked_threads, *(select_node *)t->link_node );
    150154                wait_count++;
    151                 unlock( lock );
    152155        }
    153156        // lock not held
     
    156159                recursion_count = 1;
    157160                unpark( t );
    158                 unlock( lock );
    159         }
    160 }
    161 
    162 size_t on_wait( blocking_lock & this ) with( this ) {
     161        }
     162    unlock( lock );
     163}
     164
     165size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
    163166        lock( lock __cfaabi_dbg_ctx2 );
    164167        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    167170        size_t ret = recursion_count;
    168171
    169         pop_and_set_new_owner( this );
     172        pop_node( this );
     173
     174    select_node node;
     175    active_thread()->link_node = (void *)&node;
    170176        unlock( lock );
     177
     178    pre_park_then_park( pp_fn, pp_datum );
     179
    171180        return ret;
    172181}
     
    175184        recursion_count = recursion;
    176185}
     186
     187// waituntil() support
     188bool register_select( blocking_lock & this, select_node & node ) with(this) {
     189    lock( lock __cfaabi_dbg_ctx2 );
     190        thread$ * thrd = active_thread();
     191
     192        // single acquisition lock is held by current thread
     193        /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );
     194
     195    if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case
     196        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     197           unlock( lock );
     198           return false;
     199        }
     200    }
     201
     202        // lock is held by some other thread
     203        if ( owner != 0p && owner != thrd ) {
     204                insert_last( blocked_threads, node );
     205                wait_count++;
     206                unlock( lock );
     207        return false;
     208        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
     209                recursion_count++;
     210        } else {  // lock isn't held
     211                owner = thrd;
     212                recursion_count = 1;
     213        }
     214
     215    if ( node.park_counter ) __make_select_node_available( node );
     216    unlock( lock );
     217    return true;
     218}
     219
     220bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
     221    lock( lock __cfaabi_dbg_ctx2 );
     222    if ( node`isListed ) {
     223        remove( node );
     224        wait_count--;
     225        unlock( lock );
     226        return false;
     227    }
     228   
     229    if ( owner == active_thread() ) {
     230        /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
     231        // if recursion count is zero release lock and set new owner if one is waiting
     232        recursion_count--;
     233        if ( recursion_count == 0 ) {
     234            pop_node( this );
     235        }
     236    }
     237        unlock( lock );
     238    return false;
     239}
     240
     241bool on_selected( blocking_lock & this, select_node & node ) { return true; }
    177242
    178243//-----------------------------------------------------------------------------
     
    311376        int counter( condition_variable(L) & this ) with(this) { return count; }
    312377
    313         static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {
     378        static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
    314379                // add info_thread to waiting queue
    315380                insert_last( blocked_threads, *i );
    316381                count++;
    317                 size_t recursion_count = 0;
    318                 if (i->lock) {
    319                         // if lock was passed get recursion count to reset to after waking thread
    320                         recursion_count = on_wait( *i->lock );
    321                 }
    322                 return recursion_count;
    323         }
     382        }
     383
     384    static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
     385        size_t recursion_count = 0;
     386                if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread
     387                        recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
     388                else
     389            pre_park_then_park( pp_fn, pp_datum );
     390        return recursion_count;
     391    }
     392    static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); }
    324393
    325394        // helper for wait()'s' with no timeout
    326395        static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
    327396                lock( lock __cfaabi_dbg_ctx2 );
    328                 size_t recursion_count = queue_and_get_recursion(this, &i);
     397        enqueue_thread( this, &i );
    329398                unlock( lock );
    330399
    331400                // blocks here
    332                 park( );
     401        size_t recursion_count = block_and_get_recursion( i );
    333402
    334403                // resets recursion count here after waking
    335                 if (i.lock) on_wakeup(*i.lock, recursion_count);
     404                if ( i.lock ) on_wakeup( *i.lock, recursion_count );
    336405        }
    337406
     
    340409                queue_info_thread( this, i );
    341410
     411    static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); }
     412
    342413        // helper for wait()'s' with a timeout
    343414        static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    344415                lock( lock __cfaabi_dbg_ctx2 );
    345                 size_t recursion_count = queue_and_get_recursion(this, &info);
     416        enqueue_thread( this, &info );
    346417                alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    347418                unlock( lock );
    348419
    349                 // registers alarm outside cond lock to avoid deadlock
    350                 register_self( &node_wrap.alarm_node );
    351 
    352                 // blocks here
    353                 park();
     420                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
     421        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     422                // park();
    354423
    355424                // unregisters alarm so it doesn't go off if this happens first
     
    357426
    358427                // resets recursion count here after waking
    359                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     428                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    360429        }
    361430
     
    417486                info_thread( L ) i = { active_thread(), info, &l };
    418487                insert_last( blocked_threads, i );
    419                 size_t recursion_count = on_wait( *i.lock );
    420                 park( );
     488                size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here
     489                // park( );
    421490                on_wakeup(*i.lock, recursion_count);
    422491        }
     
    459528        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
    460529
    461         static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
    462                 // add info_thread to waiting queue
    463                 insert_last( blocked_threads, *i );
    464                 size_t recursion_count = 0;
    465                 recursion_count = on_wait( *i->lock );
    466                 return recursion_count;
    467         }
    468        
    469530        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    470531                lock( lock __cfaabi_dbg_ctx2 );
    471                 size_t recursion_count = queue_and_get_recursion(this, &info);
     532        insert_last( blocked_threads, info );
    472533                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    473534                unlock( lock );
    474535
    475                 // registers alarm outside cond lock to avoid deadlock
    476                 register_self( &node_wrap.alarm_node );
    477 
    478                 // blocks here
    479                 park();
    480 
    481                 // unregisters alarm so it doesn't go off if this happens first
     536                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
     537        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     538
     539                // unregisters alarm so it doesn't go off if signal happens first
    482540                unregister_self( &node_wrap.alarm_node );
    483541
    484542                // resets recursion count here after waking
    485                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     543                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    486544        }
    487545
     
    493551                lock( lock __cfaabi_dbg_ctx2 );
    494552                info_thread( L ) i = { active_thread(), info, &l };
    495                 size_t recursion_count = queue_and_get_recursion(this, &i);
    496                 unlock( lock );
    497                 park( );
    498                 on_wakeup(*i.lock, recursion_count);
     553        insert_last( blocked_threads, i );
     554                unlock( lock );
     555
     556        // blocks here
     557                size_t recursion_count = block_and_get_recursion( i );
     558
     559                on_wakeup( *i.lock, recursion_count );
    499560        }
    500561
     
    584645        return thrd != 0p;
    585646}
     647
  • libcfa/src/concurrency/locks.hfa

    r6e4c44d r3982384  
    3030#include "time.hfa"
    3131
     32#include "select.hfa"
     33
    3234#include <fstream.hfa>
    3335
     
    3739#include <unistd.h>
    3840
    39 // C_TODO: cleanup this and locks.cfa
    40 // - appropriate separation of interface and impl
    41 // - clean up unused/unneeded locks
    42 // - change messy big blocking lock from inheritance to composition to remove need for flags
     41typedef void (*__cfa_pre_park)( void * );
     42
     43static inline void pre_park_noop( void * ) {}
     44
     45//-----------------------------------------------------------------------------
     46// is_blocking_lock
     47forall( L & | sized(L) )
     48trait is_blocking_lock {
     49        // For synchronization locks to use when acquiring
     50        void on_notify( L &, struct thread$ * );
     51
     52        // For synchronization locks to use when releasing
     53        size_t on_wait( L &, __cfa_pre_park pp_fn, void * pp_datum );
     54
     55        // to set recursion count after getting signalled;
     56        void on_wakeup( L &, size_t recursion );
     57};
     58
     59static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) {
     60    pp_fn( pp_datum );
     61    park();
     62}
     63
     64// macros for default routine impls for is_blocking_lock trait that do not wait-morph
     65
     66#define DEFAULT_ON_NOTIFY( lock_type ) \
     67    static inline void on_notify( lock_type & this, thread$ * t ){ unpark(t); }
     68
     69#define DEFAULT_ON_WAIT( lock_type ) \
     70    static inline size_t on_wait( lock_type & this, __cfa_pre_park pp_fn, void * pp_datum ) { \
     71        unlock( this ); \
     72        pre_park_then_park( pp_fn, pp_datum ); \
     73        return 0; \
     74    }
     75
     76// on_wakeup impl if lock should be reacquired after waking up
     77#define DEFAULT_ON_WAKEUP_REACQ( lock_type ) \
     78    static inline void on_wakeup( lock_type & this, size_t recursion ) { lock( this ); }
     79
     80// on_wakeup impl if lock will not be reacquired after waking up
     81#define DEFAULT_ON_WAKEUP_NO_REACQ( lock_type ) \
     82    static inline void on_wakeup( lock_type & this, size_t recursion ) {}
     83
     84
    4385
    4486//-----------------------------------------------------------------------------
     
    67109static inline bool   try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); }
    68110static inline void   unlock   ( single_acquisition_lock & this ) { unlock  ( (blocking_lock &)this ); }
    69 static inline size_t on_wait  ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     111static inline size_t on_wait  ( single_acquisition_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    70112static inline void   on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    71113static inline void   on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     114static inline bool   register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     115static inline bool   unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     116static inline bool   on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
    72117
    73118//----------
     
    81126static inline bool   try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); }
    82127static inline void   unlock   ( owner_lock & this ) { unlock  ( (blocking_lock &)this ); }
    83 static inline size_t on_wait  ( owner_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     128static inline size_t on_wait  ( owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    84129static inline void   on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    85130static inline void   on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     131static inline bool   register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     132static inline bool   unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     133static inline bool   on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
    86134
    87135//-----------------------------------------------------------------------------
     
    156204// - Kernel thd blocking alternative to the spinlock
    157205// - No ownership (will deadlock on reacq)
     206// - no reacq on wakeup
    158207struct futex_mutex {
    159208        // lock state any state other than UNLOCKED is locked
     
    169218}
    170219
    171 static inline void  ?{}( futex_mutex & this ) with(this) { val = 0; }
    172 
    173 static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
     220static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }
     221
     222static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) {
    174223        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
    175224}
    176225
    177 static inline int internal_exchange(futex_mutex & this) with(this) {
     226static inline int internal_exchange( futex_mutex & this ) with(this) {
    178227        return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
    179228}
    180229
    181230// if this is called recursively IT WILL DEADLOCK!!!!!
    182 static inline void lock(futex_mutex & this) with(this) {
     231static inline void lock( futex_mutex & this ) with(this) {
    183232        int state;
    184233
     
    190239                for (int i = 0; i < spin; i++) Pause();
    191240        }
    192 
    193         // // no contention try to acquire
    194         // if (internal_try_lock(this, state)) return;
    195241       
    196242        // if not in contended state, set to be in contended state
     
    212258}
    213259
    214 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); }
    215 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;}
    216 
    217 // to set recursion count after getting signalled;
    218 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
     260DEFAULT_ON_NOTIFY( futex_mutex )
     261DEFAULT_ON_WAIT( futex_mutex )
     262DEFAULT_ON_WAKEUP_NO_REACQ( futex_mutex )
    219263
    220264//-----------------------------------------------------------------------------
     
    232276        int val;
    233277};
    234 
    235278static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
     279// static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted
     280// static inline void ?=?( go_mutex & this, go_mutex this2 ) = void;
    236281
    237282static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     
    244289
    245290// if this is called recursively IT WILL DEADLOCK!!!!!
    246 static inline void lock(go_mutex & this) with(this) {
     291static inline void lock( go_mutex & this ) with( this ) {
    247292        int state, init_state;
    248293
     
    255300            while( !val ) { // lock unlocked
    256301                state = 0;
    257                 if (internal_try_lock(this, state, init_state)) return;
     302                if ( internal_try_lock( this, state, init_state ) ) return;
    258303            }
    259304            for (int i = 0; i < 30; i++) Pause();
     
    262307        while( !val ) { // lock unlocked
    263308            state = 0;
    264             if (internal_try_lock(this, state, init_state)) return;
     309            if ( internal_try_lock( this, state, init_state ) ) return;
    265310        }
    266311        sched_yield();
    267312       
    268313        // if not in contended state, set to be in contended state
    269         state = internal_exchange(this, 2);
     314        state = internal_exchange( this, 2 );
    270315        if ( !state ) return; // state == 0
    271316        init_state = 2;
    272         futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
     317        futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK
    273318    }
    274319}
     
    276321static inline void unlock( go_mutex & this ) with(this) {
    277322        // if uncontended do atomic unlock and then return
    278     if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     323    if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return;
    279324       
    280325        // otherwise threads are blocked so we must wake one
    281         futex((int *)&val, FUTEX_WAKE, 1);
    282 }
    283 
    284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
    285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
    286 static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    287 
    288 //-----------------------------------------------------------------------------
    289 // CLH Spinlock
    290 // - No recursive acquisition
    291 // - Needs to be released by owner
    292 
    293 struct clh_lock {
    294         volatile bool * volatile tail;
    295     volatile bool * volatile head;
    296 };
    297 
    298 static inline void  ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }
    299 static inline void ^?{}( clh_lock & this ) { free(this.tail); }
    300 
    301 static inline void lock(clh_lock & l) {
    302         thread$ * curr_thd = active_thread();
    303         *(curr_thd->clh_node) = false;
    304         volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);
    305         while(!__atomic_load_n(prev, __ATOMIC_SEQ_CST)) Pause();
    306     __atomic_store_n((bool **)(&l.head), (bool *)curr_thd->clh_node, __ATOMIC_SEQ_CST);
    307     curr_thd->clh_node = prev;
    308 }
    309 
    310 static inline void unlock(clh_lock & l) {
    311         __atomic_store_n((bool *)(l.head), true, __ATOMIC_SEQ_CST);
    312 }
    313 
    314 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
    315 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    316 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
     326        futex( (int *)&val, FUTEX_WAKE, 1 );
     327}
     328
     329DEFAULT_ON_NOTIFY( go_mutex )
     330DEFAULT_ON_WAIT( go_mutex )
     331DEFAULT_ON_WAKEUP_NO_REACQ( go_mutex )
    317332
    318333//-----------------------------------------------------------------------------
     
    334349        this.lock_value = 0;
    335350}
     351static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
     352static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    336353
    337354static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    338355
    339 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
     356static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) {
    340357        return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    341358}
    342359
    343 static inline bool try_lock(exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
    344 
    345 static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
    346         return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
    347 }
    348 
    349 static inline bool block(exp_backoff_then_block_lock & this) with(this) {
     360static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); }
     361
     362static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) {
     363        return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE );
     364}
     365
     366static inline bool block( exp_backoff_then_block_lock & this ) with(this) {
    350367    lock( spinlock __cfaabi_dbg_ctx2 );
    351368    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
     
    359376}
    360377
    361 static inline void lock(exp_backoff_then_block_lock & this) with(this) {
     378static inline void lock( exp_backoff_then_block_lock & this ) with(this) {
    362379        size_t compare_val = 0;
    363380        int spin = 4;
     
    378395}
    379396
    380 static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
     397static inline void unlock( exp_backoff_then_block_lock & this ) with(this) {
    381398    if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    382399    lock( spinlock __cfaabi_dbg_ctx2 );
     
    386403}
    387404
    388 static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    389 static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; }
    390 static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
     405DEFAULT_ON_NOTIFY( exp_backoff_then_block_lock )
     406DEFAULT_ON_WAIT( exp_backoff_then_block_lock )
     407DEFAULT_ON_WAKEUP_REACQ( exp_backoff_then_block_lock )
    391408
    392409//-----------------------------------------------------------------------------
     
    418435
    419436// if this is called recursively IT WILL DEADLOCK!!!!!
    420 static inline void lock(fast_block_lock & this) with(this) {
     437static inline void lock( fast_block_lock & this ) with(this) {
    421438        lock( lock __cfaabi_dbg_ctx2 );
    422439        if ( held ) {
     
    430447}
    431448
    432 static inline void unlock(fast_block_lock & this) with(this) {
     449static inline void unlock( fast_block_lock & this ) with(this) {
    433450        lock( lock __cfaabi_dbg_ctx2 );
    434451        /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this );
     
    439456}
    440457
    441 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
     458static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {
    442459    lock( lock __cfaabi_dbg_ctx2 );
    443460    insert_last( blocked_threads, *t );
    444461    unlock( lock );
    445462}
    446 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    447 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
     463DEFAULT_ON_WAIT( fast_block_lock )
     464DEFAULT_ON_WAKEUP_NO_REACQ( fast_block_lock )
    448465
    449466//-----------------------------------------------------------------------------
     
    456473struct simple_owner_lock {
    457474        // List of blocked threads
    458         dlist( thread$ ) blocked_threads;
     475        dlist( select_node ) blocked_threads;
    459476
    460477        // Spin lock used for mutual exclusion
     
    477494static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;
    478495
    479 static inline void lock(simple_owner_lock & this) with(this) {
    480         if (owner == active_thread()) {
     496static inline void lock( simple_owner_lock & this ) with(this) {
     497        if ( owner == active_thread() ) {
    481498                recursion_count++;
    482499                return;
     
    484501        lock( lock __cfaabi_dbg_ctx2 );
    485502
    486         if (owner != 0p) {
    487                 insert_last( blocked_threads, *active_thread() );
     503        if ( owner != 0p ) {
     504        select_node node;
     505                insert_last( blocked_threads, node );
    488506                unlock( lock );
    489507                park( );
     
    495513}
    496514
    497 // TODO: fix duplicate def issue and bring this back
    498 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
    499         // thread$ * t = &try_pop_front( blocked_threads );
    500         // owner = t;
    501         // recursion_count = ( t ? 1 : 0 );
    502         // unpark( t );
    503 // }
    504 
    505 static inline void unlock(simple_owner_lock & this) with(this) {
     515static inline void pop_node( simple_owner_lock & this ) with(this) {
     516    __handle_waituntil_OR( blocked_threads );
     517    select_node * node = &try_pop_front( blocked_threads );
     518    if ( node ) {
     519        owner = node->blocked_thread;
     520        recursion_count = 1;
     521        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     522        wake_one( blocked_threads, *node );
     523    } else {
     524        owner = 0p;
     525        recursion_count = 0;
     526    }
     527}
     528
     529static inline void unlock( simple_owner_lock & this ) with(this) {
    506530        lock( lock __cfaabi_dbg_ctx2 );
    507531        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    510534        recursion_count--;
    511535        if ( recursion_count == 0 ) {
    512                 // pop_and_set_new_owner( this );
    513                 thread$ * t = &try_pop_front( blocked_threads );
    514                 owner = t;
    515                 recursion_count = ( t ? 1 : 0 );
    516                 unpark( t );
     536                pop_node( this );
    517537        }
    518538        unlock( lock );
    519539}
    520540
    521 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
     541static inline void on_notify( simple_owner_lock & this, thread$ * t ) with(this) {
    522542        lock( lock __cfaabi_dbg_ctx2 );
    523543        // lock held
    524544        if ( owner != 0p ) {
    525                 insert_last( blocked_threads, *t );
     545                insert_last( blocked_threads, *(select_node *)t->link_node );
    526546        }
    527547        // lock not held
     
    534554}
    535555
    536 static inline size_t on_wait(simple_owner_lock & this) with(this) {
     556static inline size_t on_wait( simple_owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with(this) {
    537557        lock( lock __cfaabi_dbg_ctx2 );
    538558        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    541561        size_t ret = recursion_count;
    542562
    543         // pop_and_set_new_owner( this );
    544 
    545         thread$ * t = &try_pop_front( blocked_threads );
    546         owner = t;
    547         recursion_count = ( t ? 1 : 0 );
    548         unpark( t );
    549 
     563        pop_node( this );
     564
     565    select_node node;
     566    active_thread()->link_node = (void *)&node;
    550567        unlock( lock );
     568
     569    pre_park_then_park( pp_fn, pp_datum );
     570
    551571        return ret;
    552572}
    553573
    554 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     574static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     575
     576// waituntil() support
     577static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) {
     578    lock( lock __cfaabi_dbg_ctx2 );
     579
     580    // check if we can complete operation. If so race to establish winner in special OR case
     581    if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) {
     582        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     583           unlock( lock );
     584           return false;
     585        }
     586    }
     587
     588    if ( owner == active_thread() ) {
     589                recursion_count++;
     590        if ( node.park_counter ) __make_select_node_available( node );
     591        unlock( lock );
     592                return true;
     593        }
     594
     595    if ( owner != 0p ) {
     596                insert_last( blocked_threads, node );
     597                unlock( lock );
     598                return false;
     599        }
     600   
     601        owner = active_thread();
     602        recursion_count = 1;
     603
     604    if ( node.park_counter ) __make_select_node_available( node );
     605    unlock( lock );
     606    return true;
     607}
     608
     609static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) {
     610    lock( lock __cfaabi_dbg_ctx2 );
     611    if ( node`isListed ) {
     612        remove( node );
     613        unlock( lock );
     614        return false;
     615    }
     616
     617    if ( owner == active_thread() ) {
     618        recursion_count--;
     619        if ( recursion_count == 0 ) {
     620            pop_node( this );
     621        }
     622    }
     623    unlock( lock );
     624    return false;
     625}
     626
     627static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; }
     628
    555629
    556630//-----------------------------------------------------------------------------
     
    578652
    579653// if this is called recursively IT WILL DEADLOCK!
    580 static inline void lock(spin_queue_lock & this) with(this) {
     654static inline void lock( spin_queue_lock & this ) with(this) {
    581655        mcs_spin_node node;
    582656        lock( lock, node );
     
    586660}
    587661
    588 static inline void unlock(spin_queue_lock & this) with(this) {
     662static inline void unlock( spin_queue_lock & this ) with(this) {
    589663        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    590664}
    591665
    592 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
    593         unpark(t);
    594 }
    595 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    596 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); }
    597 
     666DEFAULT_ON_NOTIFY( spin_queue_lock )
     667DEFAULT_ON_WAIT( spin_queue_lock )
     668DEFAULT_ON_WAKEUP_REACQ( spin_queue_lock )
    598669
    599670//-----------------------------------------------------------------------------
     
    621692
    622693// if this is called recursively IT WILL DEADLOCK!!!!!
    623 static inline void lock(mcs_block_spin_lock & this) with(this) {
     694static inline void lock( mcs_block_spin_lock & this ) with(this) {
    624695        mcs_node node;
    625696        lock( lock, node );
     
    633704}
    634705
    635 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    636 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    637 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {lock(this); }
     706DEFAULT_ON_NOTIFY( mcs_block_spin_lock )
     707DEFAULT_ON_WAIT( mcs_block_spin_lock )
     708DEFAULT_ON_WAKEUP_REACQ( mcs_block_spin_lock )
    638709
    639710//-----------------------------------------------------------------------------
     
    661732
    662733// if this is called recursively IT WILL DEADLOCK!!!!!
    663 static inline void lock(block_spin_lock & this) with(this) {
     734static inline void lock( block_spin_lock & this ) with(this) {
    664735        lock( lock );
    665736        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    668739}
    669740
    670 static inline void unlock(block_spin_lock & this) with(this) {
     741static inline void unlock( block_spin_lock & this ) with(this) {
    671742        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    672743}
    673744
    674 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
     745static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    675746        // first we acquire internal fast_block_lock
    676747        lock( lock __cfaabi_dbg_ctx2 );
     
    686757        unpark(t);
    687758}
    688 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
    689 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
     759DEFAULT_ON_WAIT( block_spin_lock )
     760static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {
    690761        // now we acquire the entire block_spin_lock upon waking up
    691762        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    693764        unlock( lock ); // Now we release the internal fast_spin_lock
    694765}
    695 
    696 //-----------------------------------------------------------------------------
    697 // is_blocking_lock
    698 forall( L & | sized(L) )
    699 trait is_blocking_lock {
    700         // For synchronization locks to use when acquiring
    701         void on_notify( L &, struct thread$ * );
    702 
    703         // For synchronization locks to use when releasing
    704         size_t on_wait( L & );
    705 
    706         // to set recursion count after getting signalled;
    707         void on_wakeup( L &, size_t recursion );
    708 };
    709766
    710767//-----------------------------------------------------------------------------
     
    714771forall(L & | is_blocking_lock(L)) {
    715772        struct info_thread;
    716 
    717         // // for use by sequence
    718         // info_thread(L) *& Back( info_thread(L) * this );
    719         // info_thread(L) *& Next( info_thread(L) * this );
    720773}
    721774
  • libcfa/src/concurrency/mutex_stmt.hfa

    r6e4c44d r3982384  
    1515};
    1616
    17 
    1817struct __mutex_stmt_lock_guard {
    1918    void ** lockarr;
     
    3029
    3130forall(L & | is_lock(L)) {
    32 
    33     struct scoped_lock {
    34         L * internal_lock;
    35     };
    36 
    37     static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) {
    38         this.internal_lock = &internal_lock;
    39         lock(internal_lock);
    40     }
    41    
    42     static inline void ^?{}( scoped_lock(L) & this ) with(this) {
    43         unlock(*internal_lock);
    44     }
    45 
    46     static inline void * __get_mutexstmt_lock_ptr( L & this ) {
    47         return &this;
    48     }
    49 
    50     static inline L __get_mutexstmt_lock_type( L & this );
    51 
    52     static inline L __get_mutexstmt_lock_type( L * this );
     31    static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; }
     32    static inline L __get_mutexstmt_lock_type( L & this ) {}
     33    static inline L __get_mutexstmt_lock_type( L * this ) {}
    5334}
  • libcfa/src/concurrency/preemption.cfa

    r6e4c44d r3982384  
    117117                __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn);
    118118                Duration period = node->period;
    119                 if( period == 0) {
     119                if( period == 0 ) {
    120120                        node->set = false;                  // Node is one-shot, just mark it as not pending
    121121                }
  • libcfa/src/concurrency/select.hfa

    r6e4c44d r3982384  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// channel.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2023
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include "containers/list.hfa"
    4 #include <stdint.h>
    5 #include <kernel.hfa>
    6 #include <locks.hfa>
     20#include "alarm.hfa"
     21#include "kernel.hfa"
     22#include "time.hfa"
    723
     24struct select_node;
     25
     26// node status
     27static const unsigned long int __SELECT_UNSAT = 0;
     28static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case
     29static const unsigned long int __SELECT_SAT = 2;
     30static const unsigned long int __SELECT_RUN = 3;
     31
     32// these are used inside the compiler to aid in code generation
     33static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
     34static inline void __CFA_maybe_park( int * park_counter ) {
     35    if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
     36        park();
     37}
     38
     39// node used for coordinating waituntil synchronization
    840struct select_node {
     41    int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
     42    unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
     43
     44    void * extra;                       // used to store arbitrary data needed by some primitives
     45
    946    thread$ * blocked_thread;
    10     void ** race_flag;
    1147    inline dlink(select_node);
    1248};
    1349P9_EMBEDDED( select_node, dlink(select_node) )
    1450
    15 void ?{}( select_node & this ) {
    16     this.blocked_thread = 0p;
    17     this.race_flag = 0p;
     51static inline void ?{}( select_node & this ) {
     52    this.blocked_thread = active_thread();
     53    this.clause_status = 0p;
     54    this.park_counter = 0p;
     55    this.extra = 0p;
    1856}
    1957
    20 void ?{}( select_node & this, thread$ * blocked_thread ) {
     58static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    2159    this.blocked_thread = blocked_thread;
    22     this.race_flag = 0p;
     60    this.clause_status = 0p;
     61    this.park_counter = 0p;
     62    this.extra = 0p;
    2363}
    2464
    25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) {
     65static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    2666    this.blocked_thread = blocked_thread;
    27     this.race_flag = race_flag;
     67    this.clause_status = 0p;
     68    this.park_counter = 0p;
     69    this.extra = extra;
    2870}
     71static inline void ^?{}( select_node & this ) {}
    2972
    30 void ^?{}( select_node & this ) {}
     73// this is used inside the compiler to aid in code generation
     74static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
    3175
     76// this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
     77static inline bool __select_node_else_race( select_node & this ) with( this ) {
     78    unsigned long int cmp_status = __SELECT_UNSAT;
     79    return *clause_status == 0
     80            && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
     81}
    3282
    3383//-----------------------------------------------------------------------------
    3484// is_selectable
    35 trait is_selectable(T & | sized(T)) {
    36     // For registering a select on a selectable concurrency primitive
    37     // return 0p if primitive not accessible yet
    38     // return 1p if primitive gets acquired
    39     // return 2p if primitive is accessible but some other primitive won the race
    40     // C_TODO: add enum for return values
    41     void * register_select( T &, select_node & );
     85forall(T & | sized(T))
     86trait is_selectable {
     87    // For registering a select stmt on a selectable concurrency primitive
     88    // Returns bool that indicates if operation is already SAT
     89    bool register_select( T &, select_node & );
    4290
    43     void unregister_select( T &, select_node &  );
     91    // For unregistering a select stmt on a selectable concurrency primitive
     92    // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
     93    bool unregister_select( T &, select_node &  );
     94
     95    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
     96    //    passed as an arg to this routine
     97    // If on_selected returns false, the statement is not run, if it returns true it is run.
     98    bool on_selected( T &, select_node & );
    4499};
    45100
    46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
    47     // temporary needed for atomic instruction
    48     void * cmp_flag = 0p;
    49    
    50     // if we dont win the selector race we need to potentially
    51     //   ignore this node and move to the next one so we return accordingly
    52     if ( *race_flag != 0p ||
    53         !__atomic_compare_exchange_n(
    54             race_flag,
    55             &cmp_flag,
    56             primitive_ptr,
    57             false,
    58             __ATOMIC_SEQ_CST,
    59             __ATOMIC_SEQ_CST
    60         )
    61     ) return false; // lost race and some other node triggered select
    62     return true; // won race so this node is what the select proceeds with
     101//=============================================================================================
     102// Waituntil Helpers
     103//=============================================================================================
     104
     105// used for the 2-stage avail needed by the special OR case
     106static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
     107    /* paranoid */ verify( park_counter == 0p );
     108    /* paranoid */ verify( clause_status != 0p );
     109
     110    unsigned long int cmp_status = __SELECT_UNSAT;
     111    while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     112        if ( cmp_status != __SELECT_PENDING ) return false;
     113        cmp_status = __SELECT_UNSAT;
     114    }
     115    return true;
    63116}
     117
     118static inline void __make_select_node_unsat( select_node & this ) with( this ) {
     119    __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
     120}
     121static inline void __make_select_node_sat( select_node & this ) with( this ) {
     122    __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
     123}
     124
     125static inline bool __make_select_node_pending( select_node & this ) with( this ) {
     126    return __mark_select_node( this, __SELECT_PENDING );
     127}
     128
     129// when a primitive becomes available it calls the following routine on it's node to update the select state:
     130// return true if we want to unpark the thd
     131static inline bool __make_select_node_available( select_node & this ) with( this ) {
     132    /* paranoid */ verify( clause_status != 0p );
     133    if( !park_counter )
     134        return __mark_select_node( this, (unsigned long int)&this );
     135
     136    unsigned long int cmp_status = __SELECT_UNSAT;
     137
     138    return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
     139        && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
     140        && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
     141}
     142
     143// Handles the special OR case of the waituntil statement
     144// Since only one select node can win in the OR case, we need to race to set the node available BEFORE
     145//    performing the operation since if we lose the race the operation should not be performed as it will be lost
     146// Returns true if execution can continue normally and false if the queue has now been drained
     147static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
     148    if ( queue`isEmpty ) return false;
     149    if ( queue`first.clause_status && !queue`first.park_counter ) {
     150        while ( !queue`isEmpty ) {
     151            // if node not a special OR case or if we win the special OR case race break
     152            if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
     153                return true;
     154            // otherwise we lost the special OR race so discard node
     155            try_pop_front( queue );
     156        }
     157        return false;
     158    }
     159    return true;
     160}
     161
     162// wake one thread from the list
     163static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
     164    if ( !popped.clause_status                              // normal case, node is not a select node
     165        || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
     166        || __make_select_node_available( popped ) )         // check if popped link belongs to a selecting thread
     167        unpark( popped.blocked_thread );
     168}
     169
     170static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
     171
     172static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
     173    this.blocked_thread = active_thread();
     174    this.clause_status = clause_status;
     175    this.park_counter = park_counter;
     176}
     177
     178// waituntil ( timeout( ... ) ) support
     179struct select_timeout_node {
     180    alarm_node_t a_node;
     181    select_node * s_node;
     182};
     183void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
     184void ^?{}( select_timeout_node & this );
     185void timeout_handler_select_cast( alarm_node_t & node );
     186
     187// Selectable trait routines
     188bool register_select( select_timeout_node & this, select_node & node );
     189bool unregister_select( select_timeout_node & this, select_node & node );
     190bool on_selected( select_timeout_node & this, select_node & node );
     191
     192// Gateway routines to waituntil on duration
     193select_timeout_node timeout( Duration duration );
     194select_timeout_node sleep( Duration duration );
  • libcfa/src/concurrency/thread.cfa

    r6e4c44d r3982384  
    5353        preferred = ready_queue_new_preferred();
    5454        last_proc = 0p;
     55    link_node = 0p;
    5556        PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() );
    5657        #if defined( __CFA_WITH_VERIFY__ )
     
    5960        #endif
    6061
    61         clh_node = malloc( );
    62         *clh_node = false;
    63 
    6462        doregister(curr_cluster, this);
    6563        monitors{ &self_mon_p, 1, (fptr_t)0 };
     
    7068                canary = 0xDEADDEADDEADDEADp;
    7169        #endif
    72         free(clh_node);
    7370        unregister(curr_cluster, this);
    7471        ^self_cor{};
Note: See TracChangeset for help on using the changeset viewer.