Ignore:
Timestamp:
Feb 3, 2023, 1:28:43 PM (18 months ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, ast-experimental, master
Children:
2f61765
Parents:
8a97248 (diff), db9d7a9 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    r8a97248 r2125443a  
     1#pragma once
     2
    13#include <locks.hfa>
    24#include <limits.hfa>
    35#include <list.hfa>
     6#include <kernel.hfa>
    47
    58#ifdef __CFA_DEBUG__
     
    2124// Define if executor is created in a separate cluster
    2225#define __DEFAULT_EXECUTOR_SEPCLUS__ false
     26
     27// when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
     28#define __ALLOC 0
    2329
    2430// forward decls
     
    3844P9_EMBEDDED( request, dlink(request) )
    3945
    40 void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
    41 void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
     46static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
     47static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    4248    this.receiver = receiver;
    4349    this.msg = msg;
     
    4551    this.stop = false;
    4652}
    47 
     53static inline void ?{}( request & this, request & copy ) {
     54    this.receiver = copy.receiver;
     55    this.msg = copy.msg;
     56    this.fn = copy.fn;
     57    this.stop = copy.stop;
     58}
     59
     60// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
     61struct copy_queue {
     62    dlist( request ) list;
     63    #if ! __ALLOC
     64    request * buffer;
     65    size_t count, buffer_size, index;
     66    #endif
     67};
     68static inline void ?{}( copy_queue & this ) {}
     69static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
     70    list{};
     71    #if ! __ALLOC
     72    buffer_size = buf_size;
     73    buffer = aalloc( buffer_size );
     74    count = 0;
     75    index = 0;
     76    #endif
     77}
     78static inline void ^?{}( copy_queue & this ) with(this) {
     79    #if ! __ALLOC
     80    adelete(buffer);
     81    #endif
     82}
     83
     84static inline void insert( copy_queue & this, request & elem ) with(this) {
     85    #if ! __ALLOC
     86    if ( count < buffer_size ) { // fast path ( no alloc )
     87        buffer[count]{ elem };
     88        count++;
     89        return;
     90    }
     91    request * new_elem = alloc();
     92    (*new_elem){ elem };
     93    insert_last( list, *new_elem );
     94    #else
     95    insert_last( list, elem );
     96    #endif
     97}
     98
     99// once you start removing you need to remove all elements
     100// it is not supported to call insert() before the list is fully empty
     101// should_delete is an output param
     102static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
     103    #if ! __ALLOC
     104    if ( count > 0 ) {
     105        count--;
     106        should_delete = false;
     107        size_t old_idx = index;
     108        index = count == 0 ? 0 : index + 1;
     109        return buffer[old_idx];
     110    }
     111    #endif
     112    should_delete = true;
     113    return try_pop_front( list );
     114}
     115
     116static inline bool isEmpty( copy_queue & this ) with(this) {
     117    #if ! __ALLOC
     118    return count == 0 && list`isEmpty;
     119    #else
     120    return list`isEmpty;
     121    #endif
     122}
     123
     124static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
    48125struct work_queue {
    49     futex_mutex mutex_lock;
    50     dlist( request ) input;                                             // unbounded list of work requests
     126    __spinlock_t mutex_lock;
     127    copy_queue owned_queue;
     128    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
     129
    51130}; // work_queue
    52 void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }
    53 
    54 void insert( work_queue & this, request & elem ) with(this) {
    55     lock( mutex_lock );
    56     insert_last( input, elem );
     131static inline void ?{}( work_queue & this ) with(this) {
     132    // c_queue = alloc();
     133    // (*c_queue){ __buffer_size };
     134    owned_queue{ __buffer_size };
     135    c_queue = &owned_queue;
     136}
     137// static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
     138
     139static inline void insert( work_queue & this, request & elem ) with(this) {
     140    lock( mutex_lock __cfaabi_dbg_ctx2 );
     141    insert( *c_queue, elem );
    57142    unlock( mutex_lock );
    58143} // insert
    59144
    60 void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
    61     lock( mutex_lock );
    62 
    63     //C_TODO CHANGE
    64     // transferTo->transfer( input );              // transfer input to output
    65 
    66     // this is awfully inefficient but Ill use it until transfer is implemented
    67     request * r;
    68     while ( ! input`isEmpty ) {
    69         r = &try_pop_front( input );
    70         if ( r ) insert_last( transferTo, *r );
    71     }
    72 
    73     // transfer( input, transferTo );
    74 
     145static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
     146    lock( mutex_lock __cfaabi_dbg_ctx2 );
     147    // swap copy queue ptrs
     148    copy_queue * temp = *transfer_to;
     149    *transfer_to = c_queue;
     150    c_queue = temp;
    75151    unlock( mutex_lock );
    76152} // transfer
    77153
    78154thread worker {
     155    copy_queue owned_queue;
    79156    work_queue * request_queues;
    80     dlist( request ) current_queue;
     157    copy_queue * current_queue;
    81158        request & req;
    82159    unsigned int start, range;
     
    86163    ((thread &)this){ clu };
    87164    this.request_queues = request_queues;
    88     this.current_queue{};
     165    // this.current_queue = alloc();
     166    // (*this.current_queue){ __buffer_size };
     167    this.owned_queue{ __buffer_size };
     168    this.current_queue = &this.owned_queue;
    89169    this.start = start;
    90170    this.range = range;
    91171}
     172// static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
    92173
    93174struct executor {
     
    100181}; // executor
    101182
    102 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) with(this) {
     183static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    103184    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
     185    __buffer_size = buf_size;
    104186    this.nprocessors = nprocessors;
    105187    this.nworkers = nworkers;
     
    127209    } // for
    128210}
    129 
     211static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
    130212static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
    131213static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
     
    147229    } // for
    148230
    149     delete( workers );
    150     delete( request_queues );
    151     delete( processors );
     231    adelete( workers );
     232    adelete( request_queues );
     233    adelete( processors );
    152234    if ( seperate_clus ) delete( cluster );
    153235}
     
    170252};
    171253
    172 void ?{}( actor & this ) {
     254static inline void ?{}( actor & this ) {
    173255    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    174256    // member must be called to end it
     
    178260    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
    179261}
    180 void ^?{}( actor & this ) {}
     262static inline void ^?{}( actor & this ) {}
    181263
    182264static inline void check_actor( actor & this ) {
     
    204286};
    205287
    206 void ?{}( message & this ) { this.allocation_ = Nodelete; }
    207 void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
    208 void ^?{}( message & this ) {}
     288static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
     289static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
     290static inline void ^?{}( message & this ) {}
    209291
    210292static inline void check_message( message & this ) {
     
    217299}
    218300
    219 void deliver_request( request & this ) {
     301static inline void deliver_request( request & this ) {
    220302    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    221303    this.receiver->allocation_ = actor_allocation;
     
    225307
    226308void main( worker & this ) with(this) {
     309    bool should_delete;
    227310    Exit:
    228311    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    229         transfer( request_queues[i + start], current_queue );
    230         while ( ! current_queue`isEmpty ) {
    231             &req = &try_pop_front( current_queue );
     312        // C_TODO: potentially check queue count instead of immediately trying to transfer
     313        transfer( request_queues[i + start], &current_queue );
     314        while ( ! isEmpty( *current_queue ) ) {
     315            &req = &remove( *current_queue, should_delete );
    232316            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
    233317            if ( req.stop ) break Exit;
    234318            deliver_request( req );
    235319
    236             delete( &req );
     320            if ( should_delete ) delete( &req );
    237321        } // while
    238322    } // for
     
    250334    __actor_executor_thd = active_thread();
    251335    __actor_executor_ = alloc();
    252     (*__actor_executor_){ 0, num_thds, num_thds * 16 };
     336    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
    253337}
    254338
Note: See TracChangeset for help on using the changeset viewer.