Changeset ccf1d99


Ignore:
Timestamp:
Feb 2, 2023, 11:08:48 AM (15 months ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
2d028039, 98a2b1dc
Parents:
a64137f
Message:

intermediate push so I can move to nasus to performance test some stuff

Files:
3 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    ra64137f rccf1d99  
    2424// Define if executor is created in a separate cluster
    2525#define __DEFAULT_EXECUTOR_SEPCLUS__ false
     26
     27// when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
     28#define __ALLOC 0
    2629
    2730// forward decls
     
    5861struct copy_queue {
    5962    dlist( request ) list;
     63    #if ! __ALLOC
    6064    request * buffer;
    6165    size_t count, buffer_size, index;
     66    #endif
    6267};
    6368static inline void ?{}( copy_queue & this ) {}
    6469static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    6570    list{};
     71    #if ! __ALLOC
    6672    buffer_size = buf_size;
    6773    buffer = aalloc( buffer_size );
    6874    count = 0;
    6975    index = 0;
    70 }
    71 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
     76    #endif
     77}
     78static inline void ^?{}( copy_queue & this ) with(this) {
     79    #if ! __ALLOC
     80    adelete(buffer);
     81    #endif
     82}
    7283
    7384static inline void insert( copy_queue & this, request & elem ) with(this) {
     85    #if ! __ALLOC
    7486    if ( count < buffer_size ) { // fast path ( no alloc )
    7587        buffer[count]{ elem };
     
    8092    (*new_elem){ elem };
    8193    insert_last( list, *new_elem );
     94    #else
     95    insert_last( list, elem );
     96    #endif
    8297}
    8398
     
    86101// should_delete is an output param
    87102static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
     103    #if ! __ALLOC
    88104    if ( count > 0 ) {
    89105        count--;
     
    93109        return buffer[old_idx];
    94110    }
     111    #endif
    95112    should_delete = true;
    96113    return try_pop_front( list );
    97114}
    98115
    99 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; }
     116static inline bool isEmpty( copy_queue & this ) with(this) {
     117    #if ! __ALLOC
     118    return count == 0 && list`isEmpty;
     119    #else
     120    return list`isEmpty;
     121    #endif
     122}
    100123
    101124static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
    102125struct work_queue {
    103     futex_mutex mutex_lock;
     126    __spinlock_t mutex_lock;
     127    copy_queue owned_queue;
    104128    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
     129
    105130}; // work_queue
    106131static inline void ?{}( work_queue & this ) with(this) {
    107     c_queue = alloc();
    108     (*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor
    109 }
    110 static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
     132    // c_queue = alloc();
     133    // (*c_queue){ __buffer_size };
     134    owned_queue{ __buffer_size };
     135    c_queue = &owned_queue;
     136}
     137// static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
    111138
    112139static inline void insert( work_queue & this, request & elem ) with(this) {
    113     lock( mutex_lock );
     140    lock( mutex_lock __cfaabi_dbg_ctx2 );
    114141    insert( *c_queue, elem );
    115142    unlock( mutex_lock );
     
    117144
    118145static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    119     lock( mutex_lock );
     146    lock( mutex_lock __cfaabi_dbg_ctx2 );
    120147    // swap copy queue ptrs
    121148    copy_queue * temp = *transfer_to;
     
    126153
    127154thread worker {
     155    copy_queue owned_queue;
    128156    work_queue * request_queues;
    129157    copy_queue * current_queue;
     
    135163    ((thread &)this){ clu };
    136164    this.request_queues = request_queues;
    137     this.current_queue = alloc();
    138     (*this.current_queue){ __buffer_size };
     165    // this.current_queue = alloc();
     166    // (*this.current_queue){ __buffer_size };
     167    this.owned_queue{ __buffer_size };
     168    this.current_queue = &this.owned_queue;
    139169    this.start = start;
    140170    this.range = range;
    141171}
    142 static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
     172// static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
    143173
    144174struct executor {
  • src/Concurrency/Actors.cpp

    ra64137f rccf1d99  
    180180};
    181181
    182 #define __ALLOC 0
     182#define __ALLOC 0 // C_TODO: complete swap to no-alloc version
    183183
    184184struct GenReceiveDecls : public ast::WithDeclsToAdd<> {
     
    225225                    return receiver;
    226226                }
    227             */
     227            */ // C_TODO: update this with new no alloc version
    228228            CompoundStmt * sendBody = new CompoundStmt( decl->location );
    229229
  • tests/concurrent/actors/executor.cfa

    ra64137f rccf1d99  
    8686
    8787   
    88     executor e{ Processors, Processors, Processors == 1 ? 1 : Processors * 512, true, BufSize };
     88    executor e{ Processors, Processors, Processors == 1 ? 1 : Processors * 512, true };
    8989
    9090    printf("starting\n");
Note: See TracChangeset for help on using the changeset viewer.