Changeset 8ffee9a


Ignore:
Timestamp:
Nov 19, 2025, 10:00:11 AM (18 hours ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
master
Parents:
b5749f9
Message:

more cleanup of future code

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/future.hfa

    rb5749f9 r8ffee9a  
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Mon Nov 17 08:58:38 2025
    13 // Update Count     : 164
     12// Last Modified On : Wed Nov 19 09:26:38 2025
     13// Update Count     : 204
    1414//
    1515
     
    2727
    2828forall( T ) {
    29         enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
     29        // PRIVATE
     30
     31        struct future_node$ {
     32                inline select_node;
     33                T * my_result;
     34        };
     35
     36        static inline {
     37                // memcpy wrapper to help copy values
     38                void copy_T$( T & to, T & from ) { memcpy( (void *)&to, (void *)&from, sizeof(T) ); }
     39        } // distribution
     40
     41        enum { FUTURE_EMPTY$ = 0, FUTURE_FULFILLED$ = 1 };
     42
     43        // PUBLIC
    3044
    3145        struct future {
     
    3347                T result;
    3448                exception_t * except;
     49                futex_mutex lock;
    3550                dlist( select_node ) waiters;
    36                 futex_mutex lock;
    3751        };
    38         __CFA_SELECT_GET_TYPE( future(T) );
    39 
    40         struct future_node {
    41                 inline select_node;
    42                 T * my_result;
    43         };
    44 
    45         static inline {
    46                 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
    47                         ((select_node &)this){ blocked_thread };
    48                         this.my_result = my_result;
    49                 }
    50 
    51                 void ?{}( future(T) & this ) with( this ) {
    52                         waiters{};
     52        __CFA_SELECT_GET_TYPE( future(T) );                                     // magic
     53
     54        static inline {
     55                // PRIVATE
     56
     57                bool register_select( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement
     58                        lock( lock );
     59
     60                        // check if we can complete operation. If so race to establish winner in special OR case
     61                        if ( !s.park_counter && state != FUTURE_EMPTY$ ) {
     62                                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     63                                        unlock( lock );
     64                                        return false;
     65                                }
     66                        }
     67
     68                        // future not ready -> insert select node and return
     69                  if ( state == FUTURE_EMPTY$ ) {
     70                                insert_last( waiters, s );
     71                                unlock( lock );
     72                                return false;
     73                        }
     74
     75                        __make_select_node_available( s );
     76                        unlock( lock );
     77                        return true;
     78                }
     79
     80                bool unregister_select( future(T) & fut, select_node & s ) with( fut ) { // for waituntil statement
     81                  if ( ! isListed( s ) ) return false;
     82                        lock( lock );
     83                        if ( isListed( s ) ) remove( s );
     84                        unlock( lock );
     85                        return false;
     86                }
     87
     88                bool on_selected( future(T) &, select_node & ) { return true; } // for waituntil statement
     89
     90                // PUBLIC
     91
     92                // General
     93
     94                void ?{}( future_node$(T) & fut, thread$ * blocked_thread, T * my_result ) {
     95                        ((select_node &)fut){ blocked_thread };
     96                        fut.my_result = my_result;
     97                }
     98
     99                void ?{}( future(T) & fut ) with( fut ) {
     100//                      waiters{};
    53101                        except = 0p;
    54                         state = FUTURE_EMPTY;
     102                        state = FUTURE_EMPTY$;
    55103                        lock{};
    56104                }
    57105
    58                 void ^?{}( future(T) & this ) with( this ) {
     106                void ^?{}( future(T) & fut ) with( fut ) {
    59107                        free( except );
    60108                }
    61109
    62                 // Reset future back to original state
    63                 void reset( future(T) & this ) with(this) {
    64                         lock( lock );
    65                         if ( ! isEmpty( waiters ) )
    66                                 abort("Attempting to reset a future with blocked waiters");
    67                         state = FUTURE_EMPTY;
    68                         free( except );
    69                         except = 0p;
    70                         unlock( lock );
    71                 }
    72 
    73                 // check if the future is available
    74                 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
    75                 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
    76 
    77 
    78                 // memcpy wrapper to help copy values
    79                 void copy_T$( T & from, T & to ) {
    80                         memcpy((void *)&to, (void *)&from, sizeof(T));
    81                 }
    82 
    83                 bool fulfil$( future(T) & this ) with(this) {   // helper
    84                         bool ret_val = ! isEmpty( waiters );
    85                         state = FUTURE_FULFILLED;
    86                         while ( ! isEmpty( waiters ) ) {
    87                                 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
    88                                         break; // if handle_OR returns false then waiters is empty so break
    89                                 select_node &s = remove_first( waiters );
    90 
    91                                 if ( s.clause_status == 0p )                    // poke in result so that woken threads do not need to reacquire any locks
    92                                         copy_T$( result, *(((future_node(T) &)s).my_result) );
    93 
    94                                 wake_one( waiters, s );
    95                         }
    96                         unlock( lock );
    97                         return ret_val;
    98                 }
    99 
    100                 // Fulfil the future, returns whether or not someone was unblocked
    101                 bool fulfil( future(T) & this, T val ) with(this) {
    102                         lock( lock );
    103                         if ( state != FUTURE_EMPTY )
    104                                 abort("Attempting to fulfil a future that has already been fulfilled");
    105 
    106                         copy_T$( val, result );
    107                         return fulfil$( this );
    108                 }
    109 
    110                 bool ?()( future(T) & this, T val ) {                   // alternate interface
    111                         return fulfil( this, val );
    112                 }
    113 
    114                 // Load an exception to the future, returns whether or not someone was unblocked
    115                 bool fulfil( future(T) & this, exception_t * ex ) with(this) {
    116                         lock( lock );
    117                         if ( state != FUTURE_EMPTY )
    118                                 abort("Attempting to fulfil a future that has already been fulfilled");
    119 
    120                         except = ( exception_t * ) malloc( ex->virtual_table->size );
    121                         ex->virtual_table->copy( except, ex );
    122                         return fulfil$( this );
    123                 }
    124 
    125                 bool ?()( future(T) & this, exception_t * ex ) { // alternate interface
    126                         return fulfil( this, ex );
    127                 }
    128 
    129                 // Wait for the future to be fulfilled
    130                 // Also return whether the thread had to block or not
    131                 [T, bool] get( future(T) & this ) with( this ) {
     110                // Used by Client
     111
     112                // PRIVATE
     113
     114                // Return a value/exception from the future.
     115                T get$( future(T) & fut ) with( fut ) {                 // helper
    132116                        void exceptCheck() {                                            // helper
    133117                                if ( except ) {
     
    138122                                }
    139123                        }
    140 
    141                         lock( lock );
    142124                        T ret_val;
    143                         if ( state == FUTURE_FULFILLED ) {
     125
     126                        // LOCK ACQUIRED IN PUBLIC get
     127                        if ( state == FUTURE_FULFILLED$ ) {
    144128                                exceptCheck();
    145                                 copy_T$( result, ret_val );
     129                                copy_T$( ret_val, result );
    146130                                unlock( lock );
    147                                 return [ret_val, false];
    148                         }
    149 
    150                         future_node(T) node = { active_thread(), &ret_val };
     131                                return ret_val;
     132                        }
     133
     134                        future_node$(T) node = { active_thread(), &ret_val };
    151135                        insert_last( waiters, ((select_node &)node) );
    152136                        unlock( lock );
    153137                        park( );
    154138                        exceptCheck();
    155 
    156                         return [ret_val, true];
    157                 }
    158 
    159                 // Wait for the future to be fulfilled
    160                 T get( future(T) & this ) {
    161                         [T, bool] tt;
    162                         tt = get(this);
    163                         return tt.0;
    164                 }
    165 
    166                 T ?()( future(T) & this ) {                                             // alternate interface
    167                         return get( this );
    168                 }
    169 
    170                 // Gets value if it is available and returns [ val, true ]
    171                 // otherwise returns [ default_val, false]
    172                 // will not block
    173                 [T, bool] try_get( future(T) & this ) with(this) {
     139                        return ret_val;
     140                }
     141
     142                // PUBLIC
     143
     144                bool available( future( T ) & fut ) { return __atomic_load_n( &fut.state, __ATOMIC_RELAXED ); } // future result available ?
     145
     146                // Return a value/exception from the future.
     147                [T, bool] get( future(T) & fut ) with( fut ) {
     148                        lock( lock );
     149                        bool ret = state == FUTURE_EMPTY$;
     150                        return [ get$( fut ), ret ];
     151                }
     152
     153                T get( future(T) & fut ) with( fut ) {
     154                        lock( lock );
     155                        return get$( fut );
     156                }
     157                T ?()( future(T) & fut ) { return get( fut ); } // alternate interface
     158
     159                // Non-blocking get: true => return defined value, false => value return undefined.
     160                [T, bool] try_get( future(T) & fut ) with( fut ) {
    174161                        lock( lock );
    175162                        T ret_val;
    176                         if ( state == FUTURE_FULFILLED ) {
    177                                 copy_T$( result, ret_val );
     163                        if ( state == FUTURE_FULFILLED$ ) {
     164                                copy_T$( ret_val, result );
    178165                                unlock( lock );
    179166                                return [ret_val, true];
     
    183170                }
    184171
    185                 bool register_select( future(T) & this, select_node & s ) with(this) {
    186                         lock( lock );
    187 
    188                         // check if we can complete operation. If so race to establish winner in special OR case
    189                         if ( !s.park_counter && state != FUTURE_EMPTY ) {
    190                                 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
    191                                         unlock( lock );
    192                                         return false;
    193                                 }
    194                         }
    195 
    196                         // future not ready -> insert select node and return
    197                         if ( state == FUTURE_EMPTY ) {
    198                                 insert_last( waiters, s );
    199                                 unlock( lock );
    200                                 return false;
    201                         }
    202 
    203                         __make_select_node_available( s );
    204                         unlock( lock );
    205                         return true;
    206                 }
    207 
    208                 bool unregister_select( future(T) & this, select_node & s ) with(this) {
    209                         if ( ! isListed( s ) ) return false;
    210                         lock( lock );
    211                         if ( isListed( s ) ) remove( s );
    212                         unlock( lock );
    213                         return false;
    214                 }
    215 
    216                 bool on_selected( future(T) &, select_node & ) { return true; }
    217         }
    218 }
     172                // Used by Server
     173
     174                // PRIVATE
     175
     176                bool fulfil$( future(T) & fut ) with( fut ) {   // helper
     177                        bool ret_val = ! isEmpty( waiters );
     178                        state = FUTURE_FULFILLED$;
     179                        while ( ! isEmpty( waiters ) ) {
     180                                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     181                                        break; // if handle_OR returns false then waiters is empty so break
     182                                select_node &s = remove_first( waiters );
     183
     184                                if ( s.clause_status == 0p )                    // poke in result so that woken threads do not need to reacquire any locks
     185                                        copy_T$( *(((future_node$(T) &)s).my_result), result );
     186
     187                                wake_one( waiters, s );
     188                        }
     189                        unlock( lock );
     190                        return ret_val;
     191                }
     192
     193                // PUBLIC
     194
     195                // Load a value/exception into the future, returns whether or not waiting threads.
     196                bool fulfil( future(T) & fut, T val ) with( fut ) {
     197                        lock( lock );
     198                  if ( state != FUTURE_EMPTY$ ) abort("Attempting to fulfil a future that has already been fulfilled");
     199                        copy_T$( result, val );
     200                        return fulfil$( fut );
     201                }
     202                bool ?()( future(T) & fut, T val ) { return fulfil( fut, val ); } // alternate interface
     203
     204                bool fulfil( future(T) & fut, exception_t * ex ) with( fut ) {
     205                        lock( lock );
     206                  if ( state != FUTURE_EMPTY$ ) abort( "Attempting to fulfil a future that has already been fulfilled" );
     207                        except = ( exception_t * ) malloc( ex->virtual_table->size );
     208                        ex->virtual_table->copy( except, ex );
     209                        return fulfil$( fut );
     210                }
     211                bool ?()( future(T) & fut, exception_t * ex ) { return fulfil( fut, ex ); } // alternate interface
     212
     213                void reset( future(T) & fut ) with( fut ) {             // mark future as empty (for reuse)
     214                        lock( lock );
     215                  if ( ! isEmpty( waiters ) ) abort( "Attempting to reset a future with blocked waiters" );
     216                        state = FUTURE_EMPTY$;
     217                        free( except );
     218                        except = 0p;
     219                        unlock( lock );
     220                }
     221        } // static inline
     222} // forall( T )
    219223
    220224//--------------------------------------------------------------------------------------------------------
     
    224228
    225229forall( T ) {
     230        // PRIVATE
     231
    226232        struct future_rc_impl$ {
    227233                futex_mutex lock;                                                               // concurrent protection
     
    231237
    232238        static inline {
    233                 void incRef$( future_rc_impl$( T ) & impl ) with( impl ) {
    234                         __atomic_fetch_add( &refCnt, 1, __ATOMIC_RELAXED );
    235 //                      lock( lock );
    236 //                      refCnt += 1;
    237 //                      unlock( lock );
     239                size_t incRef$( future_rc_impl$( T ) & impl ) with( impl ) {
     240                        return __atomic_fetch_add( &refCnt, 1, __ATOMIC_SEQ_CST );
    238241                } // incRef$
    239242
    240                 bool decRef$( future_rc_impl$( T ) & impl ) with( impl ) {
    241                         return __atomic_fetch_add( &refCnt, -1, __ATOMIC_RELAXED ) == 1;
    242                         // lock( lock );
    243                         // refCnt -= 1;
    244                         // bool ret = refCnt == 0;
    245                         // unlock( lock );
    246                         // return ret;
     243                size_t decRef$( future_rc_impl$( T ) & impl ) with( impl ) {
     244                        return __atomic_fetch_add( &refCnt, -1, __ATOMIC_SEQ_CST );
    247245                } // decRef$
    248246
    249247                void ?{}( future_rc_impl$( T ) & frc ) with( frc ) {
    250                         lock{};                                                                         // intialization
    251                         refCnt = 1;
     248                        refCnt = 1;                                                                     // count initial object
    252249                } // ?{}
    253 
    254                 void ^?{}( future_rc_impl$( T ) & frc ) with( frc ) {
    255                         decRef$( frc );
    256                 } // ^?{}
    257250        } // static inline
    258251       
     252        // PUBLIC
     253
    259254        struct future_rc {
    260255                future_rc_impl$(T) * impl;             
    261256        }; // future_rc
    262         __CFA_SELECT_GET_TYPE( future_rc(T) );
     257        __CFA_SELECT_GET_TYPE( future_rc(T) );                          // magic
    263258               
    264259        static inline {
    265                 void ?{}( future_rc( T ) & frc ) with( frc ) {
     260                // PRIVATE
     261
     262                bool register_select( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement
     263                        return register_select( frc.impl->fut, s );
     264                }
     265
     266                bool unregister_select( future_rc(T) & frc, select_node & s ) with( frc ) { // for waituntil statement
     267                        return unregister_select( frc.impl->fut, s );
     268                }
     269
     270                bool on_selected( future_rc(T) &, select_node & ) { return true; } // for waituntil statement
     271
     272                // PUBLIC
     273
     274                // General
     275
     276                void ?{}( future_rc( T ) & frc ) with( frc ) {  // default constructor
    266277                        impl = new();
    267278                } // ?{}
    268279
    269                 void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) {
     280                void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) { // copy constructor
    270281                        impl = from.impl;                                                       // point at new impl
    271282                        incRef$( *impl );
     
    273284
    274285                void ^?{}( future_rc( T ) & frc ) with( frc ) {
    275                         if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; }
     286                        if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; }
    276287                } // ^?{}
    277288
    278289                future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) {
    279290                  if ( impl == rhs.impl ) return lhs;                   // self assignment ?
    280                         if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; } // no references => delete current impl
     291                        if ( decRef$( *impl ) == 1 ) { delete( impl ); impl = 0p; } // no references ? => delete current impl
    281292                        impl = rhs.impl;                                                        // point at new impl
    282293                        incRef$( *impl );                                                       //   and increment reference count
     
    284295                } // ?+?
    285296
    286                 bool register_select( future_rc(T) & this, select_node & s ) with( this ) {
    287                         return register_select( this.impl->fut, s );
    288                 }
    289 
    290                 bool unregister_select( future_rc(T) & this, select_node & s ) with( this ) {
    291                         return unregister_select( this.impl->fut, s );
    292                 }
    293 
    294                 bool on_selected( future_rc(T) &, select_node & ) { return true; }
    295 
    296                 // USED BY CLIENT
     297                // Used by Client
    297298
    298299                bool available( future_rc( T ) & frc ) { return available( frc.impl->fut ); } // future result available ?
    299300
    300                 bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); }
     301                // Return a value/exception from the future.
     302                [T, bool] get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value
     303                T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); } // return future value
     304                T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface
     305                [T, bool] try_get( future_rc(T) & frc ) with( frc ) { return try_get( impl->fut ); }
     306
     307                int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality
     308
     309                // Used by Server
     310
     311                // Load a value/exception into the future, returns whether or not waiting threads.
     312                bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); } // copy-in future value
    301313                bool ?()( future_rc(T) & frc, T val ) { return fulfil( frc, val ); } // alternate interface
    302314
    303                 int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality
    304 
    305                 // USED BY SERVER
    306 
    307                 T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); }
    308                 T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface
    309 
    310                 bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); }
     315                bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); } // insert future exception
    311316                bool ?()( future_rc(T) & frc, exception_t * ex ) { return fulfil( frc, ex ); } // alternate interface
    312317
     
    357362                T wait( single_future(T) & this ) {
    358363                        [T, bool] tt;
    359                         tt = wait(this);
     364                        tt = wait( this );
    360365                        return tt.0;
    361366                }
     
    419424                // Wait for the future to be fulfilled
    420425                T wait( multi_future(T) & this ) {
    421                         return wait(this).0;
     426                        return wait( this ).0;
    422427                }
    423428        }
Note: See TracChangeset for help on using the changeset viewer.