Ignore:
Timestamp:
Dec 8, 2025, 11:29:33 AM (2 months ago)
Author:
Michael Brooks <mlbrooks@…>
Branches:
master
Children:
79ba50c
Parents:
8f448e0 (diff), 79ec8c3 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge remote-tracking branch 'refs/remotes/origin/master'

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/future.hfa

    r8f448e0 r5e0b6657  
    77// concurrency/future.hfa --
    88//
    9 // Author           : Thierry Delisle & Peiran Hong & Colby Parsons
     9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Wed Apr 23 22:41:10 2025
    13 // Update Count     : 22
     12// Last Modified On : Mon Nov 24 16:08:52 2025
     13// Update Count     : 222
    1414//
    1515
     
    2121#include "locks.hfa"
    2222
    23 //----------------------------------------------------------------------------
    24 // future
    25 // I don't use future_t here as I need to use a lock for this future since it supports multiple consumers.
    26 // future_t is lockfree and uses atomics which aren't needed given we use locks here
     23//--------------------------------------------------------------------------------------------------------
     24// future does not use future_t as it needs a lock to support multiple consumers.  future_t is lockfree
     25// and uses atomics which are not needed.
     26//--------------------------------------------------------------------------------------------------------
     27
    2728forall( T ) {
    28         enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
     29        // PRIVATE
     30
     31        struct future_node$ {
     32                inline select_node;
     33                T * my_result;
     34        };
     35
     36        static inline {
     37                // memcpy wrapper to help copy values
     38                void copy_T$( T & to, T & from ) { memcpy( (void *)&to, (void *)&from, sizeof(T) ); }
     39        } // distribution
     40
     41        enum { FUTURE_EMPTY$ = 0, FUTURE_FULFILLED$ = 1 };
     42
     43        // PUBLIC
    2944
    3045        struct future {
     
    3247                T result;
    3348                exception_t * except;
     49                futex_mutex lock;
    3450                dlist( select_node ) waiters;
    35                 futex_mutex lock;
    3651        };
    37         __CFA_SELECT_GET_TYPE( future(T) );
    38 
    39         struct future_node {
    40                 inline select_node;
    41                 T * my_result;
    42         };
    43 
    44         static inline {
    45 
    46                 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
    47                         ((select_node &)this){ blocked_thread };
    48                         this.my_result = my_result;
    49                 }
    50 
    51                 void ?{}( future(T) & this ) {
    52                         this.waiters{};
    53                         this.except = 0p;
    54                         this.state = FUTURE_EMPTY;
    55                         this.lock{};
    56                 }
    57 
    58                 void ^?{}( future(T) & this ) {
    59                         free( this.except );
    60                 }
    61 
    62                 // Reset future back to original state
    63                 void reset( future(T) & this ) with(this) {
    64                         lock( lock );
    65                         if ( ! isEmpty( waiters ) )
    66                                 abort("Attempting to reset a future with blocked waiters");
    67                         state = FUTURE_EMPTY;
     52        __CFA_SELECT_GET_TYPE( future(T) );                                     // magic
     53
     54        static inline {
     55                // PRIVATE
     56
     57                bool register_select$( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement
     58                        lock( lock );
     59
     60                        // check if we can complete operation. If so race to establish winner in special OR case
     61                        if ( !s.park_counter && state != FUTURE_EMPTY$ ) {
     62                                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     63                                        unlock( lock );
     64                                        return false;
     65                                }
     66                        }
     67
     68                        // future not ready -> insert select node and return
     69                  if ( state == FUTURE_EMPTY$ ) {
     70                                insert_last( waiters, s );
     71                                unlock( lock );
     72                                return false;
     73                        }
     74
     75                        __make_select_node_available( s );
     76                        unlock( lock );
     77                        return true;
     78                }
     79
     80                bool unregister_select$( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement
     81                  if ( ! isListed( s ) ) return false;
     82                        lock( lock );
     83                        if ( isListed( s ) ) remove( s );
     84                        unlock( lock );
     85                        return false;
     86                }
     87
     88                bool on_selected$( future(T) &, select_node & ) { return true; } // for waituntil statement
     89
     90                // PUBLIC
     91
     92                // General
     93
     94                void ?{}( future_node$(T) & fut, thread$ * blocked_thread, T * my_result ) {
     95                        ((select_node &)fut){ blocked_thread };
     96                        fut.my_result = my_result;
     97                }
     98
     99                void ?{}( future(T) & fut ) with( fut ) {
     100                        except = 0p;
     101                        state = FUTURE_EMPTY$;
     102                }
     103
     104                void ^?{}( future(T) & fut ) with( fut ) {
    68105                        free( except );
    69                         except = 0p;
    70                         unlock( lock );
    71                 }
    72 
    73                 // check if the future is available
    74                 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
    75                 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
    76 
    77 
    78                 // memcpy wrapper to help copy values
    79                 void copy_T( T & from, T & to ) {
    80                         memcpy((void *)&to, (void *)&from, sizeof(T));
    81                 }
    82 
    83                 bool fulfil$( future(T) & this ) with(this) {   // helper
    84                         bool ret_val = ! isEmpty( waiters );
    85                         state = FUTURE_FULFILLED;
    86                         while ( ! isEmpty( waiters ) ) {
    87                                 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
    88                                         break; // if handle_OR returns false then waiters is empty so break
    89                                 select_node &s = remove_first( waiters );
    90 
    91                                 if ( s.clause_status == 0p )                    // poke in result so that woken threads do not need to reacquire any locks
    92                                         copy_T( result, *(((future_node(T) &)s).my_result) );
    93 
    94                                 wake_one( waiters, s );
    95                         }
    96                         unlock( lock );
    97                         return ret_val;
    98                 }
    99 
    100                 // Fulfil the future, returns whether or not someone was unblocked
    101                 bool fulfil( future(T) & this, T val ) with(this) {
    102                         lock( lock );
    103                         if ( state != FUTURE_EMPTY )
    104                                 abort("Attempting to fulfil a future that has already been fulfilled");
    105 
    106                         copy_T( val, result );
    107                         return fulfil$( this );
    108                 }
    109 
    110                 bool ?()( future(T) & this, T val ) {                   // alternate interface
    111                         return fulfil( this, val );
    112                 }
    113 
    114                 // Load an exception to the future, returns whether or not someone was unblocked
    115                 bool fulfil( future(T) & this, exception_t * ex ) with(this) {
    116                         lock( lock );
    117                         if ( state != FUTURE_EMPTY )
    118                                 abort("Attempting to fulfil a future that has already been fulfilled");
    119 
    120                         except = ( exception_t * ) malloc( ex->virtual_table->size );
    121                         ex->virtual_table->copy( except, ex );
    122                         return fulfil$( this );
    123                 }
    124 
    125                 bool ?()( future(T) & this, exception_t * ex ) { // alternate interface
    126                         return fulfil( this, ex );
    127                 }
    128 
    129                 // Wait for the future to be fulfilled
    130                 // Also return whether the thread had to block or not
    131                 [T, bool] get( future(T) & this ) with( this ) {
     106                }
     107
     108                // Used by Client
     109
     110                // PRIVATE
     111
     112                // Return a value/exception from the future.
     113                T get$( future(T) & fut ) with( fut ) {                 // helper
    132114                        void exceptCheck() {                                            // helper
    133115                                if ( except ) {
     
    138120                                }
    139121                        }
    140 
    141                         lock( lock );
    142122                        T ret_val;
    143                         if ( state == FUTURE_FULFILLED ) {
     123
     124                        // LOCK ACQUIRED IN PUBLIC get
     125                        if ( state == FUTURE_FULFILLED$ ) {
    144126                                exceptCheck();
    145                                 copy_T( result, ret_val );
     127                                copy_T$( ret_val, result );
    146128                                unlock( lock );
    147                                 return [ret_val, false];
    148                         }
    149 
    150                         future_node(T) node = { active_thread(), &ret_val };
     129                                return ret_val;
     130                        }
     131
     132                        future_node$(T) node = { active_thread(), &ret_val };
    151133                        insert_last( waiters, ((select_node &)node) );
    152134                        unlock( lock );
    153135                        park( );
    154136                        exceptCheck();
    155 
    156                         return [ret_val, true];
    157                 }
    158 
    159                 // Wait for the future to be fulfilled
    160                 T get( future(T) & this ) {
    161                         [T, bool] tt;
    162                         tt = get(this);
    163                         return tt.0;
    164                 }
    165 
    166                 T ?()( future(T) & this ) {                                             // alternate interface
    167                         return get( this );
    168                 }
    169 
    170                 // Gets value if it is available and returns [ val, true ]
    171                 // otherwise returns [ default_val, false]
    172                 // will not block
    173                 [T, bool] try_get( future(T) & this ) with(this) {
     137                        return ret_val;
     138                }
     139
     140                // PUBLIC
     141
     142                bool available( future( T ) & fut ) { return __atomic_load_n( &fut.state, __ATOMIC_RELAXED ); } // future result available ?
     143
     144                // Return a value/exception from the future.
     145                [T, bool] get( future(T) & fut ) with( fut ) {
     146                        lock( lock );
     147                        bool ret = state == FUTURE_EMPTY$;
     148                        return [ get$( fut ), ret ];
     149                }
     150
     151                T get( future(T) & fut ) with( fut ) {
     152                        lock( lock );
     153                        return get$( fut );
     154                }
     155                T ?()( future(T) & fut ) { return get( fut ); } // alternate interface
     156
     157                // Non-blocking get: true => return defined value, false => value return undefined.
     158                [T, bool] try_get( future(T) & fut ) with( fut ) {
    174159                        lock( lock );
    175160                        T ret_val;
    176                         if ( state == FUTURE_FULFILLED ) {
    177                                 copy_T( result, ret_val );
     161                        if ( state == FUTURE_FULFILLED$ ) {
     162                                copy_T$( ret_val, result );
    178163                                unlock( lock );
    179164                                return [ret_val, true];
    180165                        }
    181166                        unlock( lock );
    182 
    183167                        return [ret_val, false];
    184168                }
    185169
    186                 bool register_select( future(T) & this, select_node & s ) with(this) {
    187                         lock( lock );
    188 
    189                         // check if we can complete operation. If so race to establish winner in special OR case
    190                         if ( !s.park_counter && state != FUTURE_EMPTY ) {
    191                                 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
    192                                         unlock( lock );
    193                                         return false;
    194                                 }
    195                         }
    196 
    197                         // future not ready -> insert select node and return
    198                         if ( state == FUTURE_EMPTY ) {
    199                                 insert_last( waiters, s );
    200                                 unlock( lock );
    201                                 return false;
    202                         }
    203 
    204                         __make_select_node_available( s );
    205                         unlock( lock );
    206                         return true;
    207                 }
    208 
    209                 bool unregister_select( future(T) & this, select_node & s ) with(this) {
    210                         if ( ! isListed( s ) ) return false;
    211                         lock( lock );
    212                         if ( isListed( s ) ) remove( s );
    213                         unlock( lock );
    214                         return false;
    215                 }
    216 
    217                 bool on_selected( future(T) &, select_node & ) { return true; }
    218         }
    219 }
    220 
    221 //--------------------------------------------------------------------------------------------------------
    222 // These futures below do not support select statements so they may not have as many features as 'future'
    223 //  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    224 //  since it uses raw atomics and no locks
    225 //
    226 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
    227 //  since it is monitor based and also is not compatible with select statements
     170                // Used by Server
     171
     172                // PRIVATE
     173
     174                bool fulfil$( future(T) & fut ) with( fut ) {   // helper
     175                        bool ret_val = ! isEmpty( waiters );
     176                        state = FUTURE_FULFILLED$;
     177                        while ( ! isEmpty( waiters ) ) {
     178                                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     179                                        break; // if handle_OR returns false then waiters is empty so break
     180                                select_node &s = remove_first( waiters );
     181
     182                                if ( s.clause_status == 0p )                    // poke in result so that woken threads do not need to reacquire any locks
     183                                        copy_T$( *(((future_node$(T) &)s).my_result), result );
     184
     185                                wake_one( waiters, s );
     186                        }
     187                        unlock( lock );
     188                        return ret_val;
     189                }
     190
     191                // PUBLIC
     192
     193                // Load a value/exception into the future, returns whether or not waiting threads.
     194                bool fulfil( future(T) & fut, T val ) with( fut ) {
     195                        lock( lock );
     196                  if ( state != FUTURE_EMPTY$ ) abort("Attempting to fulfil a future that has already been fulfilled");
     197                        copy_T$( result, val );
     198                        return fulfil$( fut );
     199                }
     200                bool ?()( future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface
     201
     202                bool fulfil( future(T) & fut, exception_t * ex ) with( fut ) {
     203                        lock( lock );
     204                  if ( state != FUTURE_EMPTY$ ) abort( "Attempting to fulfil a future that has already been fulfilled" );
     205                        except = ( exception_t * ) malloc( ex->virtual_table->size );
     206                        ex->virtual_table->copy( except, ex );
     207                        return fulfil$( fut );
     208                }
     209                bool ?()( future(T) & fut, exception_t * ex ) { return fulfil( fut, ex ); } // alternate interface
     210
     211                void reset( future(T) & fut ) with( fut ) {             // mark future as empty (for reuse)
     212                        lock( lock );
     213                  if ( ! isEmpty( waiters ) ) abort( "Attempting to reset a future with blocked waiters" );
     214                        state = FUTURE_EMPTY$;
     215                        free( except );
     216                        except = 0p;
     217                        unlock( lock );
     218                }
     219        } // static inline
     220} // forall( T )
     221
     222//--------------------------------------------------------------------------------------------------------
     223// future_rc uses reference counting to eliminate explicit storage-management and support the waituntil
     224// statement.
    228225//--------------------------------------------------------------------------------------------------------
    229226
    230227forall( T ) {
     228        // PRIVATE
     229
     230        struct future_rc_impl$ {
     231                futex_mutex lock;                                                               // concurrent protection
     232                size_t refCnt;                                                                  // number of references to future
     233                future(T) fut;                                                                  // underlying future
     234        }; // future_rc_impl$
     235
     236        static inline {
     237                size_t incRef$( future_rc_impl$( T ) & impl ) with( impl ) {
     238                        return __atomic_fetch_add( &refCnt, 1, __ATOMIC_SEQ_CST );
     239                } // incRef$
     240
     241                size_t decRef$( future_rc_impl$( T ) & impl ) with( impl ) {
     242                        return __atomic_fetch_add( &refCnt, -1, __ATOMIC_SEQ_CST );
     243                } // decRef$
     244
     245                void ?{}( future_rc_impl$( T ) & frc ) with( frc ) {
     246                        refCnt = 1;                                                                     // count initial object
     247                } // ?{}
     248        } // static inline
     249       
     250        // PUBLIC
     251
     252        struct future_rc {
     253                future_rc_impl$(T) * impl;             
     254        }; // future_rc
     255        __CFA_SELECT_GET_TYPE( future_rc(T) );                          // magic
     256               
     257        static inline {
     258                // PRIVATE
     259
     260                bool register_select$( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement
     261                        return register_select$( frc.impl->fut, s );
     262                }
     263
     264                bool unregister_select$( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement
     265                        return unregister_select$( frc.impl->fut, s );
     266                }
     267
     268                bool on_selected$( future_rc(T) &, select_node & ) { return true; } // for waituntil statement
     269
     270                // PUBLIC
     271
     272                // General
     273
     274                void ?{}( future_rc( T ) & frc ) with( frc ) {  // default constructor
     275                        impl = new();
     276                } // ?{}
     277
     278                void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) { // copy constructor
     279                        impl = from.impl;                                                       // point at new impl
     280                        incRef$( *impl );
     281                } // ?{}
     282
     283                void ^?{}( future_rc( T ) & frc ) with( frc ) {
     284                        if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; }
     285                } // ^?{}
     286
     287                future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) {
     288                  if ( impl == rhs.impl ) return lhs;                   // self assignment ?
     289                        if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; } // no references ? => delete current impl
     290                        impl = rhs.impl;                                                        // point at new impl
     291                        incRef$( *impl );                                                       //   and increment reference count
     292                        return lhs;
     293                } // ?+?
     294
     295                // Used by Client
     296
     297                bool available( future_rc( T ) & frc ) { return available( frc.impl->fut ); } // future result available ?
     298
     299                // Return a value/exception from the future.
     300                [T, bool] get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value
     301                T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value
     302                T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface
     303                [T, bool] try_get( future_rc(T) & frc ) with( frc ) { return try_get( impl->fut ); }
     304
     305                int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality
     306
     307                // Used by Server
     308
     309                // Load a value/exception into the future, returns whether or not waiting threads.
     310                bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); } // copy-in future value
     311                bool ?()( future_rc(T) & frc, T val ) { return fulfil( frc, val ); } // alternate interface
     312
     313                bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); } // insert future exception
     314                bool ?()( future_rc(T) & frc, exception_t * ex ) { return fulfil( frc, ex ); } // alternate interface
     315
     316                void reset( future_rc(T) & frc ) with( frc ) { reset( impl->fut ); } // mark future as empty (for reuse)
     317        } // static inline
     318} // forall( T )
     319
     320//--------------------------------------------------------------------------------------------------------
     321// This future does not support waituntil statements so it does not have as many features as 'future'.
     322// However, it is cheap and cheerful and is more performant than 'future' since it uses raw atomics
     323// and no locks
     324//--------------------------------------------------------------------------------------------------------
     325
     326forall( T ) {
     327        // PUBLIC
     328
    231329        struct single_future {
    232330                inline future_t;
     
    235333
    236334        static inline {
    237                 // Reset future back to original state
    238                 void reset(single_future(T) & this) { reset( (future_t&)this ); }
    239 
    240                 // check if the future is available
    241                 bool available( single_future(T) & this ) { return available( (future_t&)this ); }
    242 
    243                 // Mark the future as abandoned, meaning it will be deleted by the server
    244                 // This doesn't work beause of the potential need for a destructor
    245                 // void abandon( single_future(T) & this );
    246 
    247                 // Fulfil the future, returns whether or not someone was unblocked
    248                 thread$ * fulfil( single_future(T) & this, T result ) {
    249                         this.result = result;
    250                         return fulfil( (future_t&)this );
    251                 }
    252 
    253                 // Wait for the future to be fulfilled
    254                 // Also return whether the thread had to block or not
    255                 [T, bool] wait( single_future(T) & this ) {
    256                         bool r = wait( (future_t&)this );
    257                         return [this.result, r];
    258                 }
    259 
    260                 // Wait for the future to be fulfilled
    261                 T wait( single_future(T) & this ) {
    262                         [T, bool] tt;
    263                         tt = wait(this);
    264                         return tt.0;
    265                 }
    266         }
    267 }
    268 
    269 forall( T ) {
    270         monitor multi_future {
    271                 inline future_t;
    272                 condition blocked;
    273                 bool has_first;
    274                 T result;
    275         };
    276 
    277         static inline {
    278                 void ?{}(multi_future(T) & this) {
    279                         this.has_first = false;
    280                 }
    281 
    282                 bool $first( multi_future(T) & mutex this ) {
    283                         if ( this.has_first ) {
    284                                 wait( this.blocked );
    285                                 return false;
    286                         }
    287 
    288                         this.has_first = true;
    289                         return true;
    290                 }
    291 
    292                 void $first_done( multi_future(T) & mutex this ) {
    293                         this.has_first = false;
    294                         signal_all( this.blocked );
    295                 }
    296 
    297                 // Reset future back to original state
    298                 void reset(multi_future(T) & mutex this) {
    299                         if ( this.has_first != false ) abort("Attempting to reset a multi_future with at least one blocked threads");
    300                         if ( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");
    301                         reset( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) );
    302                 }
    303 
    304                 // Fulfil the future, returns whether or not someone was unblocked
    305                 bool fulfil( multi_future(T) & this, T result ) {
    306                         this.result = result;
    307                         return fulfil( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) ) != 0p;
    308                 }
    309 
    310                 // Wait for the future to be fulfilled
    311                 // Also return whether the thread had to block or not
    312                 [T, bool] wait( multi_future(T) & this ) {
    313                         bool sw = $first( this );
    314                         bool w = !sw;
    315                         if ( sw ) {
    316                                 w = wait( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) );
    317                                 $first_done( this );
    318                         }
    319 
    320                         return [this.result, w];
    321                 }
    322 
    323                 // Wait for the future to be fulfilled
    324                 T wait( multi_future(T) & this ) {
    325                         return wait(this).0;
    326                 }
    327         }
    328 }
     335                // PUBLIC
     336
     337                bool available( single_future(T) & fut ) { return available( (future_t &)fut ); } // future result available ?
     338
     339                // Return a value/exception from the future.
     340                [T, bool] get( single_future(T) & fut ) { return [fut.result, wait( fut )]; }
     341                T get( single_future(T) & fut ) { wait( fut ); return fut.result; }
     342                T ?()( single_future(T) & fut ) { return get( fut ); }  // alternate interface
     343
     344                // Load a value into the future, returns whether or not waiting threads.
     345                bool fulfil( single_future(T) & fut, T result ) {
     346                        fut.result = result;
     347                        return fulfil( (future_t &)fut ) != 0p;
     348                }
     349                bool ?()( single_future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface
     350
     351                void reset( single_future(T) & fut ) { reset( (future_t &)fut ); } // mark future as empty (for reuse)
     352        } // static inline
     353} // forall( T )
Note: See TracChangeset for help on using the changeset viewer.