Ignore:
Timestamp:
Jun 12, 2023, 12:05:58 PM (3 years ago)
Author:
JiadaL <j82liang@…>
Branches:
master
Children:
fec8bd1
Parents:
2b78949 (diff), 38e266ca (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
1 added
7 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    r2b78949 r8a930c03  
    1313#endif // CFA_DEBUG
    1414
     15#define DEBUG_ABORT( cond, string ) CFA_DEBUG( if ( cond ) abort( string ) )
     16
    1517// Define the default number of processors created in the executor. Must be greater than 0.
    1618#define __DEFAULT_EXECUTOR_PROCESSORS__ 2
     
    4244struct executor;
    4345
    44 enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
    45 
    46 typedef Allocation (*__receive_fn)(actor &, message &);
     46enum allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
     47
     48typedef allocation (*__receive_fn)(actor &, message &);
    4749struct request {
    4850    actor * receiver;
     
    393395struct actor {
    394396    size_t ticket;                                          // executor-queue handle
    395     Allocation allocation_;                                         // allocation action
     397    allocation allocation_;                                         // allocation action
    396398    inline virtual_dtor;
    397399};
     
    400402    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    401403    // member must be called to end it
    402     verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
     404    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    403405    allocation_ = Nodelete;
    404406    ticket = __get_next_ticket( *__actor_executor_ );
     
    430432
    431433struct message {
    432     Allocation allocation_;                     // allocation action
     434    allocation allocation_;                     // allocation action
    433435    inline virtual_dtor;
    434436};
     
    437439    this.allocation_ = Nodelete;
    438440}
    439 static inline void ?{}( message & this, Allocation allocation ) {
    440     memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor
    441     verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
     441static inline void ?{}( message & this, allocation alloc ) {
     442    memcpy( &this.allocation_, &alloc, sizeof(allocation) ); // optimization to elide ctor
     443    DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n" );
    442444}
    443445static inline void ^?{}( message & this ) with(this) {
     
    453455    } // switch
    454456}
    455 static inline void set_allocation( message & this, Allocation state ) {
     457static inline void set_allocation( message & this, allocation state ) {
    456458    this.allocation_ = state;
    457459}
    458460
    459461static inline void deliver_request( request & this ) {
     462    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    460463    this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
    461464    check_message( *this.msg );
     
    631634
    632635static inline void send( actor & this, request & req ) {
    633     verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
     636    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    634637    send( *__actor_executor_, req, this.ticket );
    635638}
     
    680683// assigned at creation to __base_msg_finished to avoid unused message warning
    681684message __base_msg_finished @= { .allocation_ : Finished };
    682 struct __DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;
    683 struct __DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;
    684 struct __FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;
    685 
    686 Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }
    687 Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }
    688 Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }
    689 
     685struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished;
     686struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
     687struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished;
     688
     689allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; }
     690allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; }
     691allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; }
     692
  • libcfa/src/concurrency/channel.hfa

    r2b78949 r8a930c03  
    5151vtable(channel_closed) channel_closed_vt;
    5252
     53static inline bool is_insert( channel_closed & e ) { return e.elem != 0p; }
     54static inline bool is_remove( channel_closed & e ) { return e.elem == 0p; }
     55
    5356// #define CHAN_STATS // define this to get channel stats printed in dtor
    5457
     
    341344}
    342345
     346// special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case
     347static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) {
     348    while ( !queue`isEmpty ) {
     349        // if node not a special OR case or if we win the special OR case race break
     350        if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) )
     351            return true;
     352       
     353        // our node lost the race when toggling in __pending_set_other
     354        if ( *mine.clause_status != __SELECT_PENDING )
     355            return false;
     356
     357        // otherwise we lost the special OR race so discard node
     358        try_pop_front( queue );
     359    }
     360    return false;
     361}
     362
    343363// type used by select statement to capture a chan read as the selected operation
    344364struct chan_read {
     
    374394                return false;
    375395            }
    376            
    377             if ( __handle_waituntil_OR( prods ) ) {
     396
     397            if ( __handle_pending( prods, node ) ) {
    378398                __prods_handoff( chan, ret );
    379399                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     
    381401                return true;
    382402            }
    383             __make_select_node_unsat( node );
     403            if ( *node.clause_status == __SELECT_PENDING )
     404                __make_select_node_unsat( node );
    384405        }
    385406        // check if we can complete operation. If so race to establish winner in special OR case
     
    423444}
    424445static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    425 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) {
     446static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) {
    426447    if ( node.extra == 0p ) // check if woken up due to closed channel
    427448        __closed_remove( chan, ret );
    428449    // This is only reachable if not closed or closed exception was handled
    429     return true;
    430450}
    431451
     
    464484                return false;
    465485            }
    466            
    467             if ( __handle_waituntil_OR( cons ) ) {
     486
     487            if ( __handle_pending( cons, node ) ) {
    468488                __cons_handoff( chan, elem );
    469489                __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node
     
    471491                return true;
    472492            }
    473             __make_select_node_unsat( node );
     493            if ( *node.clause_status == __SELECT_PENDING )
     494                __make_select_node_unsat( node );
    474495        }
    475496        // check if we can complete operation. If so race to establish winner in special OR case
     
    515536static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); }
    516537
    517 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) {
     538static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) {
    518539    if ( node.extra == 0p ) // check if woken up due to closed channel
    519540        __closed_insert( chan, elem );
    520541
    521542    // This is only reachable if not closed or closed exception was handled
    522     return true;
    523543}
    524544
  • libcfa/src/concurrency/future.hfa

    r2b78949 r8a930c03  
    7070                // check if the future is available
    7171        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
    72                 bool available( future(T) & this ) { return this.state; }
     72                bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
    7373
    7474
     
    180180        }
    181181               
    182         bool on_selected( future(T) & this, select_node & node ) { return true; }
     182        void on_selected( future(T) & this, select_node & node ) {}
    183183        }
    184184}
    185185
    186186//--------------------------------------------------------------------------------------------------------
    187 // These futures below do not support select statements so they may not be as useful as 'future'
     187// These futures below do not support select statements so they may not have as many features as 'future'
    188188//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
    189189//  since it uses raw atomics and no locks
  • libcfa/src/concurrency/locks.cfa

    r2b78949 r8a930c03  
    239239}
    240240
    241 bool on_selected( blocking_lock & this, select_node & node ) { return true; }
     241void on_selected( blocking_lock & this, select_node & node ) {}
    242242
    243243//-----------------------------------------------------------------------------
  • libcfa/src/concurrency/locks.hfa

    r2b78949 r8a930c03  
    3232#include "select.hfa"
    3333
    34 #include <fstream.hfa>
    35 
    3634// futex headers
    3735#include <linux/futex.h>      /* Definition of FUTEX_* constants */
     
    114112static inline bool   register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
    115113static inline bool   unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
    116 static inline bool   on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
     114static inline void   on_selected( single_acquisition_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); }
    117115
    118116//----------
     
    131129static inline bool   register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
    132130static inline bool   unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
    133 static inline bool   on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); }
     131static inline void   on_selected( owner_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); }
    134132
    135133//-----------------------------------------------------------------------------
     
    621619}
    622620
    623 static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; }
     621static inline void on_selected( simple_owner_lock & this, select_node & node ) {}
    624622
    625623
  • libcfa/src/concurrency/select.cfa

    r2b78949 r8a930c03  
    4949    return false;
    5050}
    51 bool on_selected( select_timeout_node & this, select_node & node ) { return true; }
     51void on_selected( select_timeout_node & this, select_node & node ) {}
    5252
    5353// Gateway routine to wait on duration
  • libcfa/src/concurrency/select.hfa

    r2b78949 r8a930c03  
    9191    // For unregistering a select stmt on a selectable concurrency primitive
    9292    // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
    93     bool unregister_select( T &, select_node &  );
     93    bool unregister_select( T &, select_node & );
    9494
    9595    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
    9696    //    passed as an arg to this routine
    9797    // If on_selected returns false, the statement is not run, if it returns true it is run.
    98     bool on_selected( T &, select_node & );
     98    void on_selected( T &, select_node & );
    9999};
    100100
     
    102102// Waituntil Helpers
    103103//=============================================================================================
     104
     105static inline void __make_select_node_unsat( select_node & this ) with( this ) {
     106    __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
     107}
     108static inline void __make_select_node_sat( select_node & this ) with( this ) {
     109    __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
     110}
    104111
    105112// used for the 2-stage avail needed by the special OR case
     
    116123}
    117124
    118 static inline void __make_select_node_unsat( select_node & this ) with( this ) {
    119     __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
    120 }
    121 static inline void __make_select_node_sat( select_node & this ) with( this ) {
    122     __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
     125// used for the 2-stage avail by the thread who owns a pending node
     126static inline bool __pending_set_other( select_node & other, select_node & mine, unsigned long int val ) with( other ) {
     127    /* paranoid */ verify( park_counter == 0p );
     128    /* paranoid */ verify( clause_status != 0p );
     129
     130    unsigned long int cmp_status = __SELECT_UNSAT;
     131    while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     132        if ( cmp_status != __SELECT_PENDING )
     133            return false;
     134
     135        // toggle current status flag to avoid starvation/deadlock
     136        __make_select_node_unsat( mine );
     137        cmp_status = __SELECT_UNSAT;
     138        if ( !__atomic_compare_exchange_n( mine.clause_status, &cmp_status, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
     139            return false;
     140        cmp_status = __SELECT_UNSAT;
     141    }
     142    return true;
    123143}
    124144
     
    188208bool register_select( select_timeout_node & this, select_node & node );
    189209bool unregister_select( select_timeout_node & this, select_node & node );
    190 bool on_selected( select_timeout_node & this, select_node & node );
     210void on_selected( select_timeout_node & this, select_node & node );
    191211
    192212// Gateway routines to waituntil on duration
Note: See TracChangeset for help on using the changeset viewer.