Changeset 339e30a for libcfa/src/concurrency
- Timestamp:
- Jan 6, 2023, 1:57:36 PM (23 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 7eac70e
- Parents:
- d99a716
- Location:
- libcfa/src/concurrency
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/future.hfa
rd99a716 r339e30a 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // io/types.hfa --8 // 9 // Author : Thierry Delisle & Peiran Hong 7 // concurrency/future.hfa -- 8 // 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : … … 14 14 // 15 15 16 #pragma once16 // #pragma once 17 17 18 18 #include "bits/locks.hfa" 19 19 #include "monitor.hfa" 20 20 #include "select.hfa" 21 22 //---------------------------------------------------------------------------- 23 // future 24 // I don't use future_t here since I need to use a lock for this future 25 // since it supports multiple consumers 26 // future_t is lockfree and uses atomics which aren't needed given we use locks here 21 27 forall( T ) { 28 // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards 29 30 // temporary enum replacement 31 const int FUTURE_EMPTY = 0; 32 const int FUTURE_FULFILLED = 1; 33 22 34 struct future { 35 int state; 36 T result; 37 dlist( select_node ) waiters; 38 futex_mutex lock; 39 }; 40 41 struct future_node { 42 inline select_node; 43 T * my_result; 44 }; 45 46 // C_TODO: perhaps allow exceptions to be inserted like uC++? 47 48 static inline { 49 50 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { 51 ((select_node &)this){ blocked_thread }; 52 this.my_result = my_result; 53 } 54 55 void ?{}(future(T) & this) { 56 this.waiters{}; 57 this.state = FUTURE_EMPTY; 58 } 59 60 // Reset future back to original state 61 void reset(future(T) & this) with(this) 62 { 63 lock( lock ); 64 if( ! waiters`isEmpty ) 65 abort("Attempting to reset a future with blocked waiters"); 66 state = FUTURE_EMPTY; 67 unlock( lock ); 68 } 69 70 // check if the future is available 71 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected 72 bool available( future(T) & this ) { return this.state; } 73 74 75 // memcpy wrapper to help copy values 76 void copy_T( T & from, T & to ) { 77 memcpy((void *)&to, (void *)&from, sizeof(T)); 78 } 79 80 // internal helper to signal waiters off of the future 81 void _internal_flush( future(T) & this ) with(this) { 82 while( ! waiters`isEmpty ) { 83 select_node &s = try_pop_front( waiters ); 84 85 if ( s.race_flag == 0p ) 86 // poke in result so that woken threads do not need to reacquire any locks 87 // *(((future_node(T) &)s).my_result) = result; 88 copy_T( result, *(((future_node(T) &)s).my_result) ); 89 else if ( !install_select_winner( s, &this ) ) continue; 90 91 // only unpark if future is not selected 92 // or if it is selected we only unpark if we win the race 93 unpark( s.blocked_thread ); 94 } 95 } 96 97 // Fulfil the future, returns whether or not someone was unblocked 98 bool fulfil( future(T) & this, T & val ) with(this) { 99 lock( lock ); 100 if( state != FUTURE_EMPTY ) 101 abort("Attempting to fulfil a future that has already been fulfilled"); 102 103 copy_T( val, result ); 104 105 bool ret_val = ! waiters`isEmpty; 106 state = FUTURE_FULFILLED; 107 _internal_flush( this ); 108 unlock( lock ); 109 return ret_val; 110 } 111 112 // Wait for the future to be fulfilled 113 // Also return whether the thread had to block or not 114 [T, bool] get( future(T) & this ) with( this ) { 115 lock( lock ); 116 T ret_val; 117 if( state == FUTURE_FULFILLED ) { 118 copy_T( result, ret_val ); 119 unlock( lock ); 120 return [ret_val, false]; 121 } 122 123 future_node(T) node = { active_thread(), &ret_val }; 124 insert_last( waiters, ((select_node &)node) ); 125 unlock( lock ); 126 park( ); 127 128 return [ret_val, true]; 129 } 130 131 // Wait for the future to be fulfilled 132 T get( future(T) & this ) { 133 [T, bool] tt; 134 tt = get(this); 135 return tt.0; 136 } 137 138 // Gets value if it is available and returns [ val, true ] 139 // otherwise returns [ default_val, false] 140 // will not block 141 [T, bool] try_get( future(T) & this ) with(this) { 142 lock( lock ); 143 T ret_val; 144 if( state == FUTURE_FULFILLED ) { 145 copy_T( result, ret_val ); 146 unlock( lock ); 147 return [ret_val, true]; 148 } 149 unlock( lock ); 150 // cast to (T *) needed to trick the resolver to let me return *0p 151 return [ret_val, false]; 152 } 153 154 void * register_select( future(T) & this, select_node & s ) with(this) { 155 lock( lock ); 156 157 // future not ready -> insert select node and return 0p 158 if( state == FUTURE_EMPTY ) { 159 insert_last( waiters, s ); 160 unlock( lock ); 161 return 0p; 162 } 163 164 // future ready and we won race to install it as the select winner return 1p 165 if ( install_select_winner( s, &this ) ) { 166 unlock( lock ); 167 return 1p; 168 } 169 170 unlock( lock ); 171 // future ready and we lost race to install it as the select winner 172 return 2p; 173 } 174 175 void unregister_select( future(T) & this, select_node & s ) with(this) { 176 lock( lock ); 177 if ( s`isListed ) remove( s ); 178 unlock( lock ); 179 } 180 181 } 182 } 183 184 //-------------------------------------------------------------------------------------------------------- 185 // These futures below do not support select statements so they may not be as useful as 'future' 186 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 187 // since it uses raw atomics and no locks afaik 188 // 189 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' 190 // since it is monitor based and also is not compatible with select statements 191 //-------------------------------------------------------------------------------------------------------- 192 193 forall( T ) { 194 struct single_future { 23 195 inline future_t; 24 196 T result; … … 27 199 static inline { 28 200 // Reset future back to original state 29 void reset( future(T) & this) { reset( (future_t&)this ); }201 void reset(single_future(T) & this) { reset( (future_t&)this ); } 30 202 31 203 // check if the future is available 32 bool available( future(T) & this ) { return available( (future_t&)this ); }204 bool available( single_future(T) & this ) { return available( (future_t&)this ); } 33 205 34 206 // Mark the future as abandoned, meaning it will be deleted by the server 35 207 // This doesn't work beause of the potential need for a destructor 36 void abandon( future(T) & this );208 void abandon( single_future(T) & this ); 37 209 38 210 // Fulfil the future, returns whether or not someone was unblocked 39 thread$ * fulfil( future(T) & this, T result ) {211 thread$ * fulfil( single_future(T) & this, T result ) { 40 212 this.result = result; 41 213 return fulfil( (future_t&)this ); … … 44 216 // Wait for the future to be fulfilled 45 217 // Also return whether the thread had to block or not 46 [T, bool] wait( future(T) & this ) {218 [T, bool] wait( single_future(T) & this ) { 47 219 bool r = wait( (future_t&)this ); 48 220 return [this.result, r]; … … 50 222 51 223 // Wait for the future to be fulfilled 52 T wait( future(T) & this ) {224 T wait( single_future(T) & this ) { 53 225 [T, bool] tt; 54 226 tt = wait(this);
Note: See TracChangeset
for help on using the changeset viewer.