Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/select.hfa

    re23b3ce r5e4a830  
    1 //
    2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
    3 //
    4 // The contents of this file are covered under the licence agreement in the
    5 // file "LICENCE" distributed with Cforall.
    6 //
    7 // channel.hfa -- LIBCFATHREAD
    8 // Runtime locks that used with the runtime thread system.
    9 //
    10 // Author           : Colby Alexander Parsons
    11 // Created On       : Thu Jan 21 19:46:50 2023
    12 // Last Modified By :
    13 // Last Modified On :
    14 // Update Count     :
    15 //
    16 
    171#pragma once
    182
    193#include "containers/list.hfa"
    20 #include "alarm.hfa"
    21 #include "kernel.hfa"
    22 #include "time.hfa"
     4#include <stdint.h>
     5#include <kernel.hfa>
     6#include <locks.hfa>
    237
    24 struct select_node;
    25 
    26 // node status
    27 static const unsigned long int __SELECT_UNSAT = 0;
    28 static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case
    29 static const unsigned long int __SELECT_SAT = 2;
    30 static const unsigned long int __SELECT_RUN = 3;
    31 
    32 // these are used inside the compiler to aid in code generation
    33 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
    34 static inline void __CFA_maybe_park( int * park_counter ) {
    35     if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
    36         park();
    37 }
    38 
    39 // node used for coordinating waituntil synchronization
    408struct select_node {
    41     int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
    42     unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
    43 
    44     void * extra;                       // used to store arbitrary data needed by some primitives
    45 
    469    thread$ * blocked_thread;
     10    void ** race_flag;
    4711    inline dlink(select_node);
    4812};
    4913P9_EMBEDDED( select_node, dlink(select_node) )
    5014
    51 static inline void ?{}( select_node & this ) {
    52     this.blocked_thread = active_thread();
    53     this.clause_status = 0p;
    54     this.park_counter = 0p;
    55     this.extra = 0p;
     15void ?{}( select_node & this ) {
     16    this.blocked_thread = 0p;
     17    this.race_flag = 0p;
    5618}
    5719
    58 static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
     20void ?{}( select_node & this, thread$ * blocked_thread ) {
    5921    this.blocked_thread = blocked_thread;
    60     this.clause_status = 0p;
    61     this.park_counter = 0p;
    62     this.extra = 0p;
     22    this.race_flag = 0p;
    6323}
    6424
    65 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
     25void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) {
    6626    this.blocked_thread = blocked_thread;
    67     this.clause_status = 0p;
    68     this.park_counter = 0p;
    69     this.extra = extra;
     27    this.race_flag = race_flag;
    7028}
    71 static inline void ^?{}( select_node & this ) {}
    7229
    73 // this is used inside the compiler to aid in code generation
    74 static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
     30void ^?{}( select_node & this ) {}
    7531
    76 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
    77 static inline bool __select_node_else_race( select_node & this ) with( this ) {
    78     unsigned long int cmp_status = __SELECT_UNSAT;
    79     return *clause_status == 0
    80             && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
    81 }
    8232
    8333//-----------------------------------------------------------------------------
    8434// is_selectable
    85 forall(T & | sized(T))
    86 trait is_selectable {
    87     // For registering a select stmt on a selectable concurrency primitive
    88     // Returns bool that indicates if operation is already SAT
    89     bool register_select( T &, select_node & );
     35trait is_selectable(T & | sized(T)) {
     36    // For registering a select on a selectable concurrency primitive
     37    // return 0p if primitive not accessible yet
     38    // return 1p if primitive gets acquired
     39    // return 2p if primitive is accessible but some other primitive won the race
     40    // C_TODO: add enum for return values
     41    void * register_select( T &, select_node & );
    9042
    91     // For unregistering a select stmt on a selectable concurrency primitive
    92     // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
    93     bool unregister_select( T &, select_node &  );
    94 
    95     // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
    96     //    passed as an arg to this routine
    97     // If on_selected returns false, the statement is not run, if it returns true it is run.
    98     bool on_selected( T &, select_node & );
     43    void unregister_select( T &, select_node &  );
    9944};
    10045
    101 //=============================================================================================
    102 // Waituntil Helpers
    103 //=============================================================================================
    104 
    105 // used for the 2-stage avail needed by the special OR case
    106 static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
    107     /* paranoid */ verify( park_counter == 0p );
    108     /* paranoid */ verify( clause_status != 0p );
    109 
    110     unsigned long int cmp_status = __SELECT_UNSAT;
    111     while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    112         if ( cmp_status != __SELECT_PENDING ) return false;
    113         cmp_status = __SELECT_UNSAT;
    114     }
    115     return true;
     46static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
     47    // temporary needed for atomic instruction
     48    void * cmp_flag = 0p;
     49   
     50    // if we dont win the selector race we need to potentially
     51    //   ignore this node and move to the next one so we return accordingly
     52    if ( *race_flag != 0p ||
     53        !__atomic_compare_exchange_n(
     54            race_flag,
     55            &cmp_flag,
     56            primitive_ptr,
     57            false,
     58            __ATOMIC_SEQ_CST,
     59            __ATOMIC_SEQ_CST
     60        )
     61    ) return false; // lost race and some other node triggered select
     62    return true; // won race so this node is what the select proceeds with
    11663}
    117 
    118 static inline void __make_select_node_unsat( select_node & this ) with( this ) {
    119     __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
    120 }
    121 static inline void __make_select_node_sat( select_node & this ) with( this ) {
    122     __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
    123 }
    124 
    125 static inline bool __make_select_node_pending( select_node & this ) with( this ) {
    126     return __mark_select_node( this, __SELECT_PENDING );
    127 }
    128 
    129 // when a primitive becomes available it calls the following routine on it's node to update the select state:
    130 // return true if we want to unpark the thd
    131 static inline bool __make_select_node_available( select_node & this ) with( this ) {
    132     /* paranoid */ verify( clause_status != 0p );
    133     if( !park_counter )
    134         return __mark_select_node( this, (unsigned long int)&this );
    135 
    136     unsigned long int cmp_status = __SELECT_UNSAT;
    137 
    138     return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
    139         && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
    140         && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
    141 }
    142 
    143 // Handles the special OR case of the waituntil statement
    144 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE
    145 //    performing the operation since if we lose the race the operation should not be performed as it will be lost
    146 // Returns true if execution can continue normally and false if the queue has now been drained
    147 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
    148     if ( queue`isEmpty ) return false;
    149     if ( queue`first.clause_status && !queue`first.park_counter ) {
    150         while ( !queue`isEmpty ) {
    151             // if node not a special OR case or if we win the special OR case race break
    152             if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
    153                 return true;
    154             // otherwise we lost the special OR race so discard node
    155             try_pop_front( queue );
    156         }
    157         return false;
    158     }
    159     return true;
    160 }
    161 
    162 // wake one thread from the list
    163 static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
    164     if ( !popped.clause_status                              // normal case, node is not a select node
    165         || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
    166         || __make_select_node_available( popped ) )         // check if popped link belongs to a selecting thread
    167         unpark( popped.blocked_thread );
    168 }
    169 
    170 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
    171 
    172 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
    173     this.blocked_thread = active_thread();
    174     this.clause_status = clause_status;
    175     this.park_counter = park_counter;
    176 }
    177 
    178 // waituntil ( timeout( ... ) ) support
    179 struct select_timeout_node {
    180     alarm_node_t a_node;
    181     select_node * s_node;
    182 };
    183 void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
    184 void ^?{}( select_timeout_node & this );
    185 void timeout_handler_select_cast( alarm_node_t & node );
    186 
    187 // Selectable trait routines
    188 bool register_select( select_timeout_node & this, select_node & node );
    189 bool unregister_select( select_timeout_node & this, select_node & node );
    190 bool on_selected( select_timeout_node & this, select_node & node );
    191 
    192 // Gateway routines to waituntil on duration
    193 select_timeout_node timeout( Duration duration );
    194 select_timeout_node sleep( Duration duration );
Note: See TracChangeset for help on using the changeset viewer.