- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/select.hfa
r5e4a830 re23b3ce 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // channel.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2023 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include "containers/list.hfa" 4 #include <stdint.h>5 #include <kernel.hfa>6 #include <locks.hfa>20 #include "alarm.hfa" 21 #include "kernel.hfa" 22 #include "time.hfa" 7 23 24 struct select_node; 25 26 // node status 27 static const unsigned long int __SELECT_UNSAT = 0; 28 static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case 29 static const unsigned long int __SELECT_SAT = 2; 30 static const unsigned long int __SELECT_RUN = 3; 31 32 // these are used inside the compiler to aid in code generation 33 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; } 34 static inline void __CFA_maybe_park( int * park_counter ) { 35 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 ) 36 park(); 37 } 38 39 // node used for coordinating waituntil synchronization 8 40 struct select_node { 41 int * park_counter; // If this is 0p then the node is in a special OR case waituntil 42 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil 43 44 void * extra; // used to store arbitrary data needed by some primitives 45 9 46 thread$ * blocked_thread; 10 void ** race_flag;11 47 inline dlink(select_node); 12 48 }; 13 49 P9_EMBEDDED( select_node, dlink(select_node) ) 14 50 15 void ?{}( select_node & this ) { 16 this.blocked_thread = 0p; 17 this.race_flag = 0p; 51 static inline void ?{}( select_node & this ) { 52 this.blocked_thread = active_thread(); 53 this.clause_status = 0p; 54 this.park_counter = 0p; 55 this.extra = 0p; 18 56 } 19 57 20 void ?{}( select_node & this, thread$ * blocked_thread ) {58 static inline void ?{}( select_node & this, thread$ * blocked_thread ) { 21 59 this.blocked_thread = blocked_thread; 22 this.race_flag = 0p; 60 this.clause_status = 0p; 61 this.park_counter = 0p; 62 this.extra = 0p; 23 63 } 24 64 25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag) {65 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) { 26 66 this.blocked_thread = blocked_thread; 27 this.race_flag = race_flag; 67 this.clause_status = 0p; 68 this.park_counter = 0p; 69 this.extra = extra; 28 70 } 71 static inline void ^?{}( select_node & this ) {} 29 72 30 void ^?{}( select_node & this ) {} 73 // this is used inside the compiler to aid in code generation 74 static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; } 31 75 76 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race 77 static inline bool __select_node_else_race( select_node & this ) with( this ) { 78 unsigned long int cmp_status = __SELECT_UNSAT; 79 return *clause_status == 0 80 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); 81 } 32 82 33 83 //----------------------------------------------------------------------------- 34 84 // is_selectable 35 trait is_selectable(T & | sized(T)) { 36 // For registering a select on a selectable concurrency primitive 37 // return 0p if primitive not accessible yet 38 // return 1p if primitive gets acquired 39 // return 2p if primitive is accessible but some other primitive won the race 40 // C_TODO: add enum for return values 41 void * register_select( T &, select_node & ); 85 forall(T & | sized(T)) 86 trait is_selectable { 87 // For registering a select stmt on a selectable concurrency primitive 88 // Returns bool that indicates if operation is already SAT 89 bool register_select( T &, select_node & ); 42 90 43 void unregister_select( T &, select_node & ); 91 // For unregistering a select stmt on a selectable concurrency primitive 92 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 93 bool unregister_select( T &, select_node & ); 94 95 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 96 // passed as an arg to this routine 97 // If on_selected returns false, the statement is not run, if it returns true it is run. 98 bool on_selected( T &, select_node & ); 44 99 }; 45 100 46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) { 47 // temporary needed for atomic instruction 48 void * cmp_flag = 0p; 49 50 // if we dont win the selector race we need to potentially 51 // ignore this node and move to the next one so we return accordingly 52 if ( *race_flag != 0p || 53 !__atomic_compare_exchange_n( 54 race_flag, 55 &cmp_flag, 56 primitive_ptr, 57 false, 58 __ATOMIC_SEQ_CST, 59 __ATOMIC_SEQ_CST 60 ) 61 ) return false; // lost race and some other node triggered select 62 return true; // won race so this node is what the select proceeds with 101 //============================================================================================= 102 // Waituntil Helpers 103 //============================================================================================= 104 105 // used for the 2-stage avail needed by the special OR case 106 static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) { 107 /* paranoid */ verify( park_counter == 0p ); 108 /* paranoid */ verify( clause_status != 0p ); 109 110 unsigned long int cmp_status = __SELECT_UNSAT; 111 while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 112 if ( cmp_status != __SELECT_PENDING ) return false; 113 cmp_status = __SELECT_UNSAT; 114 } 115 return true; 63 116 } 117 118 static inline void __make_select_node_unsat( select_node & this ) with( this ) { 119 __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST ); 120 } 121 static inline void __make_select_node_sat( select_node & this ) with( this ) { 122 __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST ); 123 } 124 125 static inline bool __make_select_node_pending( select_node & this ) with( this ) { 126 return __mark_select_node( this, __SELECT_PENDING ); 127 } 128 129 // when a primitive becomes available it calls the following routine on it's node to update the select state: 130 // return true if we want to unpark the thd 131 static inline bool __make_select_node_available( select_node & this ) with( this ) { 132 /* paranoid */ verify( clause_status != 0p ); 133 if( !park_counter ) 134 return __mark_select_node( this, (unsigned long int)&this ); 135 136 unsigned long int cmp_status = __SELECT_UNSAT; 137 138 return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case 139 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write 140 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST); 141 } 142 143 // Handles the special OR case of the waituntil statement 144 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE 145 // performing the operation since if we lose the race the operation should not be performed as it will be lost 146 // Returns true if execution can continue normally and false if the queue has now been drained 147 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) { 148 if ( queue`isEmpty ) return false; 149 if ( queue`first.clause_status && !queue`first.park_counter ) { 150 while ( !queue`isEmpty ) { 151 // if node not a special OR case or if we win the special OR case race break 152 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) 153 return true; 154 // otherwise we lost the special OR race so discard node 155 try_pop_front( queue ); 156 } 157 return false; 158 } 159 return true; 160 } 161 162 // wake one thread from the list 163 static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) { 164 if ( !popped.clause_status // normal case, node is not a select node 165 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available 166 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread 167 unpark( popped.blocked_thread ); 168 } 169 170 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); } 171 172 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) { 173 this.blocked_thread = active_thread(); 174 this.clause_status = clause_status; 175 this.park_counter = park_counter; 176 } 177 178 // waituntil ( timeout( ... ) ) support 179 struct select_timeout_node { 180 alarm_node_t a_node; 181 select_node * s_node; 182 }; 183 void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback ); 184 void ^?{}( select_timeout_node & this ); 185 void timeout_handler_select_cast( alarm_node_t & node ); 186 187 // Selectable trait routines 188 bool register_select( select_timeout_node & this, select_node & node ); 189 bool unregister_select( select_timeout_node & this, select_node & node ); 190 bool on_selected( select_timeout_node & this, select_node & node ); 191 192 // Gateway routines to waituntil on duration 193 select_timeout_node timeout( Duration duration ); 194 select_timeout_node sleep( Duration duration );
Note:
See TracChangeset
for help on using the changeset viewer.