Changeset beeff61e for libcfa/src/concurrency/select.hfa
- Timestamp:
- May 1, 2023, 4:00:06 PM (15 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 73bf7ddc
- Parents:
- bb7422a
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/select.hfa
rbb7422a rbeeff61e 2 2 3 3 #include "containers/list.hfa" 4 #include <stdint.h> 5 #include <kernel.hfa> 6 #include <locks.hfa> 4 #include "stdint.h" 5 #include "kernel.hfa" 7 6 7 struct select_node; 8 9 // node status 10 static const unsigned long int __SELECT_UNSAT = 0; 11 static const unsigned long int __SELECT_SAT = 1; 12 static const unsigned long int __SELECT_RUN = 2; 13 14 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; } 15 static inline void __CFA_maybe_park( int * park_counter ) { 16 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 ) 17 park(); 18 } 19 20 // node used for coordinating waituntil synchronization 8 21 struct select_node { 22 int * park_counter; // If this is 0p then the node is in a special OR case waituntil 23 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil 24 25 void * extra; // used to store arbitrary data needed by some primitives 26 9 27 thread$ * blocked_thread; 10 void ** race_flag;11 28 inline dlink(select_node); 12 29 }; 13 30 P9_EMBEDDED( select_node, dlink(select_node) ) 14 31 15 void ?{}( select_node & this ) { 16 this.blocked_thread = 0p; 17 this.race_flag = 0p; 32 static inline void ?{}( select_node & this ) { 33 this.blocked_thread = active_thread(); 34 this.clause_status = 0p; 35 this.park_counter = 0p; 36 this.extra = 0p; 18 37 } 19 38 20 void ?{}( select_node & this, thread$ * blocked_thread ) {39 static inline void ?{}( select_node & this, thread$ * blocked_thread ) { 21 40 this.blocked_thread = blocked_thread; 22 this.race_flag = 0p; 41 this.clause_status = 0p; 42 this.park_counter = 0p; 43 this.extra = 0p; 23 44 } 24 45 25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag) {46 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) { 26 47 this.blocked_thread = blocked_thread; 27 this.race_flag = race_flag; 48 this.clause_status = 0p; 49 this.park_counter = 0p; 50 this.extra = extra; 28 51 } 29 52 30 void ^?{}( select_node & this ) {}53 static inline void ^?{}( select_node & this ) {} 31 54 55 static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; } 32 56 33 57 //----------------------------------------------------------------------------- 34 58 // is_selectable 35 trait is_selectable(T & | sized(T)) { 36 // For registering a select on a selectable concurrency primitive 37 // return 0p if primitive not accessible yet 38 // return 1p if primitive gets acquired 39 // return 2p if primitive is accessible but some other primitive won the race 40 // C_TODO: add enum for return values 41 void * register_select( T &, select_node & ); 59 forall(T & | sized(T)) 60 trait is_selectable { 61 // For registering a select stmt on a selectable concurrency primitive 62 // Returns bool that indicates if operation is already SAT 63 bool register_select( T &, select_node & ); 42 64 43 void unregister_select( T &, select_node & ); 65 // For unregistering a select stmt on a selectable concurrency primitive 66 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 67 bool unregister_select( T &, select_node & ); 68 69 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 70 // passed as an arg to this routine 71 // If on_selected returns false, the statement is not run, if it returns true it is run. 72 bool on_selected( T &, select_node & ); 44 73 }; 45 74 46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) { 47 // temporary needed for atomic instruction 48 void * cmp_flag = 0p; 49 50 // if we dont win the selector race we need to potentially 51 // ignore this node and move to the next one so we return accordingly 52 if ( *race_flag != 0p || 53 !__atomic_compare_exchange_n( 54 race_flag, 55 &cmp_flag, 56 primitive_ptr, 57 false, 58 __ATOMIC_SEQ_CST, 59 __ATOMIC_SEQ_CST 60 ) 61 ) return false; // lost race and some other node triggered select 62 return true; // won race so this node is what the select proceeds with 75 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race 76 static inline bool __select_node_else_race( select_node & this ) with( this ) { 77 unsigned long int cmp_status = __SELECT_UNSAT; 78 return *clause_status == 0 79 && __atomic_compare_exchange_n( clause_status, &cmp_status, 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); 63 80 } 81 82 // when a primitive becomes available it calls the following routine on it's node to update the select state: 83 // return true if we want to unpark the thd 84 static inline bool __make_select_node_available( select_node & this ) with( this ) { 85 unsigned long int cmp_status = __SELECT_UNSAT; 86 87 if( !park_counter ) 88 return *clause_status == 0 89 && __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); // OR specific case where race was won 90 91 return *clause_status == 0 92 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write 93 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST); 94 } 95 96 // Handles the special OR case of the waituntil statement 97 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE 98 // performing the operation since if we lose the race the operation should not be performed as it will be lost 99 // Returns true if execution can continue normally and false if the queue has now been drained 100 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) { 101 if ( queue`isEmpty ) return false; 102 if ( queue`first.clause_status && !queue`first.park_counter ) { 103 while ( !queue`isEmpty ) { 104 // if node not a special OR case or if we win the special OR case race break 105 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) { return true; } 106 // otherwise we lost the special OR race so discard node 107 try_pop_front( queue ); 108 } 109 return false; 110 } 111 return true; 112 } 113 114 // wake one thread from the list 115 static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) { 116 if ( !popped.clause_status // normal case, node is not a select node 117 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available 118 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread 119 unpark( popped.blocked_thread ); 120 } 121 122 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); } 123 124 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) { 125 this.blocked_thread = active_thread(); 126 this.clause_status = clause_status; 127 this.park_counter = park_counter; 128 } 129
Note: See TracChangeset
for help on using the changeset viewer.