Changeset 4a16ddfa


Ignore:
Timestamp:
Nov 17, 2025, 9:13:40 AM (44 hours ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
master
Children:
f04623f
Parents:
b94579a
Message:

add reference counting futures for use with waituntil

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/future.hfa

    rb94579a r4a16ddfa  
    77// concurrency/future.hfa --
    88//
    9 // Author           : Thierry Delisle & Peiran Hong & Colby Parsons
     9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons & Peter Buhr
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Nov  4 22:04:42 2025
    13 // Update Count     : 23
     12// Last Modified On : Mon Nov 17 08:58:38 2025
     13// Update Count     : 164
    1414//
    1515
     
    2121#include "locks.hfa"
    2222
    23 //----------------------------------------------------------------------------
    24 // future
    25 // I don't use future_t here as I need to use a lock for this future since it supports multiple consumers.
    26 // future_t is lockfree and uses atomics which aren't needed given we use locks here
     23//--------------------------------------------------------------------------------------------------------
     24// future does not use future_t as it needs a lock to support multiple consumers.  future_t is lockfree
     25// and uses atomics which are not needed.
     26//--------------------------------------------------------------------------------------------------------
     27
    2728forall( T ) {
    2829        enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
     
    4344
    4445        static inline {
    45 
    4646                void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
    4747                        ((select_node &)this){ blocked_thread };
     
    4949                }
    5050
    51                 void ?{}( future(T) & this ) {
    52                         this.waiters{};
    53                         this.except = 0p;
    54                         this.state = FUTURE_EMPTY;
    55                         this.lock{};
    56                 }
    57 
    58                 void ^?{}( future(T) & this ) {
    59                         free( this.except );
     51                void ?{}( future(T) & this ) with( this ) {
     52                        waiters{};
     53                        except = 0p;
     54                        state = FUTURE_EMPTY;
     55                        lock{};
     56                }
     57
     58                void ^?{}( future(T) & this ) with( this ) {
     59                        free( except );
    6060                }
    6161
     
    7777
    7878                // memcpy wrapper to help copy values
    79                 void copy_T( T & from, T & to ) {
     79                void copy_T$( T & from, T & to ) {
    8080                        memcpy((void *)&to, (void *)&from, sizeof(T));
    8181                }
     
    9090
    9191                                if ( s.clause_status == 0p )                    // poke in result so that woken threads do not need to reacquire any locks
    92                                         copy_T( result, *(((future_node(T) &)s).my_result) );
     92                                        copy_T$( result, *(((future_node(T) &)s).my_result) );
    9393
    9494                                wake_one( waiters, s );
     
    104104                                abort("Attempting to fulfil a future that has already been fulfilled");
    105105
    106                         copy_T( val, result );
     106                        copy_T$( val, result );
    107107                        return fulfil$( this );
    108108                }
     
    143143                        if ( state == FUTURE_FULFILLED ) {
    144144                                exceptCheck();
    145                                 copy_T( result, ret_val );
     145                                copy_T$( result, ret_val );
    146146                                unlock( lock );
    147147                                return [ret_val, false];
     
    175175                        T ret_val;
    176176                        if ( state == FUTURE_FULFILLED ) {
    177                                 copy_T( result, ret_val );
     177                                copy_T$( result, ret_val );
    178178                                unlock( lock );
    179179                                return [ret_val, true];
    180180                        }
    181181                        unlock( lock );
    182 
    183182                        return [ret_val, false];
    184183                }
     
    220219
    221220//--------------------------------------------------------------------------------------------------------
    222 // These futures below do not support select statements so they may not have as many features as 'future'
     221// future_rc uses reference counting to eliminate explicit storage-management and support the waituntil
     222// statement.
     223//--------------------------------------------------------------------------------------------------------
     224
     225forall( T ) {
     226        struct future_rc_impl$ {
     227                futex_mutex lock;                                                               // concurrent protection
     228                size_t refCnt;                                                                  // number of references to future
     229                future(T) fut;                                                                  // underlying future
     230        }; // future_rc_impl$
     231
     232        static inline {
     233                void incRef$( future_rc_impl$( T ) & impl ) with( impl ) {
     234                        __atomic_fetch_add( &refCnt, 1, __ATOMIC_RELAXED );
     235//                      lock( lock );
     236//                      refCnt += 1;
     237//                      unlock( lock );
     238                } // incRef$
     239
     240                bool decRef$( future_rc_impl$( T ) & impl ) with( impl ) {
     241                        return __atomic_fetch_add( &refCnt, -1, __ATOMIC_RELAXED ) == 1;
     242                        // lock( lock );
     243                        // refCnt -= 1;
     244                        // bool ret = refCnt == 0;
     245                        // unlock( lock );
     246                        // return ret;
     247                } // decRef$
     248
     249                void ?{}( future_rc_impl$( T ) & frc ) with( frc ) {
     250                        lock{};                                                                         // intialization
     251                        refCnt = 1;
     252                } // ?{}
     253
     254                void ^?{}( future_rc_impl$( T ) & frc ) with( frc ) {
     255                        decRef$( frc );
     256                } // ^?{}
     257        } // static inline
     258       
     259        struct future_rc {
     260                future_rc_impl$(T) * impl;             
     261        }; // future_rc
     262        __CFA_SELECT_GET_TYPE( future_rc(T) );
     263               
     264        static inline {
     265                void ?{}( future_rc( T ) & frc ) with( frc ) {
     266                        impl = new();
     267                } // ?{}
     268
     269                void ?{}( future_rc( T ) & to, future_rc( T ) & from ) with( to ) {
     270                        impl = from.impl;                                                       // point at new impl
     271                        incRef$( *impl );
     272                } // ?{}
     273
     274                void ^?{}( future_rc( T ) & frc ) with( frc ) {
     275                        if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; }
     276                } // ^?{}
     277
     278                future_rc( T ) & ?=?( future_rc( T ) & lhs, future_rc( T ) & rhs ) with( lhs ) {
     279                  if ( impl == rhs.impl ) return lhs;                   // self assignment ?
     280                        if ( decRef$( *impl ) ) { delete( impl ); impl = 0p; } // no references => delete current impl
     281                        impl = rhs.impl;                                                        // point at new impl
     282                        incRef$( *impl );                                                       //   and increment reference count
     283                        return lhs;
     284                } // ?+?
     285
     286                bool register_select( future_rc(T) & this, select_node & s ) with( this ) {
     287                        return register_select( this.impl->fut, s );
     288                }
     289
     290                bool unregister_select( future_rc(T) & this, select_node & s ) with( this ) {
     291                        return unregister_select( this.impl->fut, s );
     292                }
     293
     294                bool on_selected( future_rc(T) &, select_node & ) { return true; }
     295
     296                // USED BY CLIENT
     297
     298                bool available( future_rc( T ) & frc ) { return available( frc.impl->fut ); } // future result available ?
     299
     300                bool fulfil( future_rc(T) & frc, T val ) with( frc ) { return fulfil( impl->fut, val ); }
     301                bool ?()( future_rc(T) & frc, T val ) { return fulfil( frc, val ); } // alternate interface
     302
     303                int ?==?( future_rc( T ) & lhs, future_rc( T ) & rhs ) { return lhs.impl == rhs.impl; } // referential equality
     304
     305                // USED BY SERVER
     306
     307                T get( future_rc(T) & frc ) with( frc ) { return get( impl->fut ); }
     308                T ?()( future_rc(T) & frc ) with( frc ) { return get( frc ); } // alternate interface
     309
     310                bool fulfil( future_rc(T) & frc, exception_t * ex ) with( frc ) { return fulfil( impl->fut, ex ); }
     311                bool ?()( future_rc(T) & frc, exception_t * ex ) { return fulfil( frc, ex ); } // alternate interface
     312
     313                void reset( future_rc(T) & frc ) with( frc ) { reset( impl->fut ); } // mark future as empty (for reuse)
     314        } // static inline
     315} // forall( T )
     316
     317//--------------------------------------------------------------------------------------------------------
     318// These futures below do not support waituntil statements so they may not have as many features as 'future'
    223319//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    224320//  since it uses raw atomics and no locks
    225321//
    226322// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
    227 //  since it is monitor based and also is not compatible with select statements
     323//  since it is monitor based and also is not compatible with waituntil statement.
    228324//--------------------------------------------------------------------------------------------------------
    229325
Note: See TracChangeset for help on using the changeset viewer.