Ignore:
Timestamp:
Jun 12, 2023, 2:45:32 PM (2 years ago)
Author:
Fangren Yu <f37yu@…>
Branches:
ast-experimental, master
Children:
62d62db
Parents:
34b4268 (diff), 251ce80 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into ast-experimental

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/future.hfa

    r34b4268 r24d6572  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
    8 //
    9 // Author           : Thierry Delisle & Peiran Hong
     7// concurrency/future.hfa --
     8//
     9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By :
     
    1818#include "bits/locks.hfa"
    1919#include "monitor.hfa"
    20 
     20#include "select.hfa"
     21#include "locks.hfa"
     22
     23//----------------------------------------------------------------------------
     24// future
     25// I don't use future_t here since I need to use a lock for this future
     26//  since it supports multiple consumers
     27//  future_t is lockfree and uses atomics which aren't needed given we use locks here
    2128forall( T ) {
     29    // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
     30
     31    // temporary enum replacement
     32    const int FUTURE_EMPTY = 0;
     33    const int FUTURE_FULFILLED = 1;
     34
    2235        struct future {
     36                int state;
     37                T result;
     38                dlist( select_node ) waiters;
     39        futex_mutex lock;
     40        };
     41
     42    struct future_node {
     43        inline select_node;
     44        T * my_result;
     45    };
     46
     47        static inline {
     48
     49        void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
     50            ((select_node &)this){ blocked_thread };
     51            this.my_result = my_result;
     52        }
     53
     54        void ?{}( future(T) & this ) {
     55                        this.waiters{};
     56            this.state = FUTURE_EMPTY;
     57            this.lock{};
     58                }
     59
     60                // Reset future back to original state
     61                void reset( future(T) & this ) with(this)
     62        {
     63            lock( lock );
     64            if( ! waiters`isEmpty )
     65                abort("Attempting to reset a future with blocked waiters");
     66            state = FUTURE_EMPTY;
     67            unlock( lock );
     68        }
     69
     70                // check if the future is available
     71        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
     72                bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
     73
     74
     75        // memcpy wrapper to help copy values
     76        void copy_T( T & from, T & to ) {
     77            memcpy((void *)&to, (void *)&from, sizeof(T));
     78        }
     79
     80        // internal helper to signal waiters off of the future
     81        void _internal_flush( future(T) & this ) with(this) {
     82            while( ! waiters`isEmpty ) {
     83                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     84                    break; // if handle_OR returns false then waiters is empty so break
     85                select_node &s = try_pop_front( waiters );
     86
     87                if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
     88                    copy_T( result, *(((future_node(T) &)s).my_result) );
     89               
     90                wake_one( waiters, s );
     91            }
     92        }
     93
     94                // Fulfil the future, returns whether or not someone was unblocked
     95                bool fulfil( future(T) & this, T val ) with(this) {
     96            lock( lock );
     97            if( state != FUTURE_EMPTY )
     98                abort("Attempting to fulfil a future that has already been fulfilled");
     99
     100            copy_T( val, result );
     101
     102            bool ret_val = ! waiters`isEmpty;
     103            state = FUTURE_FULFILLED;
     104                        _internal_flush( this );
     105            unlock( lock );
     106            return ret_val;
     107                }
     108
     109                // Wait for the future to be fulfilled
     110                // Also return whether the thread had to block or not
     111                [T, bool] get( future(T) & this ) with( this ) {
     112            lock( lock );
     113            T ret_val;
     114            if( state == FUTURE_FULFILLED ) {
     115                copy_T( result, ret_val );
     116                unlock( lock );
     117                return [ret_val, false];
     118            }
     119
     120            future_node(T) node = { active_thread(), &ret_val };
     121            insert_last( waiters, ((select_node &)node) );
     122            unlock( lock );
     123            park( );
     124
     125                        return [ret_val, true];
     126                }
     127
     128                // Wait for the future to be fulfilled
     129                T get( future(T) & this ) {
     130                        [T, bool] tt;
     131                        tt = get(this);
     132                        return tt.0;
     133                }
     134
     135        // Gets value if it is available and returns [ val, true ]
     136        // otherwise returns [ default_val, false]
     137        // will not block
     138        [T, bool] try_get( future(T) & this ) with(this) {
     139            lock( lock );
     140            T ret_val;
     141            if( state == FUTURE_FULFILLED ) {
     142                copy_T( result, ret_val );
     143                unlock( lock );
     144                return [ret_val, true];
     145            }
     146            unlock( lock );
     147           
     148            return [ret_val, false];
     149        }
     150
     151        bool register_select( future(T) & this, select_node & s ) with(this) {
     152            lock( lock );
     153
     154            // check if we can complete operation. If so race to establish winner in special OR case
     155            if ( !s.park_counter && state != FUTURE_EMPTY ) {
     156                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     157                    unlock( lock );
     158                    return false;
     159                }
     160            }
     161
     162            // future not ready -> insert select node and return
     163            if( state == FUTURE_EMPTY ) {
     164                insert_last( waiters, s );
     165                unlock( lock );
     166                return false;
     167            }
     168
     169            __make_select_node_available( s );
     170            unlock( lock );
     171            return true;
     172        }
     173
     174        bool unregister_select( future(T) & this, select_node & s ) with(this) {
     175            if ( ! s`isListed ) return false;
     176            lock( lock );
     177            if ( s`isListed ) remove( s );
     178            unlock( lock );
     179            return false;
     180        }
     181               
     182        void on_selected( future(T) & this, select_node & node ) {}
     183        }
     184}
     185
     186//--------------------------------------------------------------------------------------------------------
     187// These futures below do not support select statements so they may not have as many features as 'future'
     188//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
     189//  since it uses raw atomics and no locks
     190//
     191// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
     192//  since it is monitor based and also is not compatible with select statements
     193//--------------------------------------------------------------------------------------------------------
     194
     195forall( T ) {
     196        struct single_future {
    23197                inline future_t;
    24198                T result;
     
    27201        static inline {
    28202                // Reset future back to original state
    29                 void reset(future(T) & this) { reset( (future_t&)this ); }
     203                void reset(single_future(T) & this) { reset( (future_t&)this ); }
    30204
    31205                // check if the future is available
    32                 bool available( future(T) & this ) { return available( (future_t&)this ); }
     206                bool available( single_future(T) & this ) { return available( (future_t&)this ); }
    33207
    34208                // Mark the future as abandoned, meaning it will be deleted by the server
    35209                // This doesn't work beause of the potential need for a destructor
    36                 void abandon( future(T) & this );
     210                void abandon( single_future(T) & this );
    37211
    38212                // Fulfil the future, returns whether or not someone was unblocked
    39                 thread$ * fulfil( future(T) & this, T result ) {
     213                thread$ * fulfil( single_future(T) & this, T result ) {
    40214                        this.result = result;
    41215                        return fulfil( (future_t&)this );
     
    44218                // Wait for the future to be fulfilled
    45219                // Also return whether the thread had to block or not
    46                 [T, bool] wait( future(T) & this ) {
     220                [T, bool] wait( single_future(T) & this ) {
    47221                        bool r = wait( (future_t&)this );
    48222                        return [this.result, r];
     
    50224
    51225                // Wait for the future to be fulfilled
    52                 T wait( future(T) & this ) {
     226                T wait( single_future(T) & this ) {
    53227                        [T, bool] tt;
    54228                        tt = wait(this);
Note: See TracChangeset for help on using the changeset viewer.