Ignore:
Timestamp:
Apr 25, 2025, 7:39:09 AM (5 months ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
master
Children:
65bd3c2
Parents:
b195498
Message:

change backquote call to regular call

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/select.hfa

    rb195498 r6b33e89  
    1010// Author           : Colby Alexander Parsons
    1111// Created On       : Thu Jan 21 19:46:50 2023
    12 // Last Modified By : Kyoung Seo
    13 // Last Modified On : Wed Mar 19 12:00:00 2025
    14 // Update Count     : 1
     12// Last Modified By : Peter A. Buhr
     13// Last Modified On : Fri Apr 25 07:31:26 2025
     14// Update Count     : 5
    1515//
    1616
     
    3333static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
    3434static inline void __CFA_maybe_park( int * park_counter ) {
    35     if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
    36         park();
     35        if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
     36                park();
    3737}
    3838
    3939// node used for coordinating waituntil synchronization
    4040struct select_node {
    41     int * park_counter;                // If this is 0p then the node is in a special OR case waituntil
    42     unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
    43 
    44     void * extra;                       // used to store arbitrary data needed by some primitives
    45 
    46     thread$ * blocked_thread;
    47     inline dlink(select_node);
     41        int * park_counter;                              // If this is 0p then the node is in a special OR case waituntil
     42        unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
     43
     44        void * extra;                                      // used to store arbitrary data needed by some primitives
     45
     46        thread$ * blocked_thread;
     47        inline dlink(select_node);
    4848};
    4949P9_EMBEDDED( select_node, dlink(select_node) )
    5050
    5151static inline void ?{}( select_node & this ) {
    52     this.blocked_thread = active_thread();
    53     this.clause_status = 0p;
    54     this.park_counter = 0p;
    55     this.extra = 0p;
     52        this.blocked_thread = active_thread();
     53        this.clause_status = 0p;
     54        this.park_counter = 0p;
     55        this.extra = 0p;
    5656}
    5757
    5858static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    59     this.blocked_thread = blocked_thread;
    60     this.clause_status = 0p;
    61     this.park_counter = 0p;
    62     this.extra = 0p;
     59        this.blocked_thread = blocked_thread;
     60        this.clause_status = 0p;
     61        this.park_counter = 0p;
     62        this.extra = 0p;
    6363}
    6464
    6565static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    66     this.blocked_thread = blocked_thread;
    67     this.clause_status = 0p;
    68     this.park_counter = 0p;
    69     this.extra = extra;
     66        this.blocked_thread = blocked_thread;
     67        this.clause_status = 0p;
     68        this.park_counter = 0p;
     69        this.extra = extra;
    7070}
    7171static inline void ^?{}( select_node & this ) {}
     
    7676// this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
    7777static inline bool __select_node_else_race( select_node & this ) with( this ) {
    78     unsigned long int cmp_status = __SELECT_UNSAT;
    79     return *clause_status == 0
    80             && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
     78        unsigned long int cmp_status = __SELECT_UNSAT;
     79        return *clause_status == 0
     80                        && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
    8181}
    8282
     
    8585forall(T & | sized(T))
    8686trait is_selectable {
    87     // For registering a select stmt on a selectable concurrency primitive
    88     // Returns bool that indicates if operation is already SAT
    89     bool register_select( T &, select_node & );
    90 
    91     // For unregistering a select stmt on a selectable concurrency primitive
    92     // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
    93     bool unregister_select( T &, select_node & );
    94 
    95     // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
    96     //    passed as an arg to this routine. If true is returned proceed as normal, if false is returned the statement is skipped
    97     bool on_selected( T &, select_node & );
     87        // For registering a select stmt on a selectable concurrency primitive
     88        // Returns bool that indicates if operation is already SAT
     89        bool register_select( T &, select_node & );
     90
     91        // For unregistering a select stmt on a selectable concurrency primitive
     92        // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
     93        bool unregister_select( T &, select_node & );
     94
     95        // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
     96        //      passed as an arg to this routine. If true is returned proceed as normal, if false is returned the statement is skipped
     97        bool on_selected( T &, select_node & );
    9898};
    9999// Used inside the compiler to allow for overloading on return type for operations such as '?<<?' for channels
     
    107107
    108108static inline void __make_select_node_unsat( select_node & this ) with( this ) {
    109     __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
     109        __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
    110110}
    111111static inline void __make_select_node_sat( select_node & this ) with( this ) {
    112     __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
     112        __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
    113113}
    114114
    115115// used for the 2-stage avail needed by the special OR case
    116116static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
    117     /* paranoid */ verify( park_counter == 0p );
    118     /* paranoid */ verify( clause_status != 0p );
    119 
    120     unsigned long int cmp_status = __SELECT_UNSAT;
    121     while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    122         if ( cmp_status != __SELECT_PENDING ) return false;
    123         cmp_status = __SELECT_UNSAT;
    124     }
    125     return true;
     117        /* paranoid */ verify( park_counter == 0p );
     118        /* paranoid */ verify( clause_status != 0p );
     119
     120        unsigned long int cmp_status = __SELECT_UNSAT;
     121        while( ! __atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     122                if ( cmp_status != __SELECT_PENDING ) return false;
     123                cmp_status = __SELECT_UNSAT;
     124        }
     125        return true;
    126126}
    127127
    128128// used for the 2-stage avail by the thread who owns a pending node
    129129static inline bool __pending_set_other( select_node & other, select_node & mine, unsigned long int val ) with( other ) {
    130     /* paranoid */ verify( park_counter == 0p );
    131     /* paranoid */ verify( clause_status != 0p );
    132 
    133     unsigned long int cmp_status = __SELECT_UNSAT;
    134     while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
    135         if ( cmp_status != __SELECT_PENDING )
    136             return false;
    137 
    138         // toggle current status flag to avoid starvation/deadlock
    139         __make_select_node_unsat( mine );
    140         cmp_status = __SELECT_UNSAT;
    141         if ( !__atomic_compare_exchange_n( mine.clause_status, &cmp_status, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
    142             return false;
    143         cmp_status = __SELECT_UNSAT;
    144     }
    145     return true;
     130        /* paranoid */ verify( park_counter == 0p );
     131        /* paranoid */ verify( clause_status != 0p );
     132
     133        unsigned long int cmp_status = __SELECT_UNSAT;
     134        while( ! __atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     135                if ( cmp_status != __SELECT_PENDING )
     136                        return false;
     137
     138                // toggle current status flag to avoid starvation/deadlock
     139                __make_select_node_unsat( mine );
     140                cmp_status = __SELECT_UNSAT;
     141                if ( ! __atomic_compare_exchange_n( mine.clause_status, &cmp_status, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
     142                        return false;
     143                cmp_status = __SELECT_UNSAT;
     144        }
     145        return true;
    146146}
    147147
    148148static inline bool __make_select_node_pending( select_node & this ) with( this ) {
    149     return __mark_select_node( this, __SELECT_PENDING );
     149        return __mark_select_node( this, __SELECT_PENDING );
    150150}
    151151
     
    153153// return true if we want to unpark the thd
    154154static inline bool __make_select_node_available( select_node & this ) with( this ) {
    155     /* paranoid */ verify( clause_status != 0p );
    156     if( !park_counter )
    157         return __mark_select_node( this, (unsigned long int)&this );
    158 
    159     unsigned long int cmp_status = __SELECT_UNSAT;
    160 
    161     return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
    162         && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
    163         && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
     155        /* paranoid */ verify( clause_status != 0p );
     156        if ( ! park_counter )
     157                return __mark_select_node( this, (unsigned long int)&this );
     158
     159        unsigned long int cmp_status = __SELECT_UNSAT;
     160
     161        return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
     162                && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
     163                && ! __atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
    164164}
    165165
    166166// Handles the special OR case of the waituntil statement
    167167// Since only one select node can win in the OR case, we need to race to set the node available BEFORE
    168 //    performing the operation since if we lose the race the operation should not be performed as it will be lost
     168// performing the operation since if we lose the race the operation should not be performed as it will be lost
    169169// Returns true if execution can continue normally and false if the queue has now been drained
    170170static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
    171     if ( queue`isEmpty ) return false;
    172     if ( queue`first.clause_status && !queue`first.park_counter ) {
    173         while ( !queue`isEmpty ) {
    174             // if node not a special OR case or if we win the special OR case race break
    175             if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
    176                 return true;
    177             // otherwise we lost the special OR race so discard node
    178             try_pop_front( queue );
    179         }
    180         return false;
    181     }
    182     return true;
     171        if ( isEmpty( queue ) ) return false;
     172        if ( first( queue ).clause_status && ! first( queue ).park_counter ) {
     173                while ( ! isEmpty( queue ) ) {
     174                        // if node not a special OR case or if we win the special OR case race break
     175                        if ( ! first( queue ).clause_status || first( queue ).park_counter || __make_select_node_available( first( queue ) ) )
     176                                return true;
     177                        // otherwise we lost the special OR race so discard node
     178                        remove_first( queue );
     179                }
     180                return false;
     181        }
     182        return true;
    183183}
    184184
    185185// wake one thread from the list
    186186static inline void wake_one( dlist( select_node ) & /*queue*/, select_node & popped ) {
    187     if ( !popped.clause_status                              // normal case, node is not a select node
    188         || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
    189         || __make_select_node_available( popped ) )        // check if popped link belongs to a selecting thread
    190         unpark( popped.blocked_thread );
    191 }
    192 
    193 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
     187        if ( ! popped.clause_status                                                       // normal case, node is not a select node
     188                || ( popped.clause_status && ! popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
     189                || __make_select_node_available( popped ) )              // check if popped link belongs to a selecting thread
     190                unpark( popped.blocked_thread );
     191}
     192
     193static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, remove_first( queue ) ); }
    194194
    195195static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
    196     this.blocked_thread = active_thread();
    197     this.clause_status = clause_status;
    198     this.park_counter = park_counter;
     196        this.blocked_thread = active_thread();
     197        this.clause_status = clause_status;
     198        this.park_counter = park_counter;
    199199}
    200200
    201201// waituntil ( timeout( ... ) ) support
    202202struct select_timeout_node {
    203     alarm_node_t a_node;
    204     select_node * s_node;
     203        alarm_node_t a_node;
     204        select_node * s_node;
    205205};
    206206void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
Note: See TracChangeset for help on using the changeset viewer.