- File:
-
- 1 edited
-
libcfa/src/concurrency/future.hfa (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/future.hfa
r5e180c2 r8f1a99e 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // concurrency/future.hfa --7 // io/types.hfa -- 8 8 // 9 // Author : Thierry Delisle & Peiran Hong & Colby Parsons9 // Author : Thierry Delisle & Peiran Hong 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : … … 14 14 // 15 15 16 //#pragma once16 #pragma once 17 17 18 18 #include "bits/locks.hfa" 19 19 #include "monitor.hfa" 20 #include "select.hfa"21 22 //----------------------------------------------------------------------------23 // future24 // I don't use future_t here since I need to use a lock for this future25 // since it supports multiple consumers26 // future_t is lockfree and uses atomics which aren't needed given we use locks here27 forall( T ) {28 // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards29 30 // temporary enum replacement31 const int FUTURE_EMPTY = 0;32 const int FUTURE_FULFILLED = 1;33 34 struct future {35 int state;36 T result;37 dlist( select_node ) waiters;38 futex_mutex lock;39 };40 41 struct future_node {42 inline select_node;43 T * my_result;44 };45 46 // C_TODO: perhaps allow exceptions to be inserted like uC++?47 48 static inline {49 50 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {51 ((select_node &)this){ blocked_thread };52 this.my_result = my_result;53 }54 55 void ?{}(future(T) & this) {56 this.waiters{};57 this.state = FUTURE_EMPTY;58 this.lock{};59 }60 61 // Reset future back to original state62 void reset(future(T) & this) with(this)63 {64 lock( lock );65 if( ! waiters`isEmpty )66 abort("Attempting to reset a future with blocked waiters");67 state = FUTURE_EMPTY;68 unlock( lock );69 }70 71 // check if the future is available72 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected73 bool available( future(T) & this ) { return this.state; }74 75 76 // memcpy wrapper to help copy values77 void copy_T( T & from, T & to ) {78 memcpy((void *)&to, (void *)&from, sizeof(T));79 }80 81 // internal helper to signal waiters off of the future82 void _internal_flush( future(T) & this ) with(this) {83 while( ! waiters`isEmpty ) {84 select_node &s = try_pop_front( waiters );85 86 if ( s.race_flag == 0p )87 // poke in result so that woken threads do not need to reacquire any locks88 // *(((future_node(T) &)s).my_result) = result;89 copy_T( result, *(((future_node(T) &)s).my_result) );90 else if ( !install_select_winner( s, &this ) ) continue;91 92 // only unpark if future is not selected93 // or if it is selected we only unpark if we win the race94 unpark( s.blocked_thread );95 }96 }97 98 // Fulfil the future, returns whether or not someone was unblocked99 bool fulfil( future(T) & this, T & val ) with(this) {100 lock( lock );101 if( state != FUTURE_EMPTY )102 abort("Attempting to fulfil a future that has already been fulfilled");103 104 copy_T( val, result );105 106 bool ret_val = ! waiters`isEmpty;107 state = FUTURE_FULFILLED;108 _internal_flush( this );109 unlock( lock );110 return ret_val;111 }112 113 // Wait for the future to be fulfilled114 // Also return whether the thread had to block or not115 [T, bool] get( future(T) & this ) with( this ) {116 lock( lock );117 T ret_val;118 if( state == FUTURE_FULFILLED ) {119 copy_T( result, ret_val );120 unlock( lock );121 return [ret_val, false];122 }123 124 future_node(T) node = { active_thread(), &ret_val };125 insert_last( waiters, ((select_node &)node) );126 unlock( lock );127 park( );128 129 return [ret_val, true];130 }131 132 // Wait for the future to be fulfilled133 T get( future(T) & this ) {134 [T, bool] tt;135 tt = get(this);136 return tt.0;137 }138 139 // Gets value if it is available and returns [ val, true ]140 // otherwise returns [ default_val, false]141 // will not block142 [T, bool] try_get( future(T) & this ) with(this) {143 lock( lock );144 T ret_val;145 if( state == FUTURE_FULFILLED ) {146 copy_T( result, ret_val );147 unlock( lock );148 return [ret_val, true];149 }150 unlock( lock );151 152 return [ret_val, false];153 }154 155 void * register_select( future(T) & this, select_node & s ) with(this) {156 lock( lock );157 158 // future not ready -> insert select node and return 0p159 if( state == FUTURE_EMPTY ) {160 insert_last( waiters, s );161 unlock( lock );162 return 0p;163 }164 165 // future ready and we won race to install it as the select winner return 1p166 if ( install_select_winner( s, &this ) ) {167 unlock( lock );168 return 1p;169 }170 171 unlock( lock );172 // future ready and we lost race to install it as the select winner173 return 2p;174 }175 176 void unregister_select( future(T) & this, select_node & s ) with(this) {177 lock( lock );178 if ( s`isListed ) remove( s );179 unlock( lock );180 }181 182 }183 }184 185 //--------------------------------------------------------------------------------------------------------186 // These futures below do not support select statements so they may not be as useful as 'future'187 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'188 // since it uses raw atomics and no locks afaik189 //190 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'191 // since it is monitor based and also is not compatible with select statements192 //--------------------------------------------------------------------------------------------------------193 20 194 21 forall( T ) { 195 struct single_future {22 struct future { 196 23 inline future_t; 197 24 T result; … … 200 27 static inline { 201 28 // Reset future back to original state 202 void reset( single_future(T) & this) { reset( (future_t&)this ); }29 void reset(future(T) & this) { reset( (future_t&)this ); } 203 30 204 31 // check if the future is available 205 bool available( single_future(T) & this ) { return available( (future_t&)this ); }32 bool available( future(T) & this ) { return available( (future_t&)this ); } 206 33 207 34 // Mark the future as abandoned, meaning it will be deleted by the server 208 35 // This doesn't work beause of the potential need for a destructor 209 void abandon( single_future(T) & this );36 void abandon( future(T) & this ); 210 37 211 38 // Fulfil the future, returns whether or not someone was unblocked 212 thread$ * fulfil( single_future(T) & this, T result ) {39 thread$ * fulfil( future(T) & this, T result ) { 213 40 this.result = result; 214 41 return fulfil( (future_t&)this ); … … 217 44 // Wait for the future to be fulfilled 218 45 // Also return whether the thread had to block or not 219 [T, bool] wait( single_future(T) & this ) {46 [T, bool] wait( future(T) & this ) { 220 47 bool r = wait( (future_t&)this ); 221 48 return [this.result, r]; … … 223 50 224 51 // Wait for the future to be fulfilled 225 T wait( single_future(T) & this ) {52 T wait( future(T) & this ) { 226 53 [T, bool] tt; 227 54 tt = wait(this);
Note:
See TracChangeset
for help on using the changeset viewer.