Changeset 6b33e89 for libcfa/src/concurrency/select.hfa
- Timestamp:
- Apr 25, 2025, 7:39:09 AM (5 months ago)
- Branches:
- master
- Children:
- 65bd3c2
- Parents:
- b195498
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/select.hfa
rb195498 r6b33e89 10 10 // Author : Colby Alexander Parsons 11 11 // Created On : Thu Jan 21 19:46:50 2023 12 // Last Modified By : Kyoung Seo13 // Last Modified On : Wed Mar 19 12:00:00202514 // Update Count : 112 // Last Modified By : Peter A. Buhr 13 // Last Modified On : Fri Apr 25 07:31:26 2025 14 // Update Count : 5 15 15 // 16 16 … … 33 33 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; } 34 34 static inline void __CFA_maybe_park( int * park_counter ) { 35 36 35 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 ) 36 park(); 37 37 } 38 38 39 39 // node used for coordinating waituntil synchronization 40 40 struct select_node { 41 int * park_counter;// If this is 0p then the node is in a special OR case waituntil42 43 44 void * extra;// used to store arbitrary data needed by some primitives45 46 47 41 int * park_counter; // If this is 0p then the node is in a special OR case waituntil 42 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil 43 44 void * extra; // used to store arbitrary data needed by some primitives 45 46 thread$ * blocked_thread; 47 inline dlink(select_node); 48 48 }; 49 49 P9_EMBEDDED( select_node, dlink(select_node) ) 50 50 51 51 static inline void ?{}( select_node & this ) { 52 53 54 55 52 this.blocked_thread = active_thread(); 53 this.clause_status = 0p; 54 this.park_counter = 0p; 55 this.extra = 0p; 56 56 } 57 57 58 58 static inline void ?{}( select_node & this, thread$ * blocked_thread ) { 59 60 61 62 59 this.blocked_thread = blocked_thread; 60 this.clause_status = 0p; 61 this.park_counter = 0p; 62 this.extra = 0p; 63 63 } 64 64 65 65 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) { 66 67 68 69 66 this.blocked_thread = blocked_thread; 67 this.clause_status = 0p; 68 this.park_counter = 0p; 69 this.extra = extra; 70 70 } 71 71 static inline void ^?{}( select_node & this ) {} … … 76 76 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race 77 77 static inline bool __select_node_else_race( select_node & this ) with( this ) { 78 79 80 78 unsigned long int cmp_status = __SELECT_UNSAT; 79 return *clause_status == 0 80 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); 81 81 } 82 82 … … 85 85 forall(T & | sized(T)) 86 86 trait is_selectable { 87 88 89 90 91 92 93 94 95 96 //passed as an arg to this routine. If true is returned proceed as normal, if false is returned the statement is skipped97 87 // For registering a select stmt on a selectable concurrency primitive 88 // Returns bool that indicates if operation is already SAT 89 bool register_select( T &, select_node & ); 90 91 // For unregistering a select stmt on a selectable concurrency primitive 92 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 93 bool unregister_select( T &, select_node & ); 94 95 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 96 // passed as an arg to this routine. If true is returned proceed as normal, if false is returned the statement is skipped 97 bool on_selected( T &, select_node & ); 98 98 }; 99 99 // Used inside the compiler to allow for overloading on return type for operations such as '?<<?' for channels … … 107 107 108 108 static inline void __make_select_node_unsat( select_node & this ) with( this ) { 109 109 __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST ); 110 110 } 111 111 static inline void __make_select_node_sat( select_node & this ) with( this ) { 112 112 __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST ); 113 113 } 114 114 115 115 // used for the 2-stage avail needed by the special OR case 116 116 static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) { 117 118 119 120 121 while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {122 123 124 125 117 /* paranoid */ verify( park_counter == 0p ); 118 /* paranoid */ verify( clause_status != 0p ); 119 120 unsigned long int cmp_status = __SELECT_UNSAT; 121 while( ! __atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 122 if ( cmp_status != __SELECT_PENDING ) return false; 123 cmp_status = __SELECT_UNSAT; 124 } 125 return true; 126 126 } 127 127 128 128 // used for the 2-stage avail by the thread who owns a pending node 129 129 static inline bool __pending_set_other( select_node & other, select_node & mine, unsigned long int val ) with( other ) { 130 131 132 133 134 while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {135 136 137 138 139 140 141 if ( !__atomic_compare_exchange_n( mine.clause_status, &cmp_status, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )142 143 144 145 130 /* paranoid */ verify( park_counter == 0p ); 131 /* paranoid */ verify( clause_status != 0p ); 132 133 unsigned long int cmp_status = __SELECT_UNSAT; 134 while( ! __atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 135 if ( cmp_status != __SELECT_PENDING ) 136 return false; 137 138 // toggle current status flag to avoid starvation/deadlock 139 __make_select_node_unsat( mine ); 140 cmp_status = __SELECT_UNSAT; 141 if ( ! __atomic_compare_exchange_n( mine.clause_status, &cmp_status, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) 142 return false; 143 cmp_status = __SELECT_UNSAT; 144 } 145 return true; 146 146 } 147 147 148 148 static inline bool __make_select_node_pending( select_node & this ) with( this ) { 149 149 return __mark_select_node( this, __SELECT_PENDING ); 150 150 } 151 151 … … 153 153 // return true if we want to unpark the thd 154 154 static inline bool __make_select_node_available( select_node & this ) with( this ) { 155 156 if( !park_counter )157 158 159 160 161 162 163 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);155 /* paranoid */ verify( clause_status != 0p ); 156 if ( ! park_counter ) 157 return __mark_select_node( this, (unsigned long int)&this ); 158 159 unsigned long int cmp_status = __SELECT_UNSAT; 160 161 return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case 162 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write 163 && ! __atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST); 164 164 } 165 165 166 166 // Handles the special OR case of the waituntil statement 167 167 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE 168 // 168 // performing the operation since if we lose the race the operation should not be performed as it will be lost 169 169 // Returns true if execution can continue normally and false if the queue has now been drained 170 170 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) { 171 if ( queue`isEmpty) return false;172 if ( queue`first.clause_status && !queue`first.park_counter ) {173 while ( !queue`isEmpty) {174 175 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first) )176 177 178 try_pop_front( queue );179 180 181 182 171 if ( isEmpty( queue ) ) return false; 172 if ( first( queue ).clause_status && ! first( queue ).park_counter ) { 173 while ( ! isEmpty( queue ) ) { 174 // if node not a special OR case or if we win the special OR case race break 175 if ( ! first( queue ).clause_status || first( queue ).park_counter || __make_select_node_available( first( queue ) ) ) 176 return true; 177 // otherwise we lost the special OR race so discard node 178 remove_first( queue ); 179 } 180 return false; 181 } 182 return true; 183 183 } 184 184 185 185 // wake one thread from the list 186 186 static inline void wake_one( dlist( select_node ) & /*queue*/, select_node & popped ) { 187 if ( !popped.clause_status// normal case, node is not a select node188 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available189 || __make_select_node_available( popped ) )// check if popped link belongs to a selecting thread190 191 } 192 193 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }187 if ( ! popped.clause_status // normal case, node is not a select node 188 || ( popped.clause_status && ! popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available 189 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread 190 unpark( popped.blocked_thread ); 191 } 192 193 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, remove_first( queue ) ); } 194 194 195 195 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) { 196 197 198 196 this.blocked_thread = active_thread(); 197 this.clause_status = clause_status; 198 this.park_counter = park_counter; 199 199 } 200 200 201 201 // waituntil ( timeout( ... ) ) support 202 202 struct select_timeout_node { 203 204 203 alarm_node_t a_node; 204 select_node * s_node; 205 205 }; 206 206 void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
Note:
See TracChangeset
for help on using the changeset viewer.