Changeset 3483185


Ignore:
Timestamp:
Mar 2, 2025, 2:58:57 PM (7 months ago)
Author:
kyoung <lseo@…>
Branches:
master
Children:
195f43d, 31df72b
Parents:
b8b64c34
git-author:
kyoung <lseo@…> (03/01/25 00:00:59)
git-committer:
kyoung <lseo@…> (03/02/25 14:58:57)
Message:

Make it possible to fulfil a future by loading an exception like ucpp

Files:
2 added
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/Exception.hfa

    rb8b64c34 r3483185  
    66#define ExceptionArgs( name, args... ) &name ## _vt, args
    77#define ExceptionInst( name, args... ) (name){ ExceptionArgs( name, args ) }
     8#define ExceptionPtr( E ) (exception_t *) & E
  • libcfa/src/concurrency/future.hfa

    rb8b64c34 r3483185  
    99// Author           : Thierry Delisle & Peiran Hong & Colby Parsons
    1010// Created On       : Wed Jan 06 17:33:18 2021
    11 // Last Modified By : Kyoung Seo
    12 // Last Modified On : Mon Jan 27 20:35:00 2025
    13 // Update Count     : 3
     11// Last Modified By : Peter A. Buhr
     12// Last Modified On : Sun Mar  2 14:45:56 2025
     13// Update Count     : 19
    1414//
    1515
     
    2626// future_t is lockfree and uses atomics which aren't needed given we use locks here
    2727forall( T ) {
    28     enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
     28        enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
    2929
    3030        struct future {
    3131                int state;
    3232                T result;
     33                exception_t * except;
    3334                dlist( select_node ) waiters;
    34         futex_mutex lock;
     35                futex_mutex lock;
    3536        };
    36     __CFA_SELECT_GET_TYPE( future(T) );
    37 
    38     struct future_node {
    39         inline select_node;
    40         T * my_result;
    41     };
     37        __CFA_SELECT_GET_TYPE( future(T) );
     38
     39        struct future_node {
     40                inline select_node;
     41                T * my_result;
     42        };
    4243
    4344        static inline {
    4445
    45         void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
    46             ((select_node &)this){ blocked_thread };
    47             this.my_result = my_result;
    48         }
    49 
    50         void ?{}( future(T) & this ) {
     46                void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
     47                        ((select_node &)this){ blocked_thread };
     48                        this.my_result = my_result;
     49                }
     50
     51                void ?{}( future(T) & this ) {
    5152                        this.waiters{};
    52             this.state = FUTURE_EMPTY;
    53             this.lock{};
     53                        this.except = 0p;
     54                        this.state = FUTURE_EMPTY;
     55                        this.lock{};
     56                }
     57
     58                void ^?{}( future(T) & this ) {
     59                        free( this.except );
    5460                }
    5561
    5662                // Reset future back to original state
    57                 void reset( future(T) & this ) with(this)
    58         {
    59             lock( lock );
    60             if( ! waiters`isEmpty )
    61                 abort("Attempting to reset a future with blocked waiters");
    62             state = FUTURE_EMPTY;
    63             unlock( lock );
    64         }
     63                void reset( future(T) & this ) with(this) {
     64                        lock( lock );
     65                        if ( ! waiters`isEmpty )
     66                                abort("Attempting to reset a future with blocked waiters");
     67                        state = FUTURE_EMPTY;
     68                        free( except );
     69                        except = 0p;
     70                        unlock( lock );
     71                }
    6572
    6673                // check if the future is available
    67         // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
     74                // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
    6875                bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
    6976
    7077
    71         // memcpy wrapper to help copy values
    72         void copy_T( T & from, T & to ) {
    73             memcpy((void *)&to, (void *)&from, sizeof(T));
    74         }
    75 
    76         // internal helper to signal waiters off of the future
    77         void _internal_flush( future(T) & this ) with(this) {
    78             while( ! waiters`isEmpty ) {
    79                 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
    80                     break; // if handle_OR returns false then waiters is empty so break
    81                 select_node &s = try_pop_front( waiters );
    82 
    83                 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
    84                     copy_T( result, *(((future_node(T) &)s).my_result) );
    85 
    86                 wake_one( waiters, s );
    87             }
    88         }
     78                // memcpy wrapper to help copy values
     79                void copy_T( T & from, T & to ) {
     80                        memcpy((void *)&to, (void *)&from, sizeof(T));
     81                }
     82
     83                bool fulfil$( future(T) & this ) with(this) {   // helper
     84                        bool ret_val = ! waiters`isEmpty;
     85                        state = FUTURE_FULFILLED;
     86                        while ( ! waiters`isEmpty ) {
     87                                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     88                                        break; // if handle_OR returns false then waiters is empty so break
     89                                select_node &s = try_pop_front( waiters );
     90
     91                                if ( s.clause_status == 0p )                    // poke in result so that woken threads do not need to reacquire any locks
     92                                        copy_T( result, *(((future_node(T) &)s).my_result) );
     93
     94                                wake_one( waiters, s );
     95                        }
     96                        unlock( lock );
     97                        return ret_val;
     98                }
    8999
    90100                // Fulfil the future, returns whether or not someone was unblocked
    91101                bool fulfil( future(T) & this, T val ) with(this) {
    92             lock( lock );
    93             if( state != FUTURE_EMPTY )
    94                 abort("Attempting to fulfil a future that has already been fulfilled");
    95 
    96             copy_T( val, result );
    97 
    98             bool ret_val = ! waiters`isEmpty;
    99             state = FUTURE_FULFILLED;
    100                         _internal_flush( this );
    101             unlock( lock );
    102             return ret_val;
     102                        lock( lock );
     103                        if ( state != FUTURE_EMPTY )
     104                                abort("Attempting to fulfil a future that has already been fulfilled");
     105
     106                        copy_T( val, result );
     107                        return fulfil$( this );
     108                }
     109
     110                bool ?()( future(T) & this, T val ) {                   // alternate interface
     111                        return fulfil( this, val );
     112                }
     113
     114                // Load an exception to the future, returns whether or not someone was unblocked
     115                bool fulfil( future(T) & this, exception_t * ex ) with(this) {
     116                        lock( lock );
     117                        if ( state != FUTURE_EMPTY )
     118                                abort("Attempting to fulfil a future that has already been fulfilled");
     119
     120                        except = ( exception_t * ) malloc( ex->virtual_table->size );
     121                        ex->virtual_table->copy( except, ex );
     122                        return fulfil$( this );
     123                }
     124
     125                bool ?()( future(T) & this, exception_t * ex ) { // alternate interface
     126                        return fulfil( this, ex );
    103127                }
    104128
     
    106130                // Also return whether the thread had to block or not
    107131                [T, bool] get( future(T) & this ) with( this ) {
    108             lock( lock );
    109             T ret_val;
    110             if( state == FUTURE_FULFILLED ) {
    111                 copy_T( result, ret_val );
    112                 unlock( lock );
    113                 return [ret_val, false];
    114             }
    115 
    116             future_node(T) node = { active_thread(), &ret_val };
    117             insert_last( waiters, ((select_node &)node) );
    118             unlock( lock );
    119             park( );
     132                        void exceptCheck() {                                            // helper
     133                                if ( except ) {
     134                                        exception_t * ex = ( exception_t * ) alloca( except->virtual_table->size );
     135                                        except->virtual_table->copy( ex, except );
     136                                        unlock( lock );
     137                                        throwResume * ex;
     138                                }
     139                        }
     140
     141                        lock( lock );
     142                        T ret_val;
     143                        if ( state == FUTURE_FULFILLED ) {
     144                                exceptCheck();
     145                                copy_T( result, ret_val );
     146                                unlock( lock );
     147                                return [ret_val, false];
     148                        }
     149
     150                        future_node(T) node = { active_thread(), &ret_val };
     151                        insert_last( waiters, ((select_node &)node) );
     152                        unlock( lock );
     153                        park( );
     154                        exceptCheck();
    120155
    121156                        return [ret_val, true];
     
    129164                }
    130165
    131         // Gets value if it is available and returns [ val, true ]
    132         // otherwise returns [ default_val, false]
    133         // will not block
    134         [T, bool] try_get( future(T) & this ) with(this) {
    135             lock( lock );
    136             T ret_val;
    137             if( state == FUTURE_FULFILLED ) {
    138                 copy_T( result, ret_val );
    139                 unlock( lock );
    140                 return [ret_val, true];
    141             }
    142             unlock( lock );
    143 
    144             return [ret_val, false];
    145         }
    146 
    147         bool register_select( future(T) & this, select_node & s ) with(this) {
    148             lock( lock );
    149 
    150             // check if we can complete operation. If so race to establish winner in special OR case
    151             if ( !s.park_counter && state != FUTURE_EMPTY ) {
    152                 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
    153                     unlock( lock );
    154                     return false;
    155                 }
    156             }
    157 
    158             // future not ready -> insert select node and return
    159             if( state == FUTURE_EMPTY ) {
    160                 insert_last( waiters, s );
    161                 unlock( lock );
    162                 return false;
    163             }
    164 
    165             __make_select_node_available( s );
    166             unlock( lock );
    167             return true;
    168         }
    169 
    170         bool unregister_select( future(T) & this, select_node & s ) with(this) {
    171             if ( ! s`isListed ) return false;
    172             lock( lock );
    173             if ( s`isListed ) remove( s );
    174             unlock( lock );
    175             return false;
    176         }
    177 
    178         bool on_selected( future(T) &, select_node & ) { return true; }
     166                T ?()( future(T) & this ) {                                             // alternate interface
     167                        return get( this );
     168                }
     169
     170                // Gets value if it is available and returns [ val, true ]
     171                // otherwise returns [ default_val, false]
     172                // will not block
     173                [T, bool] try_get( future(T) & this ) with(this) {
     174                        lock( lock );
     175                        T ret_val;
     176                        if ( state == FUTURE_FULFILLED ) {
     177                                copy_T( result, ret_val );
     178                                unlock( lock );
     179                                return [ret_val, true];
     180                        }
     181                        unlock( lock );
     182
     183                        return [ret_val, false];
     184                }
     185
     186                bool register_select( future(T) & this, select_node & s ) with(this) {
     187                        lock( lock );
     188
     189                        // check if we can complete operation. If so race to establish winner in special OR case
     190                        if ( !s.park_counter && state != FUTURE_EMPTY ) {
     191                                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     192                                        unlock( lock );
     193                                        return false;
     194                                }
     195                        }
     196
     197                        // future not ready -> insert select node and return
     198                        if ( state == FUTURE_EMPTY ) {
     199                                insert_last( waiters, s );
     200                                unlock( lock );
     201                                return false;
     202                        }
     203
     204                        __make_select_node_available( s );
     205                        unlock( lock );
     206                        return true;
     207                }
     208
     209                bool unregister_select( future(T) & this, select_node & s ) with(this) {
     210                        if ( ! s`isListed ) return false;
     211                        lock( lock );
     212                        if ( s`isListed ) remove( s );
     213                        unlock( lock );
     214                        return false;
     215                }
     216
     217                bool on_selected( future(T) &, select_node & ) { return true; }
    179218        }
    180219}
     
    242281
    243282                bool $first( multi_future(T) & mutex this ) {
    244                         if (this.has_first) {
     283                        if ( this.has_first ) {
    245284                                wait( this.blocked );
    246285                                return false;
     
    258297                // Reset future back to original state
    259298                void reset(multi_future(T) & mutex this) {
    260                         if( this.has_first != false) abort("Attempting to reset a multi_future with at least one blocked threads");
    261                         if( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");
     299                        if ( this.has_first != false ) abort("Attempting to reset a multi_future with at least one blocked threads");
     300                        if ( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");
    262301                        reset( (future_t&)*(future_t*)((uintptr_t)&this + sizeof(monitor$)) );
    263302                }
Note: See TracChangeset for help on using the changeset viewer.