Changeset ecfe574 for libcfa


Ignore:
Timestamp:
Feb 1, 2023, 4:20:33 PM (3 years ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
a4ab235
Parents:
dab2b6a
Message:

added envelope copying to avoid allocations

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    rdab2b6a recfe574  
    4141P9_EMBEDDED( request, dlink(request) )
    4242
    43 void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
    44 void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
     43static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
     44static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    4545    this.receiver = receiver;
    4646    this.msg = msg;
     
    4848    this.stop = false;
    4949}
    50 
     50static inline void ?{}( request & this, request & copy ) {
     51    this.receiver = copy.receiver;
     52    this.msg = copy.msg;
     53    this.fn = copy.fn;
     54    this.stop = copy.stop;
     55}
     56
     57// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
     58struct copy_queue {
     59    dlist( request ) list;
     60    request * buffer;
     61    size_t count, buffer_size;
     62};
     63static inline void ?{}( copy_queue & this ) {}
     64static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
     65    list{};
     66    buffer_size = buf_size;
     67    buffer = aalloc( buffer_size );
     68    count = 0;
     69}
     70static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
     71
     72static inline void insert( copy_queue & this, request & elem ) with(this) {
     73    if ( count < buffer_size ) { // fast path ( no alloc )
     74        buffer[count]{ elem };
     75        count++;
     76        return;
     77    }
     78    request * new_elem = alloc();
     79    (*new_elem){ elem };
     80    insert_last( list, *new_elem );
     81}
     82
     83// once you start removing you need to remove all elements
     84// it is not supported to call insert() before the list is fully empty
     85// should_delete is an output param
     86static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
     87    if ( count > 0 ) {
     88        count--;
     89        should_delete = false;
     90        return buffer[count];
     91    }
     92    should_delete = true;
     93    return try_pop_front( list );
     94}
     95
     96static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; }
     97
     98static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
    5199struct work_queue {
    52100    futex_mutex mutex_lock;
    53     dlist( request ) input;                                             // unbounded list of work requests
     101    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
    54102}; // work_queue
    55 void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }
    56 
    57 void insert( work_queue & this, request & elem ) with(this) {
     103static inline void ?{}( work_queue & this ) with(this) {
     104    c_queue = alloc();
     105    (*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor
     106}
     107static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
     108
     109static inline void insert( work_queue & this, request & elem ) with(this) {
    58110    lock( mutex_lock );
    59     insert_last( input, elem );
     111    insert( *c_queue, elem );
    60112    unlock( mutex_lock );
    61113} // insert
    62114
    63 void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
     115static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    64116    lock( mutex_lock );
    65 
    66     //C_TODO CHANGE
    67     // transferTo->transfer( input );              // transfer input to output
    68 
    69     // this is awfully inefficient but Ill use it until transfer is implemented
    70     request * r;
    71     while ( ! input`isEmpty ) {
    72         r = &try_pop_front( input );
    73         if ( r ) insert_last( transferTo, *r );
    74     }
    75 
    76     // transfer( input, transferTo );
    77 
     117    // swap copy queue ptrs
     118    copy_queue * temp = *transfer_to;
     119    *transfer_to = c_queue;
     120    c_queue = temp;
    78121    unlock( mutex_lock );
    79122} // transfer
     
    81124thread worker {
    82125    work_queue * request_queues;
    83     dlist( request ) current_queue;
     126    copy_queue * current_queue;
    84127        request & req;
    85128    unsigned int start, range;
     
    89132    ((thread &)this){ clu };
    90133    this.request_queues = request_queues;
    91     this.current_queue{};
     134    this.current_queue = alloc();
     135    (*this.current_queue){ __buffer_size };
    92136    this.start = start;
    93137    this.range = range;
    94138}
     139static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
    95140
    96141struct executor {
     
    103148}; // executor
    104149
    105 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) with(this) {
     150static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    106151    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
     152    __buffer_size = buf_size;
    107153    this.nprocessors = nprocessors;
    108154    this.nworkers = nworkers;
     
    130176    } // for
    131177}
    132 
     178static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
    133179static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
    134180static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
     
    150196    } // for
    151197
    152     delete( workers );
    153     delete( request_queues );
    154     delete( processors );
     198    adelete( workers );
     199    adelete( request_queues );
     200    adelete( processors );
    155201    if ( seperate_clus ) delete( cluster );
    156202}
     
    173219};
    174220
    175 void ?{}( actor & this ) {
     221static inline void ?{}( actor & this ) {
    176222    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    177223    // member must be called to end it
     
    181227    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
    182228}
    183 void ^?{}( actor & this ) {}
     229static inline void ^?{}( actor & this ) {}
    184230
    185231static inline void check_actor( actor & this ) {
     
    207253};
    208254
    209 void ?{}( message & this ) { this.allocation_ = Nodelete; }
    210 void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
    211 void ^?{}( message & this ) {}
     255static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
     256static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
     257static inline void ^?{}( message & this ) {}
    212258
    213259static inline void check_message( message & this ) {
     
    220266}
    221267
    222 void deliver_request( request & this ) {
     268static inline void deliver_request( request & this ) {
    223269    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    224270    this.receiver->allocation_ = actor_allocation;
     
    228274
    229275void main( worker & this ) with(this) {
     276    bool should_delete;
    230277    Exit:
    231278    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    232         transfer( request_queues[i + start], current_queue );
    233         while ( ! current_queue`isEmpty ) {
    234             &req = &try_pop_front( current_queue );
     279        // C_TODO: potentially check queue count instead of immediately trying to transfer
     280        transfer( request_queues[i + start], &current_queue );
     281        while ( ! isEmpty( *current_queue ) ) {
     282            &req = &remove( *current_queue, should_delete );
    235283            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
    236284            if ( req.stop ) break Exit;
    237285            deliver_request( req );
    238286
    239             delete( &req );
     287            if ( should_delete ) delete( &req );
    240288        } // while
    241289    } // for
     
    253301    __actor_executor_thd = active_thread();
    254302    __actor_executor_ = alloc();
    255     (*__actor_executor_){ 0, num_thds, num_thds * 16 };
     303    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
    256304}
    257305
Note: See TracChangeset for help on using the changeset viewer.