Changeset 339e30a for libcfa/src


Ignore:
Timestamp:
Jan 6, 2023, 1:57:36 PM (2 years ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
7eac70e
Parents:
d99a716
Message:

added new future and added rudimentary select statement support for it

Location:
libcfa/src/concurrency
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/future.hfa

    rd99a716 r339e30a  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
    8 //
    9 // Author           : Thierry Delisle & Peiran Hong
     7// concurrency/future.hfa --
     8//
     9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By :
     
    1414//
    1515
    16 #pragma once
     16// #pragma once
    1717
    1818#include "bits/locks.hfa"
    1919#include "monitor.hfa"
    20 
     20#include "select.hfa"
     21
     22//----------------------------------------------------------------------------
     23// future
     24// I don't use future_t here since I need to use a lock for this future
     25//  since it supports multiple consumers
     26//  future_t is lockfree and uses atomics which aren't needed given we use locks here
    2127forall( T ) {
     28    // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
     29
     30    // temporary enum replacement
     31    const int FUTURE_EMPTY = 0;
     32    const int FUTURE_FULFILLED = 1;
     33
    2234        struct future {
     35                int state;
     36                T result;
     37                dlist( select_node ) waiters;
     38        futex_mutex lock;
     39        };
     40
     41    struct future_node {
     42        inline select_node;
     43        T * my_result;
     44    };
     45
     46    // C_TODO: perhaps allow exceptions to be inserted like uC++?
     47
     48        static inline {
     49
     50        void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
     51            ((select_node &)this){ blocked_thread };
     52            this.my_result = my_result;
     53        }
     54
     55        void ?{}(future(T) & this) {
     56                        this.waiters{};
     57            this.state = FUTURE_EMPTY;
     58                }
     59
     60                // Reset future back to original state
     61                void reset(future(T) & this) with(this)
     62        {
     63            lock( lock );
     64            if( ! waiters`isEmpty )
     65                abort("Attempting to reset a future with blocked waiters");
     66            state = FUTURE_EMPTY;
     67            unlock( lock );
     68        }
     69
     70                // check if the future is available
     71        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
     72                bool available( future(T) & this ) { return this.state; }
     73
     74
     75        // memcpy wrapper to help copy values
     76        void copy_T( T & from, T & to ) {
     77            memcpy((void *)&to, (void *)&from, sizeof(T));
     78        }
     79
     80        // internal helper to signal waiters off of the future
     81        void _internal_flush( future(T) & this ) with(this) {
     82            while( ! waiters`isEmpty ) {
     83                select_node &s = try_pop_front( waiters );
     84
     85                if ( s.race_flag == 0p )
     86                    // poke in result so that woken threads do not need to reacquire any locks
     87                    // *(((future_node(T) &)s).my_result) = result;
     88                    copy_T( result, *(((future_node(T) &)s).my_result) );
     89                else if ( !install_select_winner( s, &this ) ) continue;
     90               
     91                // only unpark if future is not selected
     92                // or if it is selected we only unpark if we win the race
     93                unpark( s.blocked_thread );
     94            }
     95        }
     96
     97                // Fulfil the future, returns whether or not someone was unblocked
     98                bool fulfil( future(T) & this, T & val ) with(this) {
     99            lock( lock );
     100            if( state != FUTURE_EMPTY )
     101                abort("Attempting to fulfil a future that has already been fulfilled");
     102
     103            copy_T( val, result );
     104
     105            bool ret_val = ! waiters`isEmpty;
     106            state = FUTURE_FULFILLED;
     107                        _internal_flush( this );
     108            unlock( lock );
     109            return ret_val;
     110                }
     111
     112                // Wait for the future to be fulfilled
     113                // Also return whether the thread had to block or not
     114                [T, bool] get( future(T) & this ) with( this ) {
     115            lock( lock );
     116            T ret_val;
     117            if( state == FUTURE_FULFILLED ) {
     118                copy_T( result, ret_val );
     119                unlock( lock );
     120                return [ret_val, false];
     121            }
     122
     123            future_node(T) node = { active_thread(), &ret_val };
     124            insert_last( waiters, ((select_node &)node) );
     125            unlock( lock );
     126            park( );
     127
     128                        return [ret_val, true];
     129                }
     130
     131                // Wait for the future to be fulfilled
     132                T get( future(T) & this ) {
     133                        [T, bool] tt;
     134                        tt = get(this);
     135                        return tt.0;
     136                }
     137
     138        // Gets value if it is available and returns [ val, true ]
     139        // otherwise returns [ default_val, false]
     140        // will not block
     141        [T, bool] try_get( future(T) & this ) with(this) {
     142            lock( lock );
     143            T ret_val;
     144            if( state == FUTURE_FULFILLED ) {
     145                copy_T( result, ret_val );
     146                unlock( lock );
     147                return [ret_val, true];
     148            }
     149            unlock( lock );
     150            // cast to (T *) needed to trick the resolver to let me return *0p
     151            return [ret_val, false];
     152        }
     153
     154        void * register_select( future(T) & this, select_node & s ) with(this) {
     155            lock( lock );
     156
     157            // future not ready -> insert select node and return 0p
     158            if( state == FUTURE_EMPTY ) {
     159                insert_last( waiters, s );
     160                unlock( lock );
     161                return 0p;
     162            }
     163
     164            // future ready and we won race to install it as the select winner return 1p
     165            if ( install_select_winner( s, &this ) ) {
     166                unlock( lock );
     167                return 1p;
     168            }
     169
     170            unlock( lock );
     171            // future ready and we lost race to install it as the select winner
     172            return 2p;
     173        }
     174
     175        void unregister_select( future(T) & this, select_node & s ) with(this) {
     176            lock( lock );
     177            if ( s`isListed ) remove( s );
     178            unlock( lock );
     179        }
     180               
     181        }
     182}
     183
     184//--------------------------------------------------------------------------------------------------------
     185// These futures below do not support select statements so they may not be as useful as 'future'
     186//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
     187//  since it uses raw atomics and no locks afaik
     188//
     189// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
     190//  since it is monitor based and also is not compatible with select statements
     191//--------------------------------------------------------------------------------------------------------
     192
     193forall( T ) {
     194        struct single_future {
    23195                inline future_t;
    24196                T result;
     
    27199        static inline {
    28200                // Reset future back to original state
    29                 void reset(future(T) & this) { reset( (future_t&)this ); }
     201                void reset(single_future(T) & this) { reset( (future_t&)this ); }
    30202
    31203                // check if the future is available
    32                 bool available( future(T) & this ) { return available( (future_t&)this ); }
     204                bool available( single_future(T) & this ) { return available( (future_t&)this ); }
    33205
    34206                // Mark the future as abandoned, meaning it will be deleted by the server
    35207                // This doesn't work beause of the potential need for a destructor
    36                 void abandon( future(T) & this );
     208                void abandon( single_future(T) & this );
    37209
    38210                // Fulfil the future, returns whether or not someone was unblocked
    39                 thread$ * fulfil( future(T) & this, T result ) {
     211                thread$ * fulfil( single_future(T) & this, T result ) {
    40212                        this.result = result;
    41213                        return fulfil( (future_t&)this );
     
    44216                // Wait for the future to be fulfilled
    45217                // Also return whether the thread had to block or not
    46                 [T, bool] wait( future(T) & this ) {
     218                [T, bool] wait( single_future(T) & this ) {
    47219                        bool r = wait( (future_t&)this );
    48220                        return [this.result, r];
     
    50222
    51223                // Wait for the future to be fulfilled
    52                 T wait( future(T) & this ) {
     224                T wait( single_future(T) & this ) {
    53225                        [T, bool] tt;
    54226                        tt = wait(this);
Note: See TracChangeset for help on using the changeset viewer.