Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/future.hfa

    r5e180c2 r8f1a99e  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // concurrency/future.hfa --
     7// io/types.hfa --
    88//
    9 // Author           : Thierry Delisle & Peiran Hong & Colby Parsons
     9// Author           : Thierry Delisle & Peiran Hong
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By :
     
    1414//
    1515
    16 // #pragma once
     16#pragma once
    1717
    1818#include "bits/locks.hfa"
    1919#include "monitor.hfa"
    20 #include "select.hfa"
    21 
    22 //----------------------------------------------------------------------------
    23 // future
    24 // I don't use future_t here since I need to use a lock for this future
    25 //  since it supports multiple consumers
    26 //  future_t is lockfree and uses atomics which aren't needed given we use locks here
    27 forall( T ) {
    28     // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
    29 
    30     // temporary enum replacement
    31     const int FUTURE_EMPTY = 0;
    32     const int FUTURE_FULFILLED = 1;
    33 
    34         struct future {
    35                 int state;
    36                 T result;
    37                 dlist( select_node ) waiters;
    38         futex_mutex lock;
    39         };
    40 
    41     struct future_node {
    42         inline select_node;
    43         T * my_result;
    44     };
    45 
    46     // C_TODO: perhaps allow exceptions to be inserted like uC++?
    47 
    48         static inline {
    49 
    50         void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
    51             ((select_node &)this){ blocked_thread };
    52             this.my_result = my_result;
    53         }
    54 
    55         void ?{}(future(T) & this) {
    56                         this.waiters{};
    57             this.state = FUTURE_EMPTY;
    58             this.lock{};
    59                 }
    60 
    61                 // Reset future back to original state
    62                 void reset(future(T) & this) with(this)
    63         {
    64             lock( lock );
    65             if( ! waiters`isEmpty )
    66                 abort("Attempting to reset a future with blocked waiters");
    67             state = FUTURE_EMPTY;
    68             unlock( lock );
    69         }
    70 
    71                 // check if the future is available
    72         // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
    73                 bool available( future(T) & this ) { return this.state; }
    74 
    75 
    76         // memcpy wrapper to help copy values
    77         void copy_T( T & from, T & to ) {
    78             memcpy((void *)&to, (void *)&from, sizeof(T));
    79         }
    80 
    81         // internal helper to signal waiters off of the future
    82         void _internal_flush( future(T) & this ) with(this) {
    83             while( ! waiters`isEmpty ) {
    84                 select_node &s = try_pop_front( waiters );
    85 
    86                 if ( s.race_flag == 0p )
    87                     // poke in result so that woken threads do not need to reacquire any locks
    88                     // *(((future_node(T) &)s).my_result) = result;
    89                     copy_T( result, *(((future_node(T) &)s).my_result) );
    90                 else if ( !install_select_winner( s, &this ) ) continue;
    91                
    92                 // only unpark if future is not selected
    93                 // or if it is selected we only unpark if we win the race
    94                 unpark( s.blocked_thread );
    95             }
    96         }
    97 
    98                 // Fulfil the future, returns whether or not someone was unblocked
    99                 bool fulfil( future(T) & this, T & val ) with(this) {
    100             lock( lock );
    101             if( state != FUTURE_EMPTY )
    102                 abort("Attempting to fulfil a future that has already been fulfilled");
    103 
    104             copy_T( val, result );
    105 
    106             bool ret_val = ! waiters`isEmpty;
    107             state = FUTURE_FULFILLED;
    108                         _internal_flush( this );
    109             unlock( lock );
    110             return ret_val;
    111                 }
    112 
    113                 // Wait for the future to be fulfilled
    114                 // Also return whether the thread had to block or not
    115                 [T, bool] get( future(T) & this ) with( this ) {
    116             lock( lock );
    117             T ret_val;
    118             if( state == FUTURE_FULFILLED ) {
    119                 copy_T( result, ret_val );
    120                 unlock( lock );
    121                 return [ret_val, false];
    122             }
    123 
    124             future_node(T) node = { active_thread(), &ret_val };
    125             insert_last( waiters, ((select_node &)node) );
    126             unlock( lock );
    127             park( );
    128 
    129                         return [ret_val, true];
    130                 }
    131 
    132                 // Wait for the future to be fulfilled
    133                 T get( future(T) & this ) {
    134                         [T, bool] tt;
    135                         tt = get(this);
    136                         return tt.0;
    137                 }
    138 
    139         // Gets value if it is available and returns [ val, true ]
    140         // otherwise returns [ default_val, false]
    141         // will not block
    142         [T, bool] try_get( future(T) & this ) with(this) {
    143             lock( lock );
    144             T ret_val;
    145             if( state == FUTURE_FULFILLED ) {
    146                 copy_T( result, ret_val );
    147                 unlock( lock );
    148                 return [ret_val, true];
    149             }
    150             unlock( lock );
    151            
    152             return [ret_val, false];
    153         }
    154 
    155         void * register_select( future(T) & this, select_node & s ) with(this) {
    156             lock( lock );
    157 
    158             // future not ready -> insert select node and return 0p
    159             if( state == FUTURE_EMPTY ) {
    160                 insert_last( waiters, s );
    161                 unlock( lock );
    162                 return 0p;
    163             }
    164 
    165             // future ready and we won race to install it as the select winner return 1p
    166             if ( install_select_winner( s, &this ) ) {
    167                 unlock( lock );
    168                 return 1p;
    169             }
    170 
    171             unlock( lock );
    172             // future ready and we lost race to install it as the select winner
    173             return 2p;
    174         }
    175 
    176         void unregister_select( future(T) & this, select_node & s ) with(this) {
    177             lock( lock );
    178             if ( s`isListed ) remove( s );
    179             unlock( lock );
    180         }
    181                
    182         }
    183 }
    184 
    185 //--------------------------------------------------------------------------------------------------------
    186 // These futures below do not support select statements so they may not be as useful as 'future'
    187 //  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    188 //  since it uses raw atomics and no locks afaik
    189 //
    190 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
    191 //  since it is monitor based and also is not compatible with select statements
    192 //--------------------------------------------------------------------------------------------------------
    19320
    19421forall( T ) {
    195         struct single_future {
     22        struct future {
    19623                inline future_t;
    19724                T result;
     
    20027        static inline {
    20128                // Reset future back to original state
    202                 void reset(single_future(T) & this) { reset( (future_t&)this ); }
     29                void reset(future(T) & this) { reset( (future_t&)this ); }
    20330
    20431                // check if the future is available
    205                 bool available( single_future(T) & this ) { return available( (future_t&)this ); }
     32                bool available( future(T) & this ) { return available( (future_t&)this ); }
    20633
    20734                // Mark the future as abandoned, meaning it will be deleted by the server
    20835                // This doesn't work beause of the potential need for a destructor
    209                 void abandon( single_future(T) & this );
     36                void abandon( future(T) & this );
    21037
    21138                // Fulfil the future, returns whether or not someone was unblocked
    212                 thread$ * fulfil( single_future(T) & this, T result ) {
     39                thread$ * fulfil( future(T) & this, T result ) {
    21340                        this.result = result;
    21441                        return fulfil( (future_t&)this );
     
    21744                // Wait for the future to be fulfilled
    21845                // Also return whether the thread had to block or not
    219                 [T, bool] wait( single_future(T) & this ) {
     46                [T, bool] wait( future(T) & this ) {
    22047                        bool r = wait( (future_t&)this );
    22148                        return [this.result, r];
     
    22350
    22451                // Wait for the future to be fulfilled
    225                 T wait( single_future(T) & this ) {
     52                T wait( future(T) & this ) {
    22653                        [T, bool] tt;
    22754                        tt = wait(this);
Note: See TracChangeset for help on using the changeset viewer.