Ignore:
Timestamp:
Oct 13, 2023, 7:13:21 PM (2 years ago)
Author:
JiadaL <j82liang@…>
Branches:
master
Children:
a97b9ed, bab2917
Parents:
85034ed (diff), 0bf0b978 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r85034ed r8cbe732  
    130130static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) {
    131131    memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work
     132    __atomic_thread_fence( __ATOMIC_SEQ_CST );
    132133    wake_one( cons );
    133134}
     
    136137static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) {
    137138    memcpy( (void *)&retval, prods`first.extra, sizeof(T) );
     139    __atomic_thread_fence( __ATOMIC_SEQ_CST );
    138140    wake_one( prods );
    139141}
  • libcfa/src/concurrency/cofor.cfa

    r85034ed r8cbe732  
    44// cofor ( uC++ COFOR )
    55
    6 thread co_runner {
     6thread cofor_runner {
    77        ssize_t low, high;
    88        __cofor_body_t loop_body;
    99};
    1010
    11 static void ?{}( co_runner & this, ssize_t low, ssize_t high, __cofor_body_t loop_body ) {
     11static void ?{}( cofor_runner & this, ssize_t low, ssize_t high, __cofor_body_t loop_body ) {
    1212        this.low = low;
    1313        this.high = high;
     
    1515}
    1616
    17 void main( co_runner & this ) with( this ) {
     17void main( cofor_runner & this ) with( this ) {
    1818        for ( ssize_t i = low; i < high; i++ )
    1919                loop_body(i);
    2020}
    2121
    22 void cofor( ssize_t low, ssize_t high, __cofor_body_t loop_body ) libcfa_public {
     22void __Cofor__( ssize_t low, ssize_t high, __cofor_body_t loop_body ) libcfa_public {
    2323        ssize_t range = high - low;
    2424  if ( range <= 0 ) return;
     
    2929        ssize_t i = 0;
    3030        ssize_t stride_iter = low;
    31         co_runner * runners[ threads ];
     31        cofor_runner * runners[ threads ];
    3232        for ( i; threads ) {
    3333                runners[i] = alloc();
     
    4545}
    4646
    47 //////////////////////////////////////////////////////////////////////////////////////////
    48 // parallel (COBEGIN/COEND)
    4947
    50 thread para_runner {
    51         parallel_stmt_t body;
    52         void * arg;
    53 };
    54 
    55 static void ?{}( para_runner & this, parallel_stmt_t body, void * arg ) {
    56         this.body = body;
    57         this.arg = arg;
    58 }
    59 
    60 void main( para_runner & this ) with( this ) { body( arg ); }
    61 
    62 void parallel( parallel_stmt_t * stmts, void ** args, size_t num ) libcfa_public {
    63         para_runner * runners[ num ];
    64         for ( i; num )
    65                 (*(runners[i] = malloc())){ stmts[i], args[i] };
    66         for ( i; num )
    67                 delete( runners[i] );
    68 }
    69 
  • libcfa/src/concurrency/cofor.hfa

    r85034ed r8cbe732  
    55typedef void (*__cofor_body_t)( ssize_t );
    66
    7 void cofor( ssize_t low, ssize_t high, __cofor_body_t loop_body );
     7void __Cofor__( ssize_t low, ssize_t high, __cofor_body_t loop_body );
    88
    99#define COFOR( lidname, low, high, loopbody ) \
     
    1212                        loopbody \
    1313                } \
    14                 cofor( low, high, __CFA_loopLambda__ ); \
     14                __Cofor__( low, high, __CFA_loopLambda__ ); \
    1515        }
    1616
    1717//////////////////////////////////////////////////////////////////////////////////////////
    18 // parallel (COBEGIN/COEND)
    19 typedef void (*parallel_stmt_t)( void * );
     18// corun
    2019
    21 void parallel( parallel_stmt_t * stmts, void ** args, size_t num );
     20//
     21typedef void (*__CFA_corun_lambda_t)( void );
     22
     23// used to run a corun statement in parallel
     24thread co_runner {
     25        __CFA_corun_lambda_t body;
     26};
     27
     28// wraps a co_runner to provide RAII deallocation
     29struct runner_block {
     30    co_runner * runner;
     31};
     32static inline void ?{}( co_runner & this, __CFA_corun_lambda_t body ) { this.body = body; }
     33
     34void main( co_runner & this ) with( this ) { body(); }
     35
     36static inline void ?{}( runner_block & this ) {}
     37static inline void ?{}( runner_block & this, __CFA_corun_lambda_t body ) {
     38    (*(this.runner = malloc())){ body };
     39}
     40
     41static inline void ^?{}( runner_block & this ) {
     42    delete( this.runner );
     43}
     44
  • libcfa/src/concurrency/coroutine.cfa

    r85034ed r8cbe732  
    343343
    344344bool poll() libcfa_public { return poll( active_coroutine() ); }
     345void enable_ehm() libcfa_public { active_coroutine()->ehm_state.ehm_enabled = true; }
     346void disable_ehm() libcfa_public { active_coroutine()->ehm_state.ehm_enabled = false; }
     347bool checked_poll() libcfa_public { return active_coroutine()->ehm_state.ehm_enabled ? poll( active_coroutine() ) : false; }
    345348coroutine$ * resumer() libcfa_public { return active_coroutine()->last; }
    346349coroutine$ * first_resumer() libcfa_public { return active_coroutine()->starter; }
  • libcfa/src/concurrency/coroutine.hfa

    r85034ed r8cbe732  
    224224
    225225// non local ehm and coroutine utility routines
     226void enable_ehm();
     227void disable_ehm();
    226228bool poll( coroutine$ * cor );
    227229bool poll();
     230bool checked_poll();
    228231coroutine$ * resumer();
    229232coroutine$ * first_resumer();
    230233
    231234forall(T & | is_coroutine(T)) {
    232     void enable_ehm( T & cor );
    233     void disable_ehm( T & cor );
     235    void enable_ehm( T & cor );         // enable checking non-local exceptions for cor via checked_poll
     236    void disable_ehm( T & cor );        // disable checking non-local exceptions for cor via checked_poll
    234237    bool poll( T & cor );
    235     bool checked_poll( T & cor );
     238    bool checked_poll( T & cor );       // check for non-local exceptions while respecting enable/disable
    236239    coroutine$ * resumer( T & cor );
    237240    coroutine$ * first_resumer( T & cor );
  • libcfa/src/concurrency/kernel/fwd.hfa

    r85034ed r8cbe732  
    118118                // Yield: yield N times
    119119                static inline void yield( size_t times ) {
    120                         for( times ) {
     120                        for ( times ) {
    121121                                yield();
    122122                        }
     
    136136
    137137                        bool wait(single_sem & this) {
    138                                 for() {
     138                                for () {
    139139                                        struct thread$ * expected = this.ptr;
    140                                         if(expected == 1p) {
    141                                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     140                                        if (expected == 1p) {
     141                                                if (__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    142142                                                        return false;
    143143                                                }
     
    145145                                        else {
    146146                                                /* paranoid */ verify( expected == 0p );
    147                                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     147                                                if (__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    148148                                                        park();
    149149                                                        return true;
     
    155155
    156156                        bool post(single_sem & this) {
    157                                 for() {
     157                                for () {
    158158                                        struct thread$ * expected = this.ptr;
    159                                         if(expected == 1p) return false;
    160                                         if(expected == 0p) {
    161                                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     159                                        if (expected == 1p) return false;
     160                                        if (expected == 0p) {
     161                                                if (__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    162162                                                        return false;
    163163                                                }
    164164                                        }
    165165                                        else {
    166                                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     166                                                if (__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    167167                                                        unpark( expected );
    168168                                                        return true;
     
    195195                        // return true if the thread was parked
    196196                        bool wait(oneshot & this) {
    197                                 for() {
     197                                for () {
    198198                                        struct thread$ * expected = this.ptr;
    199                                         if(expected == oneshot_FULFILLED) return false;
    200                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     199                                        if (expected == oneshot_FULFILLED) return false;
     200                                        if (__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    201201                                                park();
    202202                                                /* paranoid */ verify( this.ptr == oneshot_FULFILLED );
     
    210210                        thread$ * post(oneshot & this, bool do_unpark = true) {
    211211                                struct thread$ * got = __atomic_exchange_n( &this.ptr, oneshot_FULFILLED, __ATOMIC_SEQ_CST);
    212                                 if( got == oneshot_ARMED || got == oneshot_FULFILLED ) return 0p;
    213                                 if(do_unpark) unpark( got );
     212                                if ( got == oneshot_ARMED || got == oneshot_FULFILLED ) return 0p;
     213                                if (do_unpark) unpark( got );
    214214                                return got;
    215215                        }
     
    255255                                /* paranoid */ verify( wait_ctx.ptr == oneshot_ARMED || wait_ctx.ptr == oneshot_FULFILLED );
    256256                                // The future needs to set the wait context
    257                                 for() {
     257                                for () {
    258258                                        struct oneshot * expected = this.ptr;
    259259                                        // Is the future already fulfilled?
    260                                         if(expected == future_FULFILLED) return false; // Yes, just return false (didn't block)
     260                                        if (expected == future_FULFILLED) return false; // Yes, just return false (didn't block)
    261261
    262262                                        // The future is not fulfilled, try to setup the wait context
    263                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     263                                        if (__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    264264                                                return true;
    265265                                        }
     
    276276
    277277                                // attempt to remove the context so it doesn't get consumed.
    278                                 if(__atomic_compare_exchange_n( &this.ptr, &expected, future_ARMED, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     278                                if (__atomic_compare_exchange_n( &this.ptr, &expected, future_ARMED, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    279279                                        // we still have the original context, then no one else saw it
    280280                                        return false;
     
    282282
    283283                                // expected == ARMED: future was never actually setup, just return
    284                                 if( expected == future_ARMED ) return false;
     284                                if ( expected == future_ARMED ) return false;
    285285
    286286                                // expected == FULFILLED: the future is ready and the context was fully consumed
    287287                                // the server won't use the pointer again
    288288                                // It is safe to delete (which could happen after the return)
    289                                 if( expected == future_FULFILLED ) return true;
     289                                if ( expected == future_FULFILLED ) return true;
    290290
    291291                                // expected == PROGRESS: the future is ready but the context hasn't fully been consumed
    292292                                // spin until it is safe to move on
    293                                 if( expected == future_PROGRESS ) {
     293                                if ( expected == future_PROGRESS ) {
    294294                                        while( this.ptr != future_FULFILLED ) Pause();
    295295                                        /* paranoid */ verify( this.ptr == future_FULFILLED );
     
    310310
    311311                                // If the future isn't already fulfilled, let the server delete it
    312                                 if( got == future_ARMED ) return false;
     312                                if ( got == future_ARMED ) return false;
    313313
    314314                                // got == PROGRESS: the future is ready but the context hasn't fully been consumed
    315315                                // spin until it is safe to move on
    316                                 if( got == future_PROGRESS ) {
     316                                if ( got == future_PROGRESS ) {
    317317                                        while( this.ptr != future_FULFILLED ) Pause();
    318318                                        got = future_FULFILLED;
     
    327327                        // from the server side, mark the future as fulfilled
    328328                        // delete it if needed
     329
    329330                        thread$ * fulfil( future_t & this, bool do_unpark = true  ) {
    330                                 for() {
     331                                for () {
    331332                                        struct oneshot * expected = this.ptr;
    332                                         // was this abandoned?
     333
    333334                                        #if defined(__GNUC__) && __GNUC__ >= 7
    334                                                 #pragma GCC diagnostic push
    335                                                 #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
     335                                        // SKULLDUGGERY: gcc bug does not handle push/pop for -Wfree-nonheap-object
     336                                        //#pragma GCC diagnostic push
     337                                        #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
    336338                                        #endif
    337                                                 if( expected == future_ABANDONED ) { free( &this ); return 0p; }
     339
     340                                        if ( expected == future_ABANDONED ) { free( &this ); return 0p; }
     341
    338342                                        #if defined(__GNUC__) && __GNUC__ >= 7
    339                                                 #pragma GCC diagnostic pop
     343                                        //#pragma GCC diagnostic pop
    340344                                        #endif
    341345
     
    346350                                        // If there is no context then we can skip the in progress phase
    347351                                        struct oneshot * want = expected == future_ARMED ? future_FULFILLED : future_PROGRESS;
    348                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    349                                                 if( expected == future_ARMED ) { return 0p; }
     352                                        if (__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     353                                                if ( expected == future_ARMED ) { return 0p; }
    350354                                                thread$ * ret = post( *expected, do_unpark );
    351355                                                __atomic_store_n( &this.ptr, future_FULFILLED, __ATOMIC_SEQ_CST);
     
    359363                        bool wait( future_t & this ) {
    360364                                oneshot temp;
    361                                 if( !setup(this, temp) ) return false;
     365                                if ( !setup(this, temp) ) return false;
    362366
    363367                                // Wait context is setup, just wait on it
     
    387391                                // if any are already satisfied return
    388392                                for ( i; num_futures ) {
    389                                         if( !setup(futures[i], temp) ) return futures[i];
     393                                        if ( !setup(futures[i], temp) ) return futures[i];
    390394                                }
    391395
     
    413417
    414418                        #define __STATS__(in_kernel, ...) { \
    415                                 if( !(in_kernel) ) disable_interrupts(); \
    416                                 with( *__tls_stats() ) { \
     419                                if ( !(in_kernel) ) disable_interrupts(); \
     420                                with ( *__tls_stats() ) { \
    417421                                        __VA_ARGS__ \
    418422                                } \
    419                                 if( !(in_kernel) ) enable_interrupts(); \
     423                                if ( !(in_kernel) ) enable_interrupts(); \
    420424                        }
    421425                        #if defined(CFA_HAVE_LINUX_IO_URING_H)
    422426                                #define __IO_STATS__(in_kernel, ...) { \
    423                                         if( !(in_kernel) ) disable_interrupts(); \
    424                                         with( *__tls_stats() ) { \
     427                                        if ( !(in_kernel) ) disable_interrupts(); \
     428                                        with ( *__tls_stats() ) { \
    425429                                                __VA_ARGS__ \
    426430                                        } \
    427                                         if( !(in_kernel) ) enable_interrupts(); \
     431                                        if ( !(in_kernel) ) enable_interrupts(); \
    428432                                }
    429433                        #else
     
    436440        }
    437441}
    438 #endif
     442#endif // #endif
Note: See TracChangeset for help on using the changeset viewer.