- File:
-
- 1 edited
-
libcfa/src/concurrency/select.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/select.hfa
rbeeff61e r5e4a830 2 2 3 3 #include "containers/list.hfa" 4 #include "stdint.h" 5 #include "kernel.hfa" 4 #include <stdint.h> 5 #include <kernel.hfa> 6 #include <locks.hfa> 6 7 7 struct select_node;8 9 // node status10 static const unsigned long int __SELECT_UNSAT = 0;11 static const unsigned long int __SELECT_SAT = 1;12 static const unsigned long int __SELECT_RUN = 2;13 14 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }15 static inline void __CFA_maybe_park( int * park_counter ) {16 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )17 park();18 }19 20 // node used for coordinating waituntil synchronization21 8 struct select_node { 22 int * park_counter; // If this is 0p then the node is in a special OR case waituntil23 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil24 25 void * extra; // used to store arbitrary data needed by some primitives26 27 9 thread$ * blocked_thread; 10 void ** race_flag; 28 11 inline dlink(select_node); 29 12 }; 30 13 P9_EMBEDDED( select_node, dlink(select_node) ) 31 14 32 static inline void ?{}( select_node & this ) { 33 this.blocked_thread = active_thread(); 34 this.clause_status = 0p; 35 this.park_counter = 0p; 36 this.extra = 0p; 15 void ?{}( select_node & this ) { 16 this.blocked_thread = 0p; 17 this.race_flag = 0p; 37 18 } 38 19 39 static inlinevoid ?{}( select_node & this, thread$ * blocked_thread ) {20 void ?{}( select_node & this, thread$ * blocked_thread ) { 40 21 this.blocked_thread = blocked_thread; 41 this.clause_status = 0p; 42 this.park_counter = 0p; 43 this.extra = 0p; 22 this.race_flag = 0p; 44 23 } 45 24 46 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra) {25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) { 47 26 this.blocked_thread = blocked_thread; 48 this.clause_status = 0p; 49 this.park_counter = 0p; 50 this.extra = extra; 27 this.race_flag = race_flag; 51 28 } 52 29 53 static inlinevoid ^?{}( select_node & this ) {}30 void ^?{}( select_node & this ) {} 54 31 55 static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }56 32 57 33 //----------------------------------------------------------------------------- 58 34 // is_selectable 59 forall(T & | sized(T)) 60 trait is_selectable { 61 // For registering a select stmt on a selectable concurrency primitive 62 // Returns bool that indicates if operation is already SAT 63 bool register_select( T &, select_node & ); 35 trait is_selectable(T & | sized(T)) { 36 // For registering a select on a selectable concurrency primitive 37 // return 0p if primitive not accessible yet 38 // return 1p if primitive gets acquired 39 // return 2p if primitive is accessible but some other primitive won the race 40 // C_TODO: add enum for return values 41 void * register_select( T &, select_node & ); 64 42 65 // For unregistering a select stmt on a selectable concurrency primitive 66 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 67 bool unregister_select( T &, select_node & ); 68 69 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 70 // passed as an arg to this routine 71 // If on_selected returns false, the statement is not run, if it returns true it is run. 72 bool on_selected( T &, select_node & ); 43 void unregister_select( T &, select_node & ); 73 44 }; 74 45 75 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race 76 static inline bool __select_node_else_race( select_node & this ) with( this ) { 77 unsigned long int cmp_status = __SELECT_UNSAT; 78 return *clause_status == 0 79 && __atomic_compare_exchange_n( clause_status, &cmp_status, 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); 46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) { 47 // temporary needed for atomic instruction 48 void * cmp_flag = 0p; 49 50 // if we dont win the selector race we need to potentially 51 // ignore this node and move to the next one so we return accordingly 52 if ( *race_flag != 0p || 53 !__atomic_compare_exchange_n( 54 race_flag, 55 &cmp_flag, 56 primitive_ptr, 57 false, 58 __ATOMIC_SEQ_CST, 59 __ATOMIC_SEQ_CST 60 ) 61 ) return false; // lost race and some other node triggered select 62 return true; // won race so this node is what the select proceeds with 80 63 } 81 82 // when a primitive becomes available it calls the following routine on it's node to update the select state:83 // return true if we want to unpark the thd84 static inline bool __make_select_node_available( select_node & this ) with( this ) {85 unsigned long int cmp_status = __SELECT_UNSAT;86 87 if( !park_counter )88 return *clause_status == 089 && __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); // OR specific case where race was won90 91 return *clause_status == 092 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write93 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);94 }95 96 // Handles the special OR case of the waituntil statement97 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE98 // performing the operation since if we lose the race the operation should not be performed as it will be lost99 // Returns true if execution can continue normally and false if the queue has now been drained100 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {101 if ( queue`isEmpty ) return false;102 if ( queue`first.clause_status && !queue`first.park_counter ) {103 while ( !queue`isEmpty ) {104 // if node not a special OR case or if we win the special OR case race break105 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) { return true; }106 // otherwise we lost the special OR race so discard node107 try_pop_front( queue );108 }109 return false;110 }111 return true;112 }113 114 // wake one thread from the list115 static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {116 if ( !popped.clause_status // normal case, node is not a select node117 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available118 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread119 unpark( popped.blocked_thread );120 }121 122 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }123 124 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {125 this.blocked_thread = active_thread();126 this.clause_status = clause_status;127 this.park_counter = park_counter;128 }129
Note:
See TracChangeset
for help on using the changeset viewer.