Changeset 044ae62 for libcfa


Timestamp:
May 29, 2023, 11:44:29 AM (3 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT
Children:
fa2c005
Parents:
3a513d89 (diff), 2b78949 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into ADT

Location:
libcfa/src
Files:
1 added
12 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/Makefile.am

    r3a513d89 r044ae62  
    115115        concurrency/kernel/fwd.hfa \
    116116        concurrency/mutex_stmt.hfa \
    117     concurrency/select.hfa \
    118117    concurrency/channel.hfa \
    119118    concurrency/actor.hfa
     
    128127        concurrency/monitor.hfa \
    129128        concurrency/mutex.hfa \
     129    concurrency/select.hfa \
    130130        concurrency/thread.hfa
    131131
  • libcfa/src/bits/weakso_locks.cfa

    r3a513d89 r044ae62  
    1515// Update Count     :
    1616//
    17 
    1817#include "bits/weakso_locks.hfa"
    19 
    2018#pragma GCC visibility push(default)
    2119
     
    2725void unlock( blocking_lock & ) {}
    2826void on_notify( blocking_lock &, struct thread$ * ) {}
    29 size_t on_wait( blocking_lock & ) { return 0; }
     27size_t on_wait( blocking_lock &, void (*pp_fn)( void * ), void * pp_datum ) { return 0; }
    3028void on_wakeup( blocking_lock &, size_t ) {}
    3129size_t wait_count( blocking_lock & ) { return 0; }
     30bool register_select( blocking_lock & this, select_node & node ) { return false; }
     31bool unregister_select( blocking_lock & this, select_node & node ) { return false; }
     32bool on_selected( blocking_lock & this, select_node & node ) { return true; }
     33
  • libcfa/src/bits/weakso_locks.hfa

    r3a513d89 r044ae62  
    2323#include "containers/list.hfa"
    2424
    25 struct thread$;
     25struct select_node;
    2626
    2727//-----------------------------------------------------------------------------
     
    3232
    3333        // List of blocked threads
    34         dlist( thread$ ) blocked_threads;
     34        dlist( select_node ) blocked_threads;
    3535
    3636        // Count of current blocked threads
     
    5757void unlock( blocking_lock & this ) OPTIONAL_THREAD;
    5858void on_notify( blocking_lock & this, struct thread$ * t ) OPTIONAL_THREAD;
    59 size_t on_wait( blocking_lock & this ) OPTIONAL_THREAD;
     59size_t on_wait( blocking_lock & this, void (*pp_fn)( void * ), void * pp_datum ) OPTIONAL_THREAD;
    6060void on_wakeup( blocking_lock & this, size_t ) OPTIONAL_THREAD;
    6161size_t wait_count( blocking_lock & this ) OPTIONAL_THREAD;
     62bool register_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD;
     63bool unregister_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD;
     64bool on_selected( blocking_lock & this, select_node & node ) OPTIONAL_THREAD;
    6265
    6366//----------
     
    7275static inline bool   try_lock ( multiple_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); }
    7376static inline void   unlock   ( multiple_acquisition_lock & this ) { unlock  ( (blocking_lock &)this ); }
    74 static inline size_t on_wait  ( multiple_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     77static inline size_t on_wait  ( multiple_acquisition_lock & this, void (*pp_fn)( void * ), void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    7578static inline void   on_wakeup( multiple_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    7679static inline void   on_notify( multiple_acquisition_lock & this, struct thread$ * t ){ on_notify( (blocking_lock &)this, t ); }
     80static inline bool   register_select( multiple_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     81static inline bool   unregister_select( multiple_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     82static inline bool   on_selected( multiple_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
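
Note: register_select, unregister_select, and on_selected form the hook interface a primitive must export to appear in a waituntil statement: register_select either completes the operation immediately (returning true) or enqueues the node (returning false), unregister_select removes a node that is still queued, and on_selected runs in the selecting thread after its clause wins. As a sketch, a hypothetical lock wrapper built on blocking_lock would gain waituntil support by forwarding all three hooks, exactly as multiple_acquisition_lock does above:

    // Sketch only: `fair_lock` is a hypothetical wrapper type; the forwarding
    // pattern is the one multiple_acquisition_lock uses in this changeset.
    struct fair_lock { inline blocking_lock; };
    static inline bool register_select( fair_lock & this, select_node & node ) {
        return register_select( (blocking_lock &)this, node );   // acquire now or enqueue node
    }
    static inline bool unregister_select( fair_lock & this, select_node & node ) {
        return unregister_select( (blocking_lock &)this, node ); // dequeue if still waiting
    }
    static inline bool on_selected( fair_lock & this, select_node & node ) {
        return on_selected( (blocking_lock &)this, node );       // post-selection bookkeeping
    }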
  • libcfa/src/concurrency/channel.hfa

    r3a513d89 r044ae62  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// channel.hfa -- LIBCFATHREAD
      8// Runtime channels used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2022
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include <locks.hfa>
    420#include <list.hfa>
    5 #include <mutex_stmt.hfa>
    6 
    7 // link field used for threads waiting on channel
    8 struct wait_link {
    9     // used to put wait_link on a dl queue
    10     inline dlink(wait_link);
    11 
    12     // waiting thread
    13     struct thread$ * t;
    14 
    15     // shadow field
    16     void * elem;
    17 };
    18 P9_EMBEDDED( wait_link, dlink(wait_link) )
    19 
    20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    21     this.t = t;
    22     this.elem = elem;
    23 }
    24 
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
     21#include "select.hfa"
    3022
    3123// returns true if woken due to shutdown
    3224// blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
     25static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) {
     26    select_node sn{ active_thread(), elem_ptr };
     27    insert_last( queue, sn );
    3628    unlock( lock );
    3729    park();
    38     return w.elem == 0p;
     30    return sn.extra == 0p;
     31}
     32
     33// Waituntil support (un)register_select helper routine
      35    // Sets the select node available if not the special OR case, then unlocks
     35static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) {
     36    if ( node.park_counter ) __make_select_node_available( node );
     37    unlock( mutex_lock );
    3938}
    4039
     
    5958    size_t size, front, back, count;
    6059    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
    64     #ifdef CHAN_STATS
    65     size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     60    dlist( select_node ) prods, cons; // lists of blocked threads
     61    go_mutex mutex_lock;              // MX lock
     62    bool closed;                      // indicates channel close/open
     63    #ifdef CHAN_STATS
     64    size_t p_blocks, p_ops, c_blocks, c_ops;      // counts total ops and ops resulting in a blocked thd
    6665    #endif
    6766};
     
    7069    size = _size;
    7170    front = back = count = 0;
    72     buffer = aalloc( size );
     71    if ( size != 0 ) buffer = aalloc( size );
    7372    prods{};
    7473    cons{};
     
    7675    closed = false;
    7776    #ifdef CHAN_STATS
    78     blocks = 0;
    79     operations = 0;
     77    p_blocks = 0;
     78    p_ops = 0;
     79    c_blocks = 0;
     80    c_ops = 0;
    8081    #endif
    8182}
     
    8485static inline void ^?{}( channel(T) &c ) with(c) {
    8586    #ifdef CHAN_STATS
    86     printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
    87     #endif
    88     verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
    91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     87    printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100);
      88    printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100);
      89    printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100);
     90    #endif
     91    verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty,
     92        "Attempted to delete channel with waiting threads (Deadlock).\n" );
     93    if ( size != 0 ) delete( buffer );
     94}
     95static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); }
     96static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); }
    9397static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
    9498static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     
    102106    // flush waiting consumers and producers
    103107    while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
     108        if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race
     109            break;  // if __handle_waituntil_OR returns false cons is empty so break
     110        cons`first.extra = 0p;
    105111        wake_one( cons );
    106112    }
    107113    while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
     114        if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race
     115            break;  // if __handle_waituntil_OR returns false prods is empty so break
     116        prods`first.extra = 0p;
    109117        wake_one( prods );
    110118    }
     
     114122static inline bool is_closed( channel(T) & chan ) with(chan) { return closed; }
    115123
     124// used to hand an element to a blocked consumer and signal it
     125static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
     126    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     127    wake_one( cons );
     128}
     129
     130// used to hand an element to a blocked producer and signal it
     131static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
     132    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     133    wake_one( prods );
     134}
     135
    116136static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117137    lock( mutex_lock );
    118138    while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
     139        __cons_handoff( chan, elem );
    121140    }
    122141    unlock( mutex_lock );
     
    125144// handles buffer insert
    126145static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    127     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     146    memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
    128147    count += 1;
    129148    back++;
     
    131150}
    132151
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141152// needed to avoid an extra copy in closed case
    142153static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    143154    lock( mutex_lock );
    144155    #ifdef CHAN_STATS
    145     operations++;
    146     #endif
     156    p_ops++;
     157    #endif
     158
     159    ConsEmpty: if ( !cons`isEmpty ) {
     160        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     161        __cons_handoff( chan, elem );
     162        unlock( mutex_lock );
     163        return true;
     164    }
     165
    147166    if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
     167
     168    __buf_insert( chan, elem );
    149169    unlock( mutex_lock );
    150170    return true;
     
    157177// handles closed case of insert routine
    158178static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
     179    channel_closed except{ &channel_closed_vt, &elem, &chan };
    160180    throwResume except; // throw closed resumption
    161181    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
     
    172192
    173193    #ifdef CHAN_STATS
    174     if ( !closed ) operations++;
     194    if ( !closed ) p_ops++;
    175195    #endif
    176196
     
    182202    }
    183203
    184     // have to check for the zero size channel case
    185     if ( size == 0 && !cons`isEmpty ) {
    186         memcpy(cons`first.elem, (void *)&elem, sizeof(T));
    187         wake_one( cons );
    188         unlock( mutex_lock );
    189         return true;
     204    // buffer count must be zero if cons are blocked (also handles zero-size case)
     205    ConsEmpty: if ( !cons`isEmpty ) {
     206        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     207        __cons_handoff( chan, elem );
     208        unlock( mutex_lock );
     209        return;
    190210    }
    191211
     
    193213    if ( count == size ) {
    194214        #ifdef CHAN_STATS
    195         blocks++;
     215        p_blocks++;
    196216        #endif
    197217
     
    202222    } // if
    203223
    204     if ( count == 0 && !cons`isEmpty ) {
    205         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206         wake_one( cons );
    207     } else __buf_insert( chan, elem );
    208    
    209     unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     224    __buf_insert( chan, elem );
     225    unlock( mutex_lock );
     226}
     227
     228// does the buffer remove and potentially does waiting producer work
     229static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     230    memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
    216231    count -= 1;
    217232    front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223233    if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     234        if ( !__handle_waituntil_OR( prods ) ) return;
     235        __buf_insert( chan, *(T *)prods`first.extra );  // do waiting producer work
    225236        wake_one( prods );
    226237    }
     
    231242    lock( mutex_lock );
    232243    #ifdef CHAN_STATS
    233     operations++;
    234     #endif
     244    c_ops++;
     245    #endif
     246
     247    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     248        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     249        __prods_handoff( chan, retval );
     250        unlock( mutex_lock );
     251        return true;
     252    }
     253
    235254    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     255
    236256    __do_remove( chan, retval );
    237257    unlock( mutex_lock );
     
    244264static inline [T, bool] try_remove( channel(T) & chan ) {
    245265    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
     266    bool success = __internal_try_remove( chan, retval );
     267    return [ retval, success ];
     268}
     269
     270static inline T try_remove( channel(T) & chan ) {
    250271    T retval;
    251272    __internal_try_remove( chan, retval );
     
    255276// handles closed case of insert routine
    256277static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
     278    channel_closed except{ &channel_closed_vt, 0p, &chan };
    258279    throwResume except; // throw resumption
    259280    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
     
    269290
    270291    #ifdef CHAN_STATS
    271     if ( !closed ) operations++;
     292    if ( !closed ) c_ops++;
    272293    #endif
    273294
     
    279300
    280301    // have to check for the zero size channel case
    281     if ( size == 0 && !prods`isEmpty ) {
    282         memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
    283         wake_one( prods );
     302    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     303        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     304        __prods_handoff( chan, retval );
    284305        unlock( mutex_lock );
    285306        return retval;
     
    287308
    288309    // wait if buffer is empty, work will be completed by someone else
    289     if (count == 0) {
     310    if ( count == 0 ) {
    290311        #ifdef CHAN_STATS
    291         blocks++;
     312        c_blocks++;
    292313        #endif
    293314        // check for if woken due to close
     
    299320    // Remove from buffer
    300321    __do_remove( chan, retval );
    301 
    302322    unlock( mutex_lock );
    303323    return retval;
    304324}
     325
     326///////////////////////////////////////////////////////////////////////////////////////////
     327// The following is support for waituntil (select) statements
     328///////////////////////////////////////////////////////////////////////////////////////////
     329static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) {
     330    if ( !node`isListed && !node.park_counter ) return false; // handle special OR case
     331    lock( mutex_lock );
     332    if ( node`isListed ) { // op wasn't performed
     333        remove( node );
     334        unlock( mutex_lock );
     335        return false;
     336    }
     337    unlock( mutex_lock );
     338
      339    // only return true when not special OR case, not exceptional case, and status is SAT
     340    return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT;
     341}
     342
     343// type used by select statement to capture a chan read as the selected operation
     344struct chan_read {
     345    T & ret;
     346    channel(T) & chan;
     347};
     348
     349static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) {
     350    &cr.chan = &chan;
     351    &cr.ret = &ret;
     352}
     353static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; }
     354
     355static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     356    __closed_remove( chan, ret );
      357    // if we get here then the remove succeeded
     358    __make_select_node_available( node );
     359}
     360
     361static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) {
     362    lock( mutex_lock );
     363    node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close
     364
     365    #ifdef CHAN_STATS
     366    if ( !closed ) c_ops++;
     367    #endif
     368
     369    if ( !node.park_counter ) {
      370        // are we special case OR and front of prods is also special case OR
     371        if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) {
     372            if ( !__make_select_node_pending( node ) ) {
     373                unlock( mutex_lock );
     374                return false;
     375            }
     376           
     377            if ( __handle_waituntil_OR( prods ) ) {
     378                __prods_handoff( chan, ret );
      379                __make_select_node_sat( node ); // need to mark SAT now that we know the operation is done or else threads could get stuck in __mark_select_node
     380                unlock( mutex_lock );
     381                return true;
     382            }
     383            __make_select_node_unsat( node );
     384        }
     385        // check if we can complete operation. If so race to establish winner in special OR case
     386        if ( count != 0 || !prods`isEmpty || unlikely(closed) ) {
     387            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     388                unlock( mutex_lock );
     389                return false;
     390            }
     391        }
     392    }
     393
     394    if ( unlikely(closed) ) {
     395        unlock( mutex_lock );
     396        __handle_select_closed_read( this, node );
     397        return true;
     398    }
     399
     400    // have to check for the zero size channel case
     401    ZeroSize: if ( size == 0 && !prods`isEmpty ) {
     402        if ( !__handle_waituntil_OR( prods ) ) break ZeroSize;
     403        __prods_handoff( chan, ret );
     404        __set_avail_then_unlock( node, mutex_lock );
     405        return true;
     406    }
     407
     408    // wait if buffer is empty, work will be completed by someone else
     409    if ( count == 0 ) {
     410        #ifdef CHAN_STATS
     411        c_blocks++;
     412        #endif
     413       
     414        insert_last( cons, node );
     415        unlock( mutex_lock );
     416        return false;
     417    }
     418
     419    // Remove from buffer
     420    __do_remove( chan, ret );
     421    __set_avail_then_unlock( node, mutex_lock );
     422    return true;
     423}
     424static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     425static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     426    if ( node.extra == 0p ) // check if woken up due to closed channel
     427        __closed_remove( chan, ret );
     428    // This is only reachable if not closed or closed exception was handled
     429    return true;
     430}
     431
     432// type used by select statement to capture a chan write as the selected operation
     433struct chan_write {
     434    T elem;
     435    channel(T) & chan;
     436};
     437
     438static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) {
     439    &cw.chan = &chan;
     440    memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) );
     441}
     442static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; }
     443
     444static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     445    __closed_insert( chan, elem );
     446    // if we get here then the insert succeeded
     447    __make_select_node_available( node );
     448}
     449
     450static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) {
     451    lock( mutex_lock );
     452    node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close
     453
     454    #ifdef CHAN_STATS
     455    if ( !closed ) p_ops++;
     456    #endif
     457
     458    // special OR case handling
     459    if ( !node.park_counter ) {
     460        // are we special case OR and front of cons is also special case OR
     461        if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) {
     462            if ( !__make_select_node_pending( node ) ) {
     463                unlock( mutex_lock );
     464                return false;
     465            }
     466           
     467            if ( __handle_waituntil_OR( cons ) ) {
     468                __cons_handoff( chan, elem );
      469                __make_select_node_sat( node ); // need to mark SAT now that we know the operation is done or else threads could get stuck in __mark_select_node
     470                unlock( mutex_lock );
     471                return true;
     472            }
     473            __make_select_node_unsat( node );
     474        }
     475        // check if we can complete operation. If so race to establish winner in special OR case
     476        if ( count != size || !cons`isEmpty || unlikely(closed) ) {
     477            if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     478                unlock( mutex_lock );
     479                return false;
     480            }
     481        }
     482    }
     483
     484    // if closed handle
     485    if ( unlikely(closed) ) {
     486        unlock( mutex_lock );
     487        __handle_select_closed_write( this, node );
     488        return true;
     489    }
     490
     491    // handle blocked consumer case via handoff (buffer is implicitly empty)
     492    ConsEmpty: if ( !cons`isEmpty ) {
     493        if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty;
     494        __cons_handoff( chan, elem );
     495        __set_avail_then_unlock( node, mutex_lock );
     496        return true;
     497    }
     498
     499    // insert node in list if buffer is full, work will be completed by someone else
     500    if ( count == size ) {
     501        #ifdef CHAN_STATS
     502        p_blocks++;
     503        #endif
     504
     505        insert_last( prods, node );
     506        unlock( mutex_lock );
     507        return false;
     508    } // if
     509
      510    // otherwise carry out the write via a normal buffer insert
     511    __buf_insert( chan, elem );
     512    __set_avail_then_unlock( node, mutex_lock );
     513    return true;
     514}
     515static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
     516
     517static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     518    if ( node.extra == 0p ) // check if woken up due to closed channel
     519        __closed_insert( chan, elem );
     520
     521    // This is only reachable if not closed or closed exception was handled
     522    return true;
     523}
     524
    305525} // forall( T )
     526
     527
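
Note: chan_read and chan_write exist so channel operations can appear as waituntil clauses; `ret << chan` builds a chan_read(T) via ?<<? and `elem >> chan` builds a chan_write(T) via ?>>?. A usage sketch, assuming the or-clause waituntil syntax this branch targets (channel names, sizes, and clause bodies are illustrative):

    channel(int) A{ 4 }, B{ 4 };    // hypothetical channels
    int val;
    // block until a value can be read from A or 7 can be written to B,
    // whichever becomes possible first
    waituntil( val << A ) { sout | "read" | val; }
    or waituntil( 7 >> B ) { sout | "wrote 7"; }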
  • libcfa/src/concurrency/future.hfa

    r3a513d89 r044ae62  
    1919#include "monitor.hfa"
    2020#include "select.hfa"
     21#include "locks.hfa"
    2122
    2223//----------------------------------------------------------------------------
     
    2627//  future_t is lockfree and uses atomics which aren't needed given we use locks here
    2728forall( T ) {
    28     // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
     29    // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
    2930
    3031    // temporary enum replacement
     
    4445    };
    4546
    46     // C_TODO: perhaps allow exceptions to be inserted like uC++?
    47 
    4847        static inline {
    4948
     
    5352        }
    5453
    55         void ?{}(future(T) & this) {
     54        void ?{}( future(T) & this ) {
    5655                        this.waiters{};
    5756            this.state = FUTURE_EMPTY;
     
    6059
    6160                // Reset future back to original state
    62                 void reset(future(T) & this) with(this)
     61                void reset( future(T) & this ) with(this)
    6362        {
    6463            lock( lock );
     
    8281        void _internal_flush( future(T) & this ) with(this) {
    8382            while( ! waiters`isEmpty ) {
     83                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     84                    break; // if handle_OR returns false then waiters is empty so break
    8485                select_node &s = try_pop_front( waiters );
    8586
    86                 if ( s.race_flag == 0p )
    87                     // poke in result so that woken threads do not need to reacquire any locks
    88                     // *(((future_node(T) &)s).my_result) = result;
     87                if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
    8988                    copy_T( result, *(((future_node(T) &)s).my_result) );
    90                 else if ( !install_select_winner( s, &this ) ) continue;
    9189               
    92                 // only unpark if future is not selected
    93                 // or if it is selected we only unpark if we win the race
    94                 unpark( s.blocked_thread );
     90                wake_one( waiters, s );
    9591            }
    9692        }
    9793
    9894                // Fulfil the future, returns whether or not someone was unblocked
    99                 bool fulfil( future(T) & this, T & val ) with(this) {
     95                bool fulfil( future(T) & this, T val ) with(this) {
    10096            lock( lock );
    10197            if( state != FUTURE_EMPTY )
     
    153149        }
    154150
    155         void * register_select( future(T) & this, select_node & s ) with(this) {
    156             lock( lock );
    157 
    158             // future not ready -> insert select node and return 0p
     151        bool register_select( future(T) & this, select_node & s ) with(this) {
     152            lock( lock );
     153
     154            // check if we can complete operation. If so race to establish winner in special OR case
     155            if ( !s.park_counter && state != FUTURE_EMPTY ) {
     156                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     157                    unlock( lock );
     158                    return false;
     159                }
     160            }
     161
     162            // future not ready -> insert select node and return
    159163            if( state == FUTURE_EMPTY ) {
    160164                insert_last( waiters, s );
    161165                unlock( lock );
    162                 return 0p;
    163             }
    164 
    165             // future ready and we won race to install it as the select winner return 1p
    166             if ( install_select_winner( s, &this ) ) {
    167                 unlock( lock );
    168                 return 1p;
    169             }
    170 
    171             unlock( lock );
    172             // future ready and we lost race to install it as the select winner
    173             return 2p;
    174         }
    175 
    176         void unregister_select( future(T) & this, select_node & s ) with(this) {
     166                return false;
     167            }
     168
     169            __make_select_node_available( s );
     170            unlock( lock );
     171            return true;
     172        }
     173
     174        bool unregister_select( future(T) & this, select_node & s ) with(this) {
     175            if ( ! s`isListed ) return false;
    177176            lock( lock );
    178177            if ( s`isListed ) remove( s );
    179178            unlock( lock );
     179            return false;
    180180        }
    181181               
     182        bool on_selected( future(T) & this, select_node & node ) { return true; }
    182183        }
    183184}
     
    186187// These futures below do not support select statements so they may not be as useful as 'future'
    187188//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    188 //  since it uses raw atomics and no locks afaik
     189//  since it uses raw atomics and no locks
    189190//
    190191// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
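
Note: with register_select/unregister_select/on_selected defined, a future itself is selectable: registration completes immediately if the future is already fulfilled, otherwise the select node is parked on the waiters list until fulfil runs. A minimal sketch, assuming the waituntil syntax above:

    future(int) f;
    // some other thread eventually runs: fulfil( f, 42 );
    waituntil( f ) { /* f is fulfilled when this clause body runs */ }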
  • libcfa/src/concurrency/invoke.h

    r3a513d89 r044ae62  
    217217                struct __thread_user_link cltr_link;
    218218
    219                 // used to point to this thd's current clh node
    220                 volatile bool * clh_node;
    221 
    222219                struct processor * last_proc;
     220
     221        // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes
     222        // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock
     223        void * link_node;
    223224
    224225                PRNG_STATE_T random_state;                                              // fast random numbers
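
Note: link_node is what makes the wait-morphing in locks.cfa below allocation-free: a waiter publishes a stack-allocated node through its thread descriptor before parking, so a signaller can move that node from the condition variable's list onto the lock's list. In sketch form, using names from this changeset:

    select_node node;                           // lives on the waiter's stack
    active_thread()->link_node = (void *)&node; // publish before parking
    // later, on_notify( lock, t ) enqueues *(select_node *)t->link_node on the
    // lock's blocked_threads list instead of allocating a fresh node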
  • libcfa/src/concurrency/locks.cfa

    r3a513d89 r044ae62  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // locks.hfa -- LIBCFATHREAD
     7// locks.cfa -- LIBCFATHREAD
     88// Runtime locks that are used with the runtime thread system.
    99//
     
    7979        // lock is held by some other thread
    8080        if ( owner != 0p && owner != thrd ) {
    81                 insert_last( blocked_threads, *thrd );
     81        select_node node;
     82                insert_last( blocked_threads, node );
    8283                wait_count++;
    8384                unlock( lock );
    8485                park( );
    85         }
    86         // multi acquisition lock is held by current thread
    87         else if ( owner == thrd && multi_acquisition ) {
     86        return;
     87        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
    8888                recursion_count++;
    89                 unlock( lock );
    90         }
    91         // lock isn't held
    92         else {
     89        } else {  // lock isn't held
    9390                owner = thrd;
    9491                recursion_count = 1;
    95                 unlock( lock );
    96         }
     92        }
     93    unlock( lock );
    9794}
    9895
     
    117114}
    118115
    119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
    120         thread$ * t = &try_pop_front( blocked_threads );
    121         owner = t;
    122         recursion_count = ( t ? 1 : 0 );
    123         if ( t ) wait_count--;
    124         unpark( t );
     116static inline void pop_node( blocking_lock & this ) with( this ) {
     117    __handle_waituntil_OR( blocked_threads );
     118    select_node * node = &try_pop_front( blocked_threads );
     119    if ( node ) {
     120        wait_count--;
     121        owner = node->blocked_thread;
     122        recursion_count = 1;
     123        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     124        wake_one( blocked_threads, *node );
     125    } else {
     126        owner = 0p;
     127        recursion_count = 0;
     128    }
    125129}
    126130
     
    134138        recursion_count--;
    135139        if ( recursion_count == 0 ) {
    136                 pop_and_set_new_owner( this );
     140                pop_node( this );
    137141        }
    138142        unlock( lock );
     
    147151        // lock held
    148152        if ( owner != 0p ) {
    149                 insert_last( blocked_threads, *t );
     153                insert_last( blocked_threads, *(select_node *)t->link_node );
    150154                wait_count++;
    151                 unlock( lock );
    152155        }
    153156        // lock not held
     
    156159                recursion_count = 1;
    157160                unpark( t );
    158                 unlock( lock );
    159         }
    160 }
    161 
    162 size_t on_wait( blocking_lock & this ) with( this ) {
     161        }
     162    unlock( lock );
     163}
     164
     165size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
    163166        lock( lock __cfaabi_dbg_ctx2 );
    164167        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    167170        size_t ret = recursion_count;
    168171
    169         pop_and_set_new_owner( this );
     172        pop_node( this );
     173
     174    select_node node;
     175    active_thread()->link_node = (void *)&node;
    170176        unlock( lock );
     177
     178    pre_park_then_park( pp_fn, pp_datum );
     179
    171180        return ret;
    172181}
     
    175184        recursion_count = recursion;
    176185}
     186
     187// waituntil() support
     188bool register_select( blocking_lock & this, select_node & node ) with(this) {
     189    lock( lock __cfaabi_dbg_ctx2 );
     190        thread$ * thrd = active_thread();
     191
     192        // single acquisition lock is held by current thread
     193        /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );
     194
     195    if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case
     196        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     197           unlock( lock );
     198           return false;
     199        }
     200    }
     201
     202        // lock is held by some other thread
     203        if ( owner != 0p && owner != thrd ) {
     204                insert_last( blocked_threads, node );
     205                wait_count++;
     206                unlock( lock );
     207        return false;
     208        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
     209                recursion_count++;
     210        } else {  // lock isn't held
     211                owner = thrd;
     212                recursion_count = 1;
     213        }
     214
     215    if ( node.park_counter ) __make_select_node_available( node );
     216    unlock( lock );
     217    return true;
     218}
     219
     220bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
     221    lock( lock __cfaabi_dbg_ctx2 );
     222    if ( node`isListed ) {
     223        remove( node );
     224        wait_count--;
     225        unlock( lock );
     226        return false;
     227    }
     228   
     229    if ( owner == active_thread() ) {
     230        /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
     231        // if recursion count is zero release lock and set new owner if one is waiting
     232        recursion_count--;
     233        if ( recursion_count == 0 ) {
     234            pop_node( this );
     235        }
     236    }
     237        unlock( lock );
     238    return false;
     239}
     240
     241bool on_selected( blocking_lock & this, select_node & node ) { return true; }
    177242
    178243//-----------------------------------------------------------------------------
     
    311376        int counter( condition_variable(L) & this ) with(this) { return count; }
    312377
    313         static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {
     378        static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
    314379                // add info_thread to waiting queue
    315380                insert_last( blocked_threads, *i );
    316381                count++;
    317                 size_t recursion_count = 0;
    318                 if (i->lock) {
    319                         // if lock was passed get recursion count to reset to after waking thread
    320                         recursion_count = on_wait( *i->lock );
    321                 }
    322                 return recursion_count;
    323         }
     382        }
     383
     384    static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
     385        size_t recursion_count = 0;
     386                if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread
     387                        recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
     388                else
     389            pre_park_then_park( pp_fn, pp_datum );
     390        return recursion_count;
     391    }
     392    static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); }
    324393
    325394        // helper for wait()'s' with no timeout
    326395        static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
    327396                lock( lock __cfaabi_dbg_ctx2 );
    328                 size_t recursion_count = queue_and_get_recursion(this, &i);
     397        enqueue_thread( this, &i );
    329398                unlock( lock );
    330399
    331400                // blocks here
    332                 park( );
     401        size_t recursion_count = block_and_get_recursion( i );
    333402
    334403                // resets recursion count here after waking
    335                 if (i.lock) on_wakeup(*i.lock, recursion_count);
     404                if ( i.lock ) on_wakeup( *i.lock, recursion_count );
    336405        }
    337406
     
    340409                queue_info_thread( this, i );
    341410
     411    static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); }
     412
    342413        // helper for wait()'s' with a timeout
    343414        static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    344415                lock( lock __cfaabi_dbg_ctx2 );
    345                 size_t recursion_count = queue_and_get_recursion(this, &info);
     416        enqueue_thread( this, &info );
    346417                alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    347418                unlock( lock );
    348419
    349                 // registers alarm outside cond lock to avoid deadlock
    350                 register_self( &node_wrap.alarm_node );
    351 
    352                 // blocks here
    353                 park();
      420                // blocks here; the alarm node is registered after the locks are released and before parking, to avoid deadlock
     421        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     422                // park();
    354423
    355424                // unregisters alarm so it doesn't go off if this happens first
     
    357426
    358427                // resets recursion count here after waking
    359                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     428                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    360429        }
    361430
     
    417486                info_thread( L ) i = { active_thread(), info, &l };
    418487                insert_last( blocked_threads, i );
    419                 size_t recursion_count = on_wait( *i.lock );
    420                 park( );
     488                size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here
     489                // park( );
    421490                on_wakeup(*i.lock, recursion_count);
    422491        }
     
    459528        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
    460529
    461         static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
    462                 // add info_thread to waiting queue
    463                 insert_last( blocked_threads, *i );
    464                 size_t recursion_count = 0;
    465                 recursion_count = on_wait( *i->lock );
    466                 return recursion_count;
    467         }
    468        
    469530        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    470531                lock( lock __cfaabi_dbg_ctx2 );
    471                 size_t recursion_count = queue_and_get_recursion(this, &info);
     532        insert_last( blocked_threads, info );
    472533                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    473534                unlock( lock );
    474535
    475                 // registers alarm outside cond lock to avoid deadlock
    476                 register_self( &node_wrap.alarm_node );
    477 
    478                 // blocks here
    479                 park();
    480 
    481                 // unregisters alarm so it doesn't go off if this happens first
     536                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
     537        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     538
     539                // unregisters alarm so it doesn't go off if signal happens first
    482540                unregister_self( &node_wrap.alarm_node );
    483541
    484542                // resets recursion count here after waking
    485                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     543                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    486544        }
    487545
     
    493551                lock( lock __cfaabi_dbg_ctx2 );
    494552                info_thread( L ) i = { active_thread(), info, &l };
    495                 size_t recursion_count = queue_and_get_recursion(this, &i);
    496                 unlock( lock );
    497                 park( );
    498                 on_wakeup(*i.lock, recursion_count);
     553        insert_last( blocked_threads, i );
     554                unlock( lock );
     555
     556        // blocks here
     557                size_t recursion_count = block_and_get_recursion( i );
     558
     559                on_wakeup( *i.lock, recursion_count );
    499560        }
    500561
     
    584645        return thrd != 0p;
    585646}
     647
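
Note: the __cfa_pre_park hook threaded through on_wait closes the deadlock window in the timed waits: the alarm node must be registered after the condition-variable and user locks are released but before the thread parks. Condensed from queue_info_thread_timeout above (not a verbatim excerpt):

    alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    unlock( lock );                                // condvar mutex released first
    // on_wait releases the caller's lock, runs the hook (arming the alarm), then parks
    size_t rec = block_and_get_recursion( info, cond_alarm_register, (void *)&node_wrap.alarm_node );
    unregister_self( &node_wrap.alarm_node );      // cancel alarm if the signal came first
    if ( info.lock ) on_wakeup( *info.lock, rec ); // restore recursion count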
  • libcfa/src/concurrency/locks.hfa

    r3a513d89 r044ae62  
    3030#include "time.hfa"
    3131
     32#include "select.hfa"
     33
    3234#include <fstream.hfa>
    3335
     
    3739#include <unistd.h>
    3840
    39 // C_TODO: cleanup this and locks.cfa
    40 // - appropriate separation of interface and impl
    41 // - clean up unused/unneeded locks
    42 // - change messy big blocking lock from inheritance to composition to remove need for flags
     41typedef void (*__cfa_pre_park)( void * );
     42
     43static inline void pre_park_noop( void * ) {}
     44
     45//-----------------------------------------------------------------------------
     46// is_blocking_lock
     47forall( L & | sized(L) )
     48trait is_blocking_lock {
     49        // For synchronization locks to use when acquiring
     50        void on_notify( L &, struct thread$ * );
     51
     52        // For synchronization locks to use when releasing
     53        size_t on_wait( L &, __cfa_pre_park pp_fn, void * pp_datum );
     54
      56        // used to set the recursion count after getting signalled
     56        void on_wakeup( L &, size_t recursion );
     57};
     58
     59static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) {
     60    pp_fn( pp_datum );
     61    park();
     62}
     63
     64// macros for default routine impls for is_blocking_lock trait that do not wait-morph
     65
     66#define DEFAULT_ON_NOTIFY( lock_type ) \
     67    static inline void on_notify( lock_type & this, thread$ * t ){ unpark(t); }
     68
     69#define DEFAULT_ON_WAIT( lock_type ) \
     70    static inline size_t on_wait( lock_type & this, __cfa_pre_park pp_fn, void * pp_datum ) { \
     71        unlock( this ); \
     72        pre_park_then_park( pp_fn, pp_datum ); \
     73        return 0; \
     74    }
     75
     76// on_wakeup impl if lock should be reacquired after waking up
     77#define DEFAULT_ON_WAKEUP_REACQ( lock_type ) \
     78    static inline void on_wakeup( lock_type & this, size_t recursion ) { lock( this ); }
     79
     80// on_wakeup impl if lock will not be reacquired after waking up
     81#define DEFAULT_ON_WAKEUP_NO_REACQ( lock_type ) \
     82    static inline void on_wakeup( lock_type & this, size_t recursion ) {}
     83
     84
    4385
    4486//-----------------------------------------------------------------------------
     
    67109static inline bool   try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); }
    68110static inline void   unlock   ( single_acquisition_lock & this ) { unlock  ( (blocking_lock &)this ); }
    69 static inline size_t on_wait  ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     111static inline size_t on_wait  ( single_acquisition_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    70112static inline void   on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    71113static inline void   on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     114static inline bool   register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     115static inline bool   unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     116static inline bool   on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
    72117
    73118//----------
     
    81126static inline bool   try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); }
    82127static inline void   unlock   ( owner_lock & this ) { unlock  ( (blocking_lock &)this ); }
    83 static inline size_t on_wait  ( owner_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     128static inline size_t on_wait  ( owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    84129static inline void   on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    85130static inline void   on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     131static inline bool   register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     132static inline bool   unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     133static inline bool   on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
    86134
    87135//-----------------------------------------------------------------------------
     
    128176static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; }
    129177
    130 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) {
    131         return node->next;
    132 }
    133 
    134178struct mcs_spin_lock {
    135179        mcs_spin_queue queue;
     
    137181
    138182static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) {
     183    n.locked = true;
    139184        mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST);
    140         n.locked = true;
    141         if(prev == 0p) return;
     185        if( prev == 0p ) return;
    142186        prev->next = &n;
    143         while(__atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause();
     187        while( __atomic_load_n(&n.locked, __ATOMIC_RELAXED) ) Pause();
    144188}
    145189
     
    147191        mcs_spin_node * n_ptr = &n;
    148192        if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return;
    149         while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {}
     193        while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) Pause();
    150194        n.next->locked = false;
    151195}
     
    156200// - Kernel thd blocking alternative to the spinlock
    157201// - No ownership (will deadlock on reacq)
     202// - no reacq on wakeup
    158203struct futex_mutex {
    159204        // lock state any state other than UNLOCKED is locked
     
    169214}
    170215
    171 static inline void  ?{}( futex_mutex & this ) with(this) { val = 0; }
    172 
    173 static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
     216static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }
     217
     218static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) {
    174219        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
    175220}
    176221
    177 static inline int internal_exchange(futex_mutex & this) with(this) {
     222static inline int internal_exchange( futex_mutex & this ) with(this) {
    178223        return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
    179224}
    180225
    181226// if this is called recursively IT WILL DEADLOCK!!!!!
    182 static inline void lock(futex_mutex & this) with(this) {
     227static inline void lock( futex_mutex & this ) with(this) {
    183228        int state;
    184229
     
    190235                for (int i = 0; i < spin; i++) Pause();
    191236        }
    192 
    193         // // no contention try to acquire
    194         // if (internal_try_lock(this, state)) return;
    195237       
    196238        // if not in contended state, set to be in contended state
     
    212254}
    213255
    214 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); }
    215 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;}
    216 
    217 // to set recursion count after getting signalled;
    218 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
     256DEFAULT_ON_NOTIFY( futex_mutex )
     257DEFAULT_ON_WAIT( futex_mutex )
     258DEFAULT_ON_WAKEUP_NO_REACQ( futex_mutex )
    219259
    220260//-----------------------------------------------------------------------------
     
    232272        int val;
    233273};
    234 
    235274static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
     275// static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted
     276// static inline void ?=?( go_mutex & this, go_mutex this2 ) = void;
    236277
    237278static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     
    244285
    245286// if this is called recursively IT WILL DEADLOCK!!!!!
    246 static inline void lock(go_mutex & this) with(this) {
     287static inline void lock( go_mutex & this ) with( this ) {
    247288        int state, init_state;
    248289
     
    255296            while( !val ) { // lock unlocked
    256297                state = 0;
    257                 if (internal_try_lock(this, state, init_state)) return;
     298                if ( internal_try_lock( this, state, init_state ) ) return;
    258299            }
    259300            for (int i = 0; i < 30; i++) Pause();
     
    262303        while( !val ) { // lock unlocked
    263304            state = 0;
    264             if (internal_try_lock(this, state, init_state)) return;
     305            if ( internal_try_lock( this, state, init_state ) ) return;
    265306        }
    266307        sched_yield();
    267308       
    268309        // if not in contended state, set to be in contended state
    269         state = internal_exchange(this, 2);
     310        state = internal_exchange( this, 2 );
    270311        if ( !state ) return; // state == 0
    271312        init_state = 2;
    272         futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
     313        futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK
    273314    }
    274315}
     
    276317static inline void unlock( go_mutex & this ) with(this) {
    277318        // if uncontended do atomic unlock and then return
    278     if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     319    if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return;
    279320       
    280321        // otherwise threads are blocked so we must wake one
    281         futex((int *)&val, FUTEX_WAKE, 1);
    282 }
    283 
    284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
    285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
    286 static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    287 
    288 //-----------------------------------------------------------------------------
    289 // CLH Spinlock
    290 // - No recursive acquisition
    291 // - Needs to be released by owner
    292 
    293 struct clh_lock {
    294         volatile bool * volatile tail;
    295     volatile bool * volatile head;
    296 };
    297 
    298 static inline void  ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }
    299 static inline void ^?{}( clh_lock & this ) { free(this.tail); }
    300 
    301 static inline void lock(clh_lock & l) {
    302         thread$ * curr_thd = active_thread();
    303         *(curr_thd->clh_node) = false;
    304         volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);
    305         while(!__atomic_load_n(prev, __ATOMIC_SEQ_CST)) Pause();
    306     __atomic_store_n((bool **)(&l.head), (bool *)curr_thd->clh_node, __ATOMIC_SEQ_CST);
    307     curr_thd->clh_node = prev;
    308 }
    309 
    310 static inline void unlock(clh_lock & l) {
    311         __atomic_store_n((bool *)(l.head), true, __ATOMIC_SEQ_CST);
    312 }
    313 
    314 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
    315 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    316 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
     322        futex( (int *)&val, FUTEX_WAKE, 1 );
     323}
     324
     325DEFAULT_ON_NOTIFY( go_mutex )
     326DEFAULT_ON_WAIT( go_mutex )
     327DEFAULT_ON_WAKEUP_NO_REACQ( go_mutex )
    317328
    318329//-----------------------------------------------------------------------------
     
    334345        this.lock_value = 0;
    335346}
     347static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
     348static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    336349
    337350static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    338351
    339 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
     352static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) {
    340353        return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    341354}
    342355
    343 static inline bool try_lock(exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
    344 
    345 static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
    346         return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
    347 }
    348 
    349 static inline bool block(exp_backoff_then_block_lock & this) with(this) {
     356static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); }
     357
     358static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) {
     359        return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE );
     360}
     361
     362static inline bool block( exp_backoff_then_block_lock & this ) with(this) {
    350363    lock( spinlock __cfaabi_dbg_ctx2 );
    351364    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
     
    359372}
    360373
    361 static inline void lock(exp_backoff_then_block_lock & this) with(this) {
     374static inline void lock( exp_backoff_then_block_lock & this ) with(this) {
    362375        size_t compare_val = 0;
    363376        int spin = 4;
     
    378391}
    379392
    380 static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
     393static inline void unlock( exp_backoff_then_block_lock & this ) with(this) {
    381394    if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    382395    lock( spinlock __cfaabi_dbg_ctx2 );
     
    386399}
    387400
    388 static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    389 static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; }
    390 static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); }
     401DEFAULT_ON_NOTIFY( exp_backoff_then_block_lock )
     402DEFAULT_ON_WAIT( exp_backoff_then_block_lock )
     403DEFAULT_ON_WAKEUP_REACQ( exp_backoff_then_block_lock )
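The elided body of lock() spins with truncated exponential backoff before falling back to blocking. A sketch of the pattern assembled from the helpers above (the initial spin and the cap are illustrative values, not the file's exact constants):

    static inline void backoff_lock_sketch( exp_backoff_then_block_lock & this ) with(this) {
        size_t compare_val = 0;
        int spin = 4;
        for () {                                          // spin phase
            if ( internal_try_lock( this, compare_val ) ) return;
            if ( spin > 1024 ) break;                     // backoff exhausted
            for ( int i = 0; i < spin; i++ ) Pause();
            spin += spin;                                 // double each round
            compare_val = 0;                              // reset after failed CAS
        }
        while ( !try_lock_contention( this ) ) block( this );  // blocking phase
    }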
    391404
    392405//-----------------------------------------------------------------------------
     
    418431
    419432// if this is called recursively IT WILL DEADLOCK!!!!!
    420 static inline void lock(fast_block_lock & this) with(this) {
     433static inline void lock( fast_block_lock & this ) with(this) {
    421434        lock( lock __cfaabi_dbg_ctx2 );
    422435        if ( held ) {
     
    430443}
    431444
    432 static inline void unlock(fast_block_lock & this) with(this) {
     445static inline void unlock( fast_block_lock & this ) with(this) {
    433446        lock( lock __cfaabi_dbg_ctx2 );
    434447        /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this );
     
    439452}
    440453
    441 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
     454static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {
    442455    lock( lock __cfaabi_dbg_ctx2 );
    443456    insert_last( blocked_threads, *t );
    444457    unlock( lock );
    445458}
    446 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    447 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
     459DEFAULT_ON_WAIT( fast_block_lock )
     460DEFAULT_ON_WAKEUP_NO_REACQ( fast_block_lock )
    448461
    449462//-----------------------------------------------------------------------------
     
    456469struct simple_owner_lock {
    457470        // List of blocked threads
    458         dlist( thread$ ) blocked_threads;
     471        dlist( select_node ) blocked_threads;
    459472
    460473        // Spin lock used for mutual exclusion
     
    477490static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;
    478491
    479 static inline void lock(simple_owner_lock & this) with(this) {
    480         if (owner == active_thread()) {
     492static inline void lock( simple_owner_lock & this ) with(this) {
     493        if ( owner == active_thread() ) {
    481494                recursion_count++;
    482495                return;
     
    484497        lock( lock __cfaabi_dbg_ctx2 );
    485498
    486         if (owner != 0p) {
    487                 insert_last( blocked_threads, *active_thread() );
     499        if ( owner != 0p ) {
     500        select_node node;
     501                insert_last( blocked_threads, node );
    488502                unlock( lock );
    489503                park( );
     
    495509}
    496510
    497 // TODO: fix duplicate def issue and bring this back
    498 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
    499         // thread$ * t = &try_pop_front( blocked_threads );
    500         // owner = t;
    501         // recursion_count = ( t ? 1 : 0 );
    502         // unpark( t );
    503 // }
    504 
    505 static inline void unlock(simple_owner_lock & this) with(this) {
     511static inline void pop_node( simple_owner_lock & this ) with(this) {
     512    __handle_waituntil_OR( blocked_threads );
     513    select_node * node = &try_pop_front( blocked_threads );
     514    if ( node ) {
     515        owner = node->blocked_thread;
     516        recursion_count = 1;
     517        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     518        wake_one( blocked_threads, *node );
     519    } else {
     520        owner = 0p;
     521        recursion_count = 0;
     522    }
     523}
     524
     525static inline void unlock( simple_owner_lock & this ) with(this) {
    506526        lock( lock __cfaabi_dbg_ctx2 );
    507527        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    510530        recursion_count--;
    511531        if ( recursion_count == 0 ) {
    512                 // pop_and_set_new_owner( this );
    513                 thread$ * t = &try_pop_front( blocked_threads );
    514                 owner = t;
    515                 recursion_count = ( t ? 1 : 0 );
    516                 unpark( t );
     532                pop_node( this );
    517533        }
    518534        unlock( lock );
    519535}
    520536
    521 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
     537static inline void on_notify( simple_owner_lock & this, thread$ * t ) with(this) {
    522538        lock( lock __cfaabi_dbg_ctx2 );
    523539        // lock held
    524540        if ( owner != 0p ) {
    525                 insert_last( blocked_threads, *t );
     541                insert_last( blocked_threads, *(select_node *)t->link_node );
    526542        }
    527543        // lock not held
     
    534550}
    535551
    536 static inline size_t on_wait(simple_owner_lock & this) with(this) {
     552static inline size_t on_wait( simple_owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with(this) {
    537553        lock( lock __cfaabi_dbg_ctx2 );
    538554        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    541557        size_t ret = recursion_count;
    542558
    543         // pop_and_set_new_owner( this );
    544 
    545         thread$ * t = &try_pop_front( blocked_threads );
    546         owner = t;
    547         recursion_count = ( t ? 1 : 0 );
    548         unpark( t );
    549 
     559        pop_node( this );
     560
     561    select_node node;
     562    active_thread()->link_node = (void *)&node;
    550563        unlock( lock );
     564
     565    pre_park_then_park( pp_fn, pp_datum );
     566
    551567        return ret;
    552568}
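Note how on_wait now threads a caller-supplied hook into the park: rather than parking directly, it defers to pre_park_then_park. The helper itself is not part of this diff; a plausible shape, stated purely as an assumption:

    // assumed sketch only -- pre_park_then_park is defined elsewhere
    static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) {
        pp_fn( pp_datum );   // caller's last action before committing to block
        park();              // safe even if on_notify's unpark already happened
    }

The ordering comment relies on CFA's park/unpark pairing, where an early unpark is not lost.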
    553569
    554 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     570static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     571
     572// waituntil() support
     573static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) {
     574    lock( lock __cfaabi_dbg_ctx2 );
     575
      576    // check if we can complete the operation; if so, race to establish the winner in the special OR case
     577    if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) {
     578        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     579           unlock( lock );
     580           return false;
     581        }
     582    }
     583
     584    if ( owner == active_thread() ) {
     585                recursion_count++;
     586        if ( node.park_counter ) __make_select_node_available( node );
     587        unlock( lock );
     588                return true;
     589        }
     590
     591    if ( owner != 0p ) {
     592                insert_last( blocked_threads, node );
     593                unlock( lock );
     594                return false;
     595        }
     596   
     597        owner = active_thread();
     598        recursion_count = 1;
     599
     600    if ( node.park_counter ) __make_select_node_available( node );
     601    unlock( lock );
     602    return true;
     603}
     604
     605static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) {
     606    lock( lock __cfaabi_dbg_ctx2 );
     607    if ( node`isListed ) {
     608        remove( node );
     609        unlock( lock );
     610        return false;
     611    }
     612
     613    if ( owner == active_thread() ) {
     614        recursion_count--;
     615        if ( recursion_count == 0 ) {
     616            pop_node( this );
     617        }
     618    }
     619    unlock( lock );
     620    return false;
     621}
     622
     623static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; }
     624
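With register_select, unregister_select, and on_selected, a simple_owner_lock can stand as a clause of a waituntil statement. A usage sketch (clause syntax shown for flavor only; A and B are hypothetical locks):

    simple_owner_lock A, B;
    waituntil( A ) { /* selected: this thread now owns A */ }
    or waituntil( B ) { /* selected: this thread now owns B */ }

Whichever lock becomes acquirable first wins; the OR race handled above ensures only one clause body runs.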
    555625
    556626//-----------------------------------------------------------------------------
     
    578648
    579649// if this is called recursively IT WILL DEADLOCK!
    580 static inline void lock(spin_queue_lock & this) with(this) {
     650static inline void lock( spin_queue_lock & this ) with(this) {
    581651        mcs_spin_node node;
    582652        lock( lock, node );
     
    586656}
    587657
    588 static inline void unlock(spin_queue_lock & this) with(this) {
     658static inline void unlock( spin_queue_lock & this ) with(this) {
    589659        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    590660}
    591661
    592 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
    593         unpark(t);
    594 }
    595 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    596 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); }
    597 
     662DEFAULT_ON_NOTIFY( spin_queue_lock )
     663DEFAULT_ON_WAIT( spin_queue_lock )
     664DEFAULT_ON_WAKEUP_REACQ( spin_queue_lock )
    598665
    599666//-----------------------------------------------------------------------------
     
    621688
    622689// if this is called recursively IT WILL DEADLOCK!!!!!
    623 static inline void lock(mcs_block_spin_lock & this) with(this) {
     690static inline void lock( mcs_block_spin_lock & this ) with(this) {
    624691        mcs_node node;
    625692        lock( lock, node );
     
    633700}
    634701
    635 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    636 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    637 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {lock(this); }
     702DEFAULT_ON_NOTIFY( mcs_block_spin_lock )
     703DEFAULT_ON_WAIT( mcs_block_spin_lock )
     704DEFAULT_ON_WAKEUP_REACQ( mcs_block_spin_lock )
    638705
    639706//-----------------------------------------------------------------------------
     
    661728
    662729// if this is called recursively IT WILL DEADLOCK!!!!!
    663 static inline void lock(block_spin_lock & this) with(this) {
     730static inline void lock( block_spin_lock & this ) with(this) {
    664731        lock( lock );
    665732        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    668735}
    669736
    670 static inline void unlock(block_spin_lock & this) with(this) {
     737static inline void unlock( block_spin_lock & this ) with(this) {
    671738        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    672739}
    673740
    674 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
     741static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    675742        // first we acquire internal fast_block_lock
    676743        lock( lock __cfaabi_dbg_ctx2 );
     
    686753        unpark(t);
    687754}
    688 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
    689 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
     755DEFAULT_ON_WAIT( block_spin_lock )
     756static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {
    690757        // now we acquire the entire block_spin_lock upon waking up
    691758        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    693760        unlock( lock ); // Now we release the internal fast_spin_lock
    694761}
    695 
    696 //-----------------------------------------------------------------------------
    697 // is_blocking_lock
    698 forall( L & | sized(L) )
    699 trait is_blocking_lock {
    700         // For synchronization locks to use when acquiring
    701         void on_notify( L &, struct thread$ * );
    702 
    703         // For synchronization locks to use when releasing
    704         size_t on_wait( L & );
    705 
    706         // to set recursion count after getting signalled;
    707         void on_wakeup( L &, size_t recursion );
    708 };
    709762
    710763//-----------------------------------------------------------------------------
     
    714767forall(L & | is_blocking_lock(L)) {
    715768        struct info_thread;
    716 
    717         // // for use by sequence
    718         // info_thread(L) *& Back( info_thread(L) * this );
    719         // info_thread(L) *& Next( info_thread(L) * this );
    720769}
    721770
  • libcfa/src/concurrency/mutex_stmt.hfa

    r3a513d89 r044ae62  
    1515};
    1616
    17 
    1817struct __mutex_stmt_lock_guard {
    1918    void ** lockarr;
     
    3029
    3130forall(L & | is_lock(L)) {
    32 
    33     struct scoped_lock {
    34         L * internal_lock;
    35     };
    36 
    37     static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) {
    38         this.internal_lock = &internal_lock;
    39         lock(internal_lock);
    40     }
    41    
    42     static inline void ^?{}( scoped_lock(L) & this ) with(this) {
    43         unlock(*internal_lock);
    44     }
    45 
    46     static inline void * __get_mutexstmt_lock_ptr( L & this ) {
    47         return &this;
    48     }
    49 
    50     static inline L __get_mutexstmt_lock_type( L & this );
    51 
    52     static inline L __get_mutexstmt_lock_type( L * this );
     31    static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; }
     32    static inline L __get_mutexstmt_lock_type( L & this ) {}
     33    static inline L __get_mutexstmt_lock_type( L * this ) {}
    5334}
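These two hooks back the mutex statement, which acquires every listed lock for the duration of a block and releases them on exit, with the guard machinery above acquiring the set in a consistent order to avoid deadlock. A usage sketch (hypothetical locks):

    single_acquisition_lock a, b;
    void transfer() {
        mutex( a, b ) {
            // critical section protected by both a and b
        }   // both released on scope exit
    }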
  • libcfa/src/concurrency/preemption.cfa

    r3a513d89 r044ae62  
    117117                __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn);
    118118                Duration period = node->period;
    119                 if( period == 0) {
     119                if( period == 0 ) {
    120120                        node->set = false;                  // Node is one-shot, just mark it as not pending
    121121                }
  • libcfa/src/concurrency/select.hfa

    r3a513d89 r044ae62  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
      7// select.hfa -- LIBCFATHREAD
      8// Support for the waituntil statement and selectable concurrency primitives.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2023
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include "containers/list.hfa"
    4 #include <stdint.h>
    5 #include <kernel.hfa>
    6 #include <locks.hfa>
     20#include "alarm.hfa"
     21#include "kernel.hfa"
     22#include "time.hfa"
    723
     24struct select_node;
     25
     26// node status
     27static const unsigned long int __SELECT_UNSAT = 0;
     28static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case
     29static const unsigned long int __SELECT_SAT = 2;
     30static const unsigned long int __SELECT_RUN = 3;
     31
     32// these are used inside the compiler to aid in code generation
     33static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
     34static inline void __CFA_maybe_park( int * park_counter ) {
     35    if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
     36        park();
     37}
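The counter makes the selector/waker handshake symmetric: the selecting thread decrements once when it runs out of ready clauses, and a waker increments once when it satisfies a clause (see __make_select_node_available below). A worked race, with park_counter starting at 0:

    // T1 = selecting thread (__CFA_maybe_park), T2 = waker (__make_select_node_available)
    //
    //   T1 first:  T1: sub_fetch -> -1, T1 parks
    //              T2: add_fetch ->  0, so T2 unparks T1
    //   T2 first:  T2: add_fetch ->  1  (no unpark needed)
    //              T1: sub_fetch ->  0, T1 skips the park and proceeds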
     38
     39// node used for coordinating waituntil synchronization
    840struct select_node {
     41    int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
      42    unsigned long int * clause_status;  // must point at a pointer-sized location; if this is 0p the node is not part of a waituntil
     43
     44    void * extra;                       // used to store arbitrary data needed by some primitives
     45
    946    thread$ * blocked_thread;
    10     void ** race_flag;
    1147    inline dlink(select_node);
    1248};
    1349P9_EMBEDDED( select_node, dlink(select_node) )
    1450
    15 void ?{}( select_node & this ) {
    16     this.blocked_thread = 0p;
    17     this.race_flag = 0p;
     51static inline void ?{}( select_node & this ) {
     52    this.blocked_thread = active_thread();
     53    this.clause_status = 0p;
     54    this.park_counter = 0p;
     55    this.extra = 0p;
    1856}
    1957
    20 void ?{}( select_node & this, thread$ * blocked_thread ) {
     58static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    2159    this.blocked_thread = blocked_thread;
    22     this.race_flag = 0p;
     60    this.clause_status = 0p;
     61    this.park_counter = 0p;
     62    this.extra = 0p;
    2363}
    2464
    25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) {
     65static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    2666    this.blocked_thread = blocked_thread;
    27     this.race_flag = race_flag;
     67    this.clause_status = 0p;
     68    this.park_counter = 0p;
     69    this.extra = extra;
    2870}
     71static inline void ^?{}( select_node & this ) {}
    2972
    30 void ^?{}( select_node & this ) {}
     73// this is used inside the compiler to aid in code generation
     74static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
    3175
     76// this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
     77static inline bool __select_node_else_race( select_node & this ) with( this ) {
     78    unsigned long int cmp_status = __SELECT_UNSAT;
     79    return *clause_status == 0
     80            && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
     81}
    3282
    3383//-----------------------------------------------------------------------------
    3484// is_selectable
    35 trait is_selectable(T & | sized(T)) {
    36     // For registering a select on a selectable concurrency primitive
    37     // return 0p if primitive not accessible yet
    38     // return 1p if primitive gets acquired
    39     // return 2p if primitive is accessible but some other primitive won the race
    40     // C_TODO: add enum for return values
    41     void * register_select( T &, select_node & );
     85forall(T & | sized(T))
     86trait is_selectable {
     87    // For registering a select stmt on a selectable concurrency primitive
     88    // Returns bool that indicates if operation is already SAT
     89    bool register_select( T &, select_node & );
    4290
    43     void unregister_select( T &, select_node &  );
     91    // For unregistering a select stmt on a selectable concurrency primitive
     92    // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
     93    bool unregister_select( T &, select_node &  );
     94
     95    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
     96    //    passed as an arg to this routine
      97    // If on_selected returns false the statement is not run; if it returns true, it is run.
     98    bool on_selected( T &, select_node & );
    4499};
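For orientation, the shape of code the compiler conceptually emits around this trait for a single clause (a sketch, not the actual generated code; setup_clause is defined later in this file):

    // prim is any primitive satisfying is_selectable
    int park_counter = 0;
    unsigned long int status = __SELECT_UNSAT;
    select_node node;
    setup_clause( node, &status, &park_counter );

    if ( !register_select( prim, node ) )   // not already satisfiable
        __CFA_maybe_park( &park_counter );  // block until a waker satisfies the clause

    if ( on_selected( prim, node ) )
        { /* run the clause's statement */ }
    unregister_select( prim, node );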
    45100
    46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
    47     // temporary needed for atomic instruction
    48     void * cmp_flag = 0p;
    49    
    50     // if we dont win the selector race we need to potentially
    51     //   ignore this node and move to the next one so we return accordingly
    52     if ( *race_flag != 0p ||
    53         !__atomic_compare_exchange_n(
    54             race_flag,
    55             &cmp_flag,
    56             primitive_ptr,
    57             false,
    58             __ATOMIC_SEQ_CST,
    59             __ATOMIC_SEQ_CST
    60         )
    61     ) return false; // lost race and some other node triggered select
    62     return true; // won race so this node is what the select proceeds with
     101//=============================================================================================
     102// Waituntil Helpers
     103//=============================================================================================
     104
      105// used for the 2-stage availability protocol needed by the special OR case
     106static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
     107    /* paranoid */ verify( park_counter == 0p );
     108    /* paranoid */ verify( clause_status != 0p );
     109
     110    unsigned long int cmp_status = __SELECT_UNSAT;
     111    while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     112        if ( cmp_status != __SELECT_PENDING ) return false;
     113        cmp_status = __SELECT_UNSAT;
     114    }
     115    return true;
    63116}
     117
     118static inline void __make_select_node_unsat( select_node & this ) with( this ) {
     119    __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
     120}
     121static inline void __make_select_node_sat( select_node & this ) with( this ) {
     122    __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
     123}
     124
     125static inline bool __make_select_node_pending( select_node & this ) with( this ) {
     126    return __mark_select_node( this, __SELECT_PENDING );
     127}
     128
      129// when a primitive becomes available it calls the following routine on its node to update the select state:
     130// return true if we want to unpark the thd
     131static inline bool __make_select_node_available( select_node & this ) with( this ) {
     132    /* paranoid */ verify( clause_status != 0p );
     133    if( !park_counter )
     134        return __mark_select_node( this, (unsigned long int)&this );
     135
     136    unsigned long int cmp_status = __SELECT_UNSAT;
     137
     138    return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
     139        && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
     140        && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
     141}
     142
     143// Handles the special OR case of the waituntil statement
      144// Since only one select node can win in the OR case, the node must be raced for and marked available BEFORE
      145//    performing the operation; a clause that loses the race must not perform the operation, as its effect would be lost
     146// Returns true if execution can continue normally and false if the queue has now been drained
     147static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
     148    if ( queue`isEmpty ) return false;
     149    if ( queue`first.clause_status && !queue`first.park_counter ) {
     150        while ( !queue`isEmpty ) {
      151            // if the node is not a special OR case, or if we win the special OR case race, break
     152            if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
     153                return true;
     154            // otherwise we lost the special OR race so discard node
     155            try_pop_front( queue );
     156        }
     157        return false;
     158    }
     159    return true;
     160}
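As a concrete scenario for this helper: suppose waituntil( A ) or waituntil( B ) has queued one node on each of two locks (in the special OR case the two nodes presumably share a status word and have park_counter == 0p), and both locks are released concurrently:

    //   unlock(A): __make_select_node_available wins the UNSAT -> node race;
    //              A's node is popped, its thread unparked, and clause A runs
    //   unlock(B): the race is already decided, so __make_select_node_available
    //              fails; B's node is discarded via try_pop_front and the
    //              unlock keeps draining for a non-selecting waiter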
     161
     162// wake one thread from the list
     163static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
      164    if ( !popped.clause_status                              // normal case: node does not belong to a waituntil
      165        || ( popped.clause_status && !popped.park_counter ) // special OR case: unpark directly, without calling __make_select_node_available
     166        || __make_select_node_available( popped ) )         // check if popped link belongs to a selecting thread
     167        unpark( popped.blocked_thread );
     168}
     169
     170static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
     171
     172static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
     173    this.blocked_thread = active_thread();
     174    this.clause_status = clause_status;
     175    this.park_counter = park_counter;
     176}
     177
     178// waituntil ( timeout( ... ) ) support
     179struct select_timeout_node {
     180    alarm_node_t a_node;
     181    select_node * s_node;
     182};
     183void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
     184void ^?{}( select_timeout_node & this );
     185void timeout_handler_select_cast( alarm_node_t & node );
     186
     187// Selectable trait routines
     188bool register_select( select_timeout_node & this, select_node & node );
     189bool unregister_select( select_timeout_node & this, select_node & node );
     190bool on_selected( select_timeout_node & this, select_node & node );
     191
     192// Gateway routines to waituntil on duration
     193select_timeout_node timeout( Duration duration );
     194select_timeout_node sleep( Duration duration );
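These gateways let a duration stand directly as a waituntil clause. A usage sketch (clause syntax shown for flavor only; ch is a hypothetical selectable primitive, and 1`s is a CFA duration literal):

    waituntil( ch ) { /* became ready in time */ }
    or waituntil( timeout( 1`s ) ) { /* gave up after one second */ }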
  • libcfa/src/concurrency/thread.cfa

    r3a513d89 r044ae62  
    5353        preferred = ready_queue_new_preferred();
    5454        last_proc = 0p;
     55    link_node = 0p;
    5556        PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() );
    5657        #if defined( __CFA_WITH_VERIFY__ )
     
    5960        #endif
    6061
    61         clh_node = malloc( );
    62         *clh_node = false;
    63 
    6462        doregister(curr_cluster, this);
    6563        monitors{ &self_mon_p, 1, (fptr_t)0 };
     
    7068                canary = 0xDEADDEADDEADDEADp;
    7169        #endif
    72         free(clh_node);
    7370        unregister(curr_cluster, this);
    7471        ^self_cor{};