Changeset ad861ef for libcfa


Timestamp: Jan 20, 2023, 1:25:37 PM
Author: Peter A. Buhr <pabuhr@…>
Branches: ADT, ast-experimental, master
Children: 79a6b17, cd5eb4b
Parents: 466787a (diff), a0d1f1c (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
Message: Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc
Location: libcfa
Files: 2 added, 5 edited

Legend:

  (unmarked)  Unmodified
  +           Added
  -           Removed
  • libcfa/configure.ac

    r466787a → rad861ef
      AC_PROG_CC
      AM_PROG_AS
    - AC_PROG_LIBTOOL
    + LT_INIT
      AC_PROG_INSTALL
      AC_PROG_MAKE_SET
    …
      AC_CONFIG_HEADERS(prelude/defines.hfa)

    - AC_OUTPUT()
    + AC_OUTPUT

      # Final text
  • libcfa/src/Makefile.am

    r466787a → rad861ef
              concurrency/once.hfa \
              concurrency/kernel/fwd.hfa \
    -         concurrency/mutex_stmt.hfa
    +         concurrency/mutex_stmt.hfa \
    +     concurrency/select.hfa \
    +     concurrency/channel.hfa

      inst_thread_headers_src = \
  • libcfa/src/concurrency/clib/cfathread.cfa

    r466787a → rad861ef
              // Mutex
              struct cfathread_mutex {
    -                 linear_backoff_then_block_lock impl;
    +                 exp_backoff_then_block_lock impl;
              };
              int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; }
    …
              // Condition
              struct cfathread_condition {
    -                 condition_variable(linear_backoff_then_block_lock) impl;
    +                 condition_variable(exp_backoff_then_block_lock) impl;
              };
              int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; }
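
The only change here is the name of the lock backing the C-compatible cfathread mutex and condition wrappers: linear_backoff_then_block_lock was renamed to exp_backoff_then_block_lock in locks.hfa (see below), and the lock keeps the same lock/unlock/on_wait/on_notify/on_wakeup interface, so the wrappers only need the type name updated. As a minimal sketch of how this lock/condition pairing is used from Cforall directly (the names Waiter, m, cv, and ready are illustrative, and the wait/notify_one signatures are assumed to be the ones locks.hfa declares for condition_variable(L)):

        #include <locks.hfa>
        #include <thread.hfa>

        exp_backoff_then_block_lock m;                          // renamed lock, same interface as before
        condition_variable( exp_backoff_then_block_lock ) cv;
        volatile bool ready = false;

        thread Waiter {};
        void main( Waiter & ) {
                lock( m );
                while ( ! ready ) wait( cv, m );                // releases m while blocked
                // with this changeset on_wakeup always re-locks, so m is held again here
                unlock( m );
        }

        int main() {
                Waiter w;                                       // starts waiting
                lock( m );
                ready = true;
                notify_one( cv );                               // wake the waiter
                unlock( m );
        }                                                       // w joins at scope exit

Because the REACQ switch is gone (see the locks.hfa diff below), on_wakeup for this lock unconditionally reacquires it, so a waiter always returns from wait holding the mutex, which matches the pthread-style semantics the cfathread condition wrapper needs.
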
  • libcfa/src/concurrency/future.hfa

    r466787a → rad861ef
      // file "LICENCE" distributed with Cforall.
      //
    - // io/types.hfa --
    - //
    - // Author           : Thierry Delisle & Peiran Hong
    + // concurrency/future.hfa --
    + //
    + // Author           : Thierry Delisle & Peiran Hong & Colby Parsons
      // Created On       : Wed Jan 06 17:33:18 2021
      // Last Modified By :
    …
      //

    - #pragma once
    + // #pragma once

      #include "bits/locks.hfa"
      #include "monitor.hfa"
    -
    + #include "select.hfa"
    +
    + //----------------------------------------------------------------------------
    + // future
    + // I don't use future_t here since I need to use a lock for this future
    + //  since it supports multiple consumers
    + //  future_t is lockfree and uses atomics which aren't needed given we use locks here
      forall( T ) {
    +     // enum(int) { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
    +
    +     // temporary enum replacement
    +     const int FUTURE_EMPTY = 0;
    +     const int FUTURE_FULFILLED = 1;
    +
              struct future {
    +                 int state;
    +                 T result;
    +                 dlist( select_node ) waiters;
    +         futex_mutex lock;
    +         };
    +
    +     struct future_node {
    +         inline select_node;
    +         T * my_result;
    +     };
    +
    +     // C_TODO: perhaps allow exceptions to be inserted like uC++?
    +
    +         static inline {
    +
    +         void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
    +             ((select_node &)this){ blocked_thread };
    +             this.my_result = my_result;
    +         }
    +
    +         void ?{}(future(T) & this) {
    +                         this.waiters{};
    +             this.state = FUTURE_EMPTY;
    +             this.lock{};
    +                 }
    +
    +                 // Reset future back to original state
    +                 void reset(future(T) & this) with(this)
    +         {
    +             lock( lock );
    +             if( ! waiters`isEmpty )
    +                 abort("Attempting to reset a future with blocked waiters");
    +             state = FUTURE_EMPTY;
    +             unlock( lock );
    +         }
    +
    +                 // check if the future is available
    +         // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
    +                 bool available( future(T) & this ) { return this.state; }
    +
    +
    +         // memcpy wrapper to help copy values
    +         void copy_T( T & from, T & to ) {
    +             memcpy((void *)&to, (void *)&from, sizeof(T));
    +         }
    +
    +         // internal helper to signal waiters off of the future
    +         void _internal_flush( future(T) & this ) with(this) {
    +             while( ! waiters`isEmpty ) {
    +                 select_node &s = try_pop_front( waiters );
    +
    +                 if ( s.race_flag == 0p )
    +                     // poke in result so that woken threads do not need to reacquire any locks
    +                     // *(((future_node(T) &)s).my_result) = result;
    +                     copy_T( result, *(((future_node(T) &)s).my_result) );
    +                 else if ( !install_select_winner( s, &this ) ) continue;
    +
    +                 // only unpark if future is not selected
    +                 // or if it is selected we only unpark if we win the race
    +                 unpark( s.blocked_thread );
    +             }
    +         }
    +
    +                 // Fulfil the future, returns whether or not someone was unblocked
    +                 bool fulfil( future(T) & this, T & val ) with(this) {
    +             lock( lock );
    +             if( state != FUTURE_EMPTY )
    +                 abort("Attempting to fulfil a future that has already been fulfilled");
    +
    +             copy_T( val, result );
    +
    +             bool ret_val = ! waiters`isEmpty;
    +             state = FUTURE_FULFILLED;
    +                         _internal_flush( this );
    +             unlock( lock );
    +             return ret_val;
    +                 }
    +
    +                 // Wait for the future to be fulfilled
    +                 // Also return whether the thread had to block or not
    +                 [T, bool] get( future(T) & this ) with( this ) {
    +             lock( lock );
    +             T ret_val;
    +             if( state == FUTURE_FULFILLED ) {
    +                 copy_T( result, ret_val );
    +                 unlock( lock );
    +                 return [ret_val, false];
    +             }
    +
    +             future_node(T) node = { active_thread(), &ret_val };
    +             insert_last( waiters, ((select_node &)node) );
    +             unlock( lock );
    +             park( );
    +
    +                         return [ret_val, true];
    +                 }
    +
    +                 // Wait for the future to be fulfilled
    +                 T get( future(T) & this ) {
    +                         [T, bool] tt;
    +                         tt = get(this);
    +                         return tt.0;
    +                 }
    +
    +         // Gets value if it is available and returns [ val, true ]
    +         // otherwise returns [ default_val, false]
    +         // will not block
    +         [T, bool] try_get( future(T) & this ) with(this) {
    +             lock( lock );
    +             T ret_val;
    +             if( state == FUTURE_FULFILLED ) {
    +                 copy_T( result, ret_val );
    +                 unlock( lock );
    +                 return [ret_val, true];
    +             }
    +             unlock( lock );
    +
    +             return [ret_val, false];
    +         }
    +
    +         void * register_select( future(T) & this, select_node & s ) with(this) {
    +             lock( lock );
    +
    +             // future not ready -> insert select node and return 0p
    +             if( state == FUTURE_EMPTY ) {
    +                 insert_last( waiters, s );
    +                 unlock( lock );
    +                 return 0p;
    +             }
    +
    +             // future ready and we won race to install it as the select winner return 1p
    +             if ( install_select_winner( s, &this ) ) {
    +                 unlock( lock );
    +                 return 1p;
    +             }
    +
    +             unlock( lock );
    +             // future ready and we lost race to install it as the select winner
    +             return 2p;
    +         }
    +
    +         void unregister_select( future(T) & this, select_node & s ) with(this) {
    +             lock( lock );
    +             if ( s`isListed ) remove( s );
    +             unlock( lock );
    +         }
    +
    +         }
    + }
    +
    + //--------------------------------------------------------------------------------------------------------
    + // These futures below do not support select statements so they may not be as useful as 'future'
    + //  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    + //  since it uses raw atomics and no locks afaik
    + //
    + // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
    + //  since it is monitor based and also is not compatible with select statements
    + //--------------------------------------------------------------------------------------------------------
    +
    + forall( T ) {
    +         struct single_future {
                      inline future_t;
                      T result;
    …
              static inline {
                      // Reset future back to original state
    -                 void reset(future(T) & this) { reset( (future_t&)this ); }
    +                 void reset(single_future(T) & this) { reset( (future_t&)this ); }

                      // check if the future is available
    -                 bool available( future(T) & this ) { return available( (future_t&)this ); }
    +                 bool available( single_future(T) & this ) { return available( (future_t&)this ); }

                      // Mark the future as abandoned, meaning it will be deleted by the server
                      // This doesn't work beause of the potential need for a destructor
    -                 void abandon( future(T) & this );
    +                 void abandon( single_future(T) & this );

                      // Fulfil the future, returns whether or not someone was unblocked
    -                 thread$ * fulfil( future(T) & this, T result ) {
    +                 thread$ * fulfil( single_future(T) & this, T result ) {
                              this.result = result;
                              return fulfil( (future_t&)this );
    …
                      // Wait for the future to be fulfilled
                      // Also return whether the thread had to block or not
    -                 [T, bool] wait( future(T) & this ) {
    +                 [T, bool] wait( single_future(T) & this ) {
                              bool r = wait( (future_t&)this );
                              return [this.result, r];
    …
                      // Wait for the future to be fulfilled
    -                 T wait( future(T) & this ) {
    +                 T wait( single_future(T) & this ) {
                              [T, bool] tt;
                              tt = wait(this);
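
The new future is the lock-protected, multi-consumer variant: fulfil stores the result under the futex_mutex and wakes every queued waiter, get either copies an already-stored result or parks on the waiters list, try_get never blocks, and register_select/unregister_select hook the future into the select/waituntil machinery. A minimal usage sketch under those definitions (the Producer/Consumer thread types and the include paths are illustrative, not part of this changeset):

        #include <fstream.hfa>
        #include <thread.hfa>
        #include <concurrency/future.hfa>

        future( int ) f;                        // one shared, multi-consumer future

        thread Producer {};
        void main( Producer & ) {
                int v = 42;
                fulfil( f, v );                 // store the result and unpark all current waiters
        }

        thread Consumer {};
        void main( Consumer & ) {
                int r = get( f );               // blocks until fulfilled, then copies the result out
                sout | r;
        }

        int main() {
                Consumer c[3];                  // several consumers may wait on the same future
                Producer p;
        }                                       // thread destructors join here

Because fulfil (via _internal_flush) copies the result into each waiter's future_node before unparking it, woken consumers do not need to reacquire the future's lock; reset can then reuse the future once no waiters remain.
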
  • libcfa/src/concurrency/locks.hfa

    r466787a → rad861ef
      #include <unistd.h>

    - // undef to make a number of the locks not reacquire upon waking from a condlock
    - #define REACQ 1
    + // C_TODO: cleanup this and locks.cfa
    + // - appropriate separation of interface and impl
    + // - clean up unused/unneeded locks
    + // - change messy big blocking lock from inheritance to composition to remove need for flags

      //-----------------------------------------------------------------------------
    …
      static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
      static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    - static inline void on_wakeup(clh_lock & this, size_t recursion ) {
    -         #ifdef REACQ
    -         lock(this);
    -         #endif
    - }
    -
    -
    - //-----------------------------------------------------------------------------
    - // Linear backoff Spinlock
    - struct linear_backoff_then_block_lock {
    + static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
    +
    +
    + //-----------------------------------------------------------------------------
    + // Exponential backoff then block lock
    + struct exp_backoff_then_block_lock {
              // Spin lock used for mutual exclusion
              __spinlock_t spinlock;
    …
      };

    - static inline void  ?{}( linear_backoff_then_block_lock & this ) {
    + static inline void  ?{}( exp_backoff_then_block_lock & this ) {
              this.spinlock{};
              this.blocked_threads{};
              this.lock_value = 0;
      }
    - static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
    - // static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    - // static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    -
    - static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
    + static inline void ^?{}( exp_backoff_then_block_lock & this ) {}
    + // static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    + // static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    +
    + static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
              if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
                      return true;
    …
      }

    - static inline bool try_lock(linear_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
    -
    - static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
    + static inline bool try_lock(exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
    +
    + static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
              if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
                      return true;
    …
      }

    - static inline bool block(linear_backoff_then_block_lock & this) with(this) {
    + static inline bool block(exp_backoff_then_block_lock & this) with(this) {
              lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
              if (lock_value != 2) {
    …
      }

    - static inline void lock(linear_backoff_then_block_lock & this) with(this) {
    + static inline void lock(exp_backoff_then_block_lock & this) with(this) {
              size_t compare_val = 0;
              int spin = 4;
    …
      }

    - static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
    + static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
          if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
              lock( spinlock __cfaabi_dbg_ctx2 );
    …
      }

    - static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    - static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
    - static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
    -         #ifdef REACQ
    -         lock(this);
    -         #endif
    - }
    + static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    + static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; }
    + static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); }

      //-----------------------------------------------------------------------------
    …

      static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
    -         #ifdef REACQ
    -                 lock( lock __cfaabi_dbg_ctx2 );
    -                 insert_last( blocked_threads, *t );
    -                 unlock( lock );
    -         #else
    -                 unpark(t);
    -         #endif
    +     lock( lock __cfaabi_dbg_ctx2 );
    +     insert_last( blocked_threads, *t );
    +     unlock( lock );
      }
      static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    …
      }
      static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    - static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
    -         #ifdef REACQ
    -         lock(this);
    -         #endif
    - }
    + static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); }


    …
      static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
      static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    - static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
    -         #ifdef REACQ
    -         lock(this);
    -         #endif
    - }
    + static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {lock(this); }

      //-----------------------------------------------------------------------------
    …

      static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    -   #ifdef REACQ
              // first we acquire internal fast_block_lock
              lock( lock __cfaabi_dbg_ctx2 );
    …
              unlock( lock );

    -   #endif
              unpark(t);
    -
      }
      static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
      static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
    -   #ifdef REACQ
              // now we acquire the entire block_spin_lock upon waking up
              while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
              __atomic_store_n(&held, true, __ATOMIC_RELEASE);
              unlock( lock ); // Now we release the internal fast_spin_lock
    -   #endif
      }

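
Two things change in locks.hfa: the lock formerly called linear_backoff_then_block_lock is renamed exp_backoff_then_block_lock to reflect what it actually does (spin with exponentially growing backoff, then block), and the REACQ compile-time switch is removed, so on_wakeup for these locks now always reacquires the lock and on_notify for fast_block_lock always queues the woken thread. A small sketch of direct use, relying only on the try_lock/lock/unlock functions shown above (the Worker thread and counter variable are illustrative):

        #include <locks.hfa>
        #include <thread.hfa>

        exp_backoff_then_block_lock l;
        size_t counter = 0;

        thread Worker {};
        void main( Worker & ) {
                for ( i; 1000 ) {
                        if ( ! try_lock( l ) )          // fast path: one CAS attempt
                                lock( l );              // slow path: exponential backoff spin, then block
                        counter += 1;
                        unlock( l );                    // release; wakes a blocked thread if one is queued
                }
        }

        int main() {
                { Worker w[4]; }                        // 4 workers contend on l, join at scope exit
                // counter == 4000 once all workers have joined
        }

The on_notify/on_wait/on_wakeup hooks are what the blocking-lock and condition_variable framework calls around a wait; with REACQ gone, every lock in this family reacquires on wakeup, so a waiter resumes user code holding its lock again, which is the behaviour the cfathread wrappers above depend on.
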