Changeset 24d6572 for libcfa/src/concurrency/future.hfa
- Timestamp:
- Jun 12, 2023, 2:45:32 PM (2 years ago)
- Branches:
- ast-experimental, master
- Children:
- 62d62db
- Parents:
- 34b4268 (diff), 251ce80 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/future.hfa
r34b4268 r24d6572 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // io/types.hfa --8 // 9 // Author : Thierry Delisle & Peiran Hong 7 // concurrency/future.hfa -- 8 // 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : … … 18 18 #include "bits/locks.hfa" 19 19 #include "monitor.hfa" 20 20 #include "select.hfa" 21 #include "locks.hfa" 22 23 //---------------------------------------------------------------------------- 24 // future 25 // I don't use future_t here since I need to use a lock for this future 26 // since it supports multiple consumers 27 // future_t is lockfree and uses atomics which aren't needed given we use locks here 21 28 forall( T ) { 29 // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards 30 31 // temporary enum replacement 32 const int FUTURE_EMPTY = 0; 33 const int FUTURE_FULFILLED = 1; 34 22 35 struct future { 36 int state; 37 T result; 38 dlist( select_node ) waiters; 39 futex_mutex lock; 40 }; 41 42 struct future_node { 43 inline select_node; 44 T * my_result; 45 }; 46 47 static inline { 48 49 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { 50 ((select_node &)this){ blocked_thread }; 51 this.my_result = my_result; 52 } 53 54 void ?{}( future(T) & this ) { 55 this.waiters{}; 56 this.state = FUTURE_EMPTY; 57 this.lock{}; 58 } 59 60 // Reset future back to original state 61 void reset( future(T) & this ) with(this) 62 { 63 lock( lock ); 64 if( ! waiters`isEmpty ) 65 abort("Attempting to reset a future with blocked waiters"); 66 state = FUTURE_EMPTY; 67 unlock( lock ); 68 } 69 70 // check if the future is available 71 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected 72 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); } 73 74 75 // memcpy wrapper to help copy values 76 void copy_T( T & from, T & to ) { 77 memcpy((void *)&to, (void *)&from, sizeof(T)); 78 } 79 80 // internal helper to signal waiters off of the future 81 void _internal_flush( future(T) & this ) with(this) { 82 while( ! waiters`isEmpty ) { 83 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 84 break; // if handle_OR returns false then waiters is empty so break 85 select_node &s = try_pop_front( waiters ); 86 87 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 88 copy_T( result, *(((future_node(T) &)s).my_result) ); 89 90 wake_one( waiters, s ); 91 } 92 } 93 94 // Fulfil the future, returns whether or not someone was unblocked 95 bool fulfil( future(T) & this, T val ) with(this) { 96 lock( lock ); 97 if( state != FUTURE_EMPTY ) 98 abort("Attempting to fulfil a future that has already been fulfilled"); 99 100 copy_T( val, result ); 101 102 bool ret_val = ! waiters`isEmpty; 103 state = FUTURE_FULFILLED; 104 _internal_flush( this ); 105 unlock( lock ); 106 return ret_val; 107 } 108 109 // Wait for the future to be fulfilled 110 // Also return whether the thread had to block or not 111 [T, bool] get( future(T) & this ) with( this ) { 112 lock( lock ); 113 T ret_val; 114 if( state == FUTURE_FULFILLED ) { 115 copy_T( result, ret_val ); 116 unlock( lock ); 117 return [ret_val, false]; 118 } 119 120 future_node(T) node = { active_thread(), &ret_val }; 121 insert_last( waiters, ((select_node &)node) ); 122 unlock( lock ); 123 park( ); 124 125 return [ret_val, true]; 126 } 127 128 // Wait for the future to be fulfilled 129 T get( future(T) & this ) { 130 [T, bool] tt; 131 tt = get(this); 132 return tt.0; 133 } 134 135 // Gets value if it is available and returns [ val, true ] 136 // otherwise returns [ default_val, false] 137 // will not block 138 [T, bool] try_get( future(T) & this ) with(this) { 139 lock( lock ); 140 T ret_val; 141 if( state == FUTURE_FULFILLED ) { 142 copy_T( result, ret_val ); 143 unlock( lock ); 144 return [ret_val, true]; 145 } 146 unlock( lock ); 147 148 return [ret_val, false]; 149 } 150 151 bool register_select( future(T) & this, select_node & s ) with(this) { 152 lock( lock ); 153 154 // check if we can complete operation. If so race to establish winner in special OR case 155 if ( !s.park_counter && state != FUTURE_EMPTY ) { 156 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 157 unlock( lock ); 158 return false; 159 } 160 } 161 162 // future not ready -> insert select node and return 163 if( state == FUTURE_EMPTY ) { 164 insert_last( waiters, s ); 165 unlock( lock ); 166 return false; 167 } 168 169 __make_select_node_available( s ); 170 unlock( lock ); 171 return true; 172 } 173 174 bool unregister_select( future(T) & this, select_node & s ) with(this) { 175 if ( ! s`isListed ) return false; 176 lock( lock ); 177 if ( s`isListed ) remove( s ); 178 unlock( lock ); 179 return false; 180 } 181 182 void on_selected( future(T) & this, select_node & node ) {} 183 } 184 } 185 186 //-------------------------------------------------------------------------------------------------------- 187 // These futures below do not support select statements so they may not have as many features as 'future' 188 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 189 // since it uses raw atomics and no locks 190 // 191 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' 192 // since it is monitor based and also is not compatible with select statements 193 //-------------------------------------------------------------------------------------------------------- 194 195 forall( T ) { 196 struct single_future { 23 197 inline future_t; 24 198 T result; … … 27 201 static inline { 28 202 // Reset future back to original state 29 void reset( future(T) & this) { reset( (future_t&)this ); }203 void reset(single_future(T) & this) { reset( (future_t&)this ); } 30 204 31 205 // check if the future is available 32 bool available( future(T) & this ) { return available( (future_t&)this ); }206 bool available( single_future(T) & this ) { return available( (future_t&)this ); } 33 207 34 208 // Mark the future as abandoned, meaning it will be deleted by the server 35 209 // This doesn't work beause of the potential need for a destructor 36 void abandon( future(T) & this );210 void abandon( single_future(T) & this ); 37 211 38 212 // Fulfil the future, returns whether or not someone was unblocked 39 thread$ * fulfil( future(T) & this, T result ) {213 thread$ * fulfil( single_future(T) & this, T result ) { 40 214 this.result = result; 41 215 return fulfil( (future_t&)this ); … … 44 218 // Wait for the future to be fulfilled 45 219 // Also return whether the thread had to block or not 46 [T, bool] wait( future(T) & this ) {220 [T, bool] wait( single_future(T) & this ) { 47 221 bool r = wait( (future_t&)this ); 48 222 return [this.result, r]; … … 50 224 51 225 // Wait for the future to be fulfilled 52 T wait( future(T) & this ) {226 T wait( single_future(T) & this ) { 53 227 [T, bool] tt; 54 228 tt = wait(this);
Note:
See TracChangeset
for help on using the changeset viewer.