Changeset 3483185
- Timestamp:
- Mar 2, 2025, 2:58:57 PM (7 months ago)
- Branches:
- master
- Children:
- 195f43d, 31df72b
- Parents:
- b8b64c34
- git-author:
- kyoung <lseo@…> (03/01/25 00:00:59)
- git-committer:
- kyoung <lseo@…> (03/02/25 14:58:57)
- Files:
-
- 2 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/Exception.hfa
rb8b64c34 r3483185 6 6 #define ExceptionArgs( name, args... ) &name ## _vt, args 7 7 #define ExceptionInst( name, args... ) (name){ ExceptionArgs( name, args ) } 8 #define ExceptionPtr( E ) (exception_t *) & E -
libcfa/src/concurrency/future.hfa
rb8b64c34 r3483185 9 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 // Last Modified By : Kyoung Seo12 // Last Modified On : Mon Jan 27 20:35:00202513 // Update Count : 311 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Sun Mar 2 14:45:56 2025 13 // Update Count : 19 14 14 // 15 15 … … 26 26 // future_t is lockfree and uses atomics which aren't needed given we use locks here 27 27 forall( T ) { 28 28 enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; 29 29 30 30 struct future { 31 31 int state; 32 32 T result; 33 exception_t * except; 33 34 dlist( select_node ) waiters; 34 35 futex_mutex lock; 35 36 }; 36 37 38 39 40 41 37 __CFA_SELECT_GET_TYPE( future(T) ); 38 39 struct future_node { 40 inline select_node; 41 T * my_result; 42 }; 42 43 43 44 static inline { 44 45 45 46 47 48 49 50 46 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { 47 ((select_node &)this){ blocked_thread }; 48 this.my_result = my_result; 49 } 50 51 void ?{}( future(T) & this ) { 51 52 this.waiters{}; 52 this.state = FUTURE_EMPTY; 53 this.lock{}; 53 this.except = 0p; 54 this.state = FUTURE_EMPTY; 55 this.lock{}; 56 } 57 58 void ^?{}( future(T) & this ) { 59 free( this.except ); 54 60 } 55 61 56 62 // Reset future back to original state 57 void reset( future(T) & this ) with(this) 58 { 59 lock( lock ); 60 if( ! waiters`isEmpty ) 61 abort("Attempting to reset a future with blocked waiters"); 62 state = FUTURE_EMPTY; 63 unlock( lock ); 64 } 63 void reset( future(T) & this ) with(this) { 64 lock( lock ); 65 if ( ! waiters`isEmpty ) 66 abort("Attempting to reset a future with blocked waiters"); 67 state = FUTURE_EMPTY; 68 free( except ); 69 except = 0p; 70 unlock( lock ); 71 } 65 72 66 73 // check if the future is available 67 74 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected 68 75 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); } 69 76 70 77 71 // memcpy wrapper to help copy values 72 void copy_T( T & from, T & to ) { 73 memcpy((void *)&to, (void *)&from, sizeof(T)); 74 } 75 76 // internal helper to signal waiters off of the future 77 void _internal_flush( future(T) & this ) with(this) { 78 while( ! waiters`isEmpty ) { 79 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 80 break; // if handle_OR returns false then waiters is empty so break 81 select_node &s = try_pop_front( waiters ); 82 83 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 84 copy_T( result, *(((future_node(T) &)s).my_result) ); 85 86 wake_one( waiters, s ); 87 } 88 } 78 // memcpy wrapper to help copy values 79 void copy_T( T & from, T & to ) { 80 memcpy((void *)&to, (void *)&from, sizeof(T)); 81 } 82 83 bool fulfil$( future(T) & this ) with(this) { // helper 84 bool ret_val = ! waiters`isEmpty; 85 state = FUTURE_FULFILLED; 86 while ( ! waiters`isEmpty ) { 87 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 88 break; // if handle_OR returns false then waiters is empty so break 89 select_node &s = try_pop_front( waiters ); 90 91 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 92 copy_T( result, *(((future_node(T) &)s).my_result) ); 93 94 wake_one( waiters, s ); 95 } 96 unlock( lock ); 97 return ret_val; 98 } 89 99 90 100 // Fulfil the future, returns whether or not someone was unblocked 91 101 bool fulfil( future(T) & this, T val ) with(this) { 92 lock( lock ); 93 if( state != FUTURE_EMPTY ) 94 abort("Attempting to fulfil a future that has already been fulfilled"); 95 96 copy_T( val, result ); 97 98 bool ret_val = ! waiters`isEmpty; 99 state = FUTURE_FULFILLED; 100 _internal_flush( this ); 101 unlock( lock ); 102 return ret_val; 102 lock( lock ); 103 if ( state != FUTURE_EMPTY ) 104 abort("Attempting to fulfil a future that has already been fulfilled"); 105 106 copy_T( val, result ); 107 return fulfil$( this ); 108 } 109 110 bool ?()( future(T) & this, T val ) { // alternate interface 111 return fulfil( this, val ); 112 } 113 114 // Load an exception to the future, returns whether or not someone was unblocked 115 bool fulfil( future(T) & this, exception_t * ex ) with(this) { 116 lock( lock ); 117 if ( state != FUTURE_EMPTY ) 118 abort("Attempting to fulfil a future that has already been fulfilled"); 119 120 except = ( exception_t * ) malloc( ex->virtual_table->size ); 121 ex->virtual_table->copy( except, ex ); 122 return fulfil$( this ); 123 } 124 125 bool ?()( future(T) & this, exception_t * ex ) { // alternate interface 126 return fulfil( this, ex ); 103 127 } 104 128 … … 106 130 // Also return whether the thread had to block or not 107 131 [T, bool] get( future(T) & this ) with( this ) { 108 lock( lock ); 109 T ret_val; 110 if( state == FUTURE_FULFILLED ) { 111 copy_T( result, ret_val ); 112 unlock( lock ); 113 return [ret_val, false]; 114 } 115 116 future_node(T) node = { active_thread(), &ret_val }; 117 insert_last( waiters, ((select_node &)node) ); 118 unlock( lock ); 119 park( ); 132 void exceptCheck() { // helper 133 if ( except ) { 134 exception_t * ex = ( exception_t * ) alloca( except->virtual_table->size ); 135 except->virtual_table->copy( ex, except ); 136 unlock( lock ); 137 throwResume * ex; 138 } 139 } 140 141 lock( lock ); 142 T ret_val; 143 if ( state == FUTURE_FULFILLED ) { 144 exceptCheck(); 145 copy_T( result, ret_val ); 146 unlock( lock ); 147 return [ret_val, false]; 148 } 149 150 future_node(T) node = { active_thread(), &ret_val }; 151 insert_last( waiters, ((select_node &)node) ); 152 unlock( lock ); 153 park( ); 154 exceptCheck(); 120 155 121 156 return [ret_val, true]; … … 129 164 } 130 165 131 // Gets value if it is available and returns [ val, true ] 132 // otherwise returns [ default_val, false] 133 // will not block 134 [T, bool] try_get( future(T) & this ) with(this) { 135 lock( lock ); 136 T ret_val; 137 if( state == FUTURE_FULFILLED ) { 138 copy_T( result, ret_val ); 139 unlock( lock ); 140 return [ret_val, true]; 141 } 142 unlock( lock ); 143 144 return [ret_val, false]; 145 } 146 147 bool register_select( future(T) & this, select_node & s ) with(this) { 148 lock( lock ); 149 150 // check if we can complete operation. If so race to establish winner in special OR case 151 if ( !s.park_counter && state != FUTURE_EMPTY ) { 152 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 153 unlock( lock ); 154 return false; 155 } 156 } 157 158 // future not ready -> insert select node and return 159 if( state == FUTURE_EMPTY ) { 160 insert_last( waiters, s ); 161 unlock( lock ); 162 return false; 163 } 164 165 __make_select_node_available( s ); 166 unlock( lock ); 167 return true; 168 } 169 170 bool unregister_select( future(T) & this, select_node & s ) with(this) { 171 if ( ! s`isListed ) return false; 172 lock( lock ); 173 if ( s`isListed ) remove( s ); 174 unlock( lock ); 175 return false; 176 } 177 178 bool on_selected( future(T) &, select_node & ) { return true; } 166 T ?()( future(T) & this ) { // alternate interface 167 return get( this ); 168 } 169 170 // Gets value if it is available and returns [ val, true ] 171 // otherwise returns [ default_val, false] 172 // will not block 173 [T, bool] try_get( future(T) & this ) with(this) { 174 lock( lock ); 175 T ret_val; 176 if ( state == FUTURE_FULFILLED ) { 177 copy_T( result, ret_val ); 178 unlock( lock ); 179 return [ret_val, true]; 180 } 181 unlock( lock ); 182 183 return [ret_val, false]; 184 } 185 186 bool register_select( future(T) & this, select_node & s ) with(this) { 187 lock( lock ); 188 189 // check if we can complete operation. If so race to establish winner in special OR case 190 if ( !s.park_counter && state != FUTURE_EMPTY ) { 191 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 192 unlock( lock ); 193 return false; 194 } 195 } 196 197 // future not ready -> insert select node and return 198 if ( state == FUTURE_EMPTY ) { 199 insert_last( waiters, s ); 200 unlock( lock ); 201 return false; 202 } 203 204 __make_select_node_available( s ); 205 unlock( lock ); 206 return true; 207 } 208 209 bool unregister_select( future(T) & this, select_node & s ) with(this) { 210 if ( ! s`isListed ) return false; 211 lock( lock ); 212 if ( s`isListed ) remove( s ); 213 unlock( lock ); 214 return false; 215 } 216 217 bool on_selected( future(T) &, select_node & ) { return true; } 179 218 } 180 219 } … … 242 281 243 282 bool $first( multi_future(T) & mutex this ) { 244 if ( this.has_first) {283 if ( this.has_first ) { 245 284 wait( this.blocked ); 246 285 return false; … … 258 297 // Reset future back to original state 259 298 void reset(multi_future(T) & mutex this) { 260 if ( this.has_first != false) abort("Attempting to reset a multi_future with at least one blocked threads");261 if ( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");299 if ( this.has_first != false ) abort("Attempting to reset a multi_future with at least one blocked threads"); 300 if ( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads"); 262 301 reset( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) ); 263 302 }
Note:
See TracChangeset
for help on using the changeset viewer.