Changeset 2d028039 for libcfa/src


Timestamp:
Feb 8, 2023, 3:07:52 PM (23 months ago)
Author:
caparson <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
4616622
Parents:
ccf1d99
Message:

added support for copy based envelopes
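For orientation, here is a minimal sketch of the copy-based envelope idea this changeset introduces: requests are copied into a growable array owned by the queue instead of being heap-allocated and linked into an intrusive list, and the queue must be fully drained before new insertions, mirroring the constraint noted in the diff below. The sketch is plain C with hypothetical names (env_queue, msg), not the actual actor.hfa code.

    #include <stdlib.h>

    typedef struct { int dummy; } msg;                 /* stand-in for a request */

    typedef struct {
        msg * buf;                                     /* array of copied envelopes */
        size_t count, cap, head;
    } env_queue;

    static void eq_init( env_queue * q, size_t cap ) {
        q->buf = malloc( cap * sizeof(msg) );
        q->count = 0; q->cap = cap; q->head = 0;
    }

    /* copy the envelope into the buffer, doubling the array when full */
    static void eq_insert( env_queue * q, msg m ) {
        if ( q->count == q->cap ) {
            q->cap *= 2;
            q->buf = realloc( q->buf, q->cap * sizeof(msg) );
        }
        q->buf[q->count++] = m;                        /* no per-message allocation */
    }

    /* FIFO removal; returns NULL when empty. The queue must be fully
     * drained before eq_insert is called again. */
    static msg * eq_remove( env_queue * q ) {
        if ( q->count == 0 ) return NULL;
        q->count--;
        size_t old = q->head;
        q->head = ( q->count == 0 ) ? 0 : q->head + 1;
        return &q->buf[old];
    }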

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/actor.hfa

    rccf1d99 r2d028039  
    55#include <list.hfa>
    66#include <kernel.hfa>
     7#include <vector2.hfa>
    78
    89#ifdef __CFA_DEBUG__
     
    2526#define __DEFAULT_EXECUTOR_SEPCLUS__ false
    2627
    27 // when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
    28 #define __ALLOC 0
     28#define __STEAL 1 // workstealing toggle. Disjoint from toggles above
     29
      30// whether to steal work or to steal a queue. Only applicable if __STEAL == 1
     31#define __STEAL_WORK 0
     32
     33// heuristic selection (only set one to be 1)
     34#define __RAND_QUEUE 1
     35#define __RAND_WORKER 0
     36
     37// show stealing stats
     38// #define __STEAL_STATS
    2939
    3040// forward decls
     
    4050    __receive_fn fn;
    4151    bool stop;
    42     inline dlink(request);
    4352};
    44 P9_EMBEDDED( request, dlink(request) )
    4553
    4654static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
     
    6068// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
    6169struct copy_queue {
    62     dlist( request ) list;
    63     #if ! __ALLOC
    6470    request * buffer;
    65     size_t count, buffer_size, index;
    66     #endif
     71    size_t count, buffer_size, index, utilized, last_size;
    6772};
    6873static inline void ?{}( copy_queue & this ) {}
    6974static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    70     list{};
    71     #if ! __ALLOC
    7275    buffer_size = buf_size;
    7376    buffer = aalloc( buffer_size );
    7477    count = 0;
     78    utilized = 0;
    7579    index = 0;
    76     #endif
    77 }
    78 static inline void ^?{}( copy_queue & this ) with(this) {
    79     #if ! __ALLOC
    80     adelete(buffer);
    81     #endif
    82 }
     80    last_size = 0;
     81}
     82static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
    8383
    8484static inline void insert( copy_queue & this, request & elem ) with(this) {
    85     #if ! __ALLOC
    86     if ( count < buffer_size ) { // fast path ( no alloc )
    87         buffer[count]{ elem };
    88         count++;
    89         return;
    90     }
    91     request * new_elem = alloc();
    92     (*new_elem){ elem };
    93     insert_last( list, *new_elem );
    94     #else
    95     insert_last( list, elem );
    96     #endif
     85    if ( count >= buffer_size ) { // increase arr size
     86        last_size = buffer_size;
     87        buffer_size = 2 * buffer_size;
     88        buffer = realloc( buffer, sizeof( request ) * buffer_size );
     89        /* paranoid */ verify( buffer );
     90    }
     91    buffer[count]{ elem }; // C_TODO: change to memcpy
     92    // memcpy( &buffer[count], &elem, sizeof(request) );
     93    count++;
    9794}
    9895
    9996// once you start removing you need to remove all elements
    10097// it is not supported to call insert() before the list is fully empty
    101 // should_delete is an output param
    102 static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
    103     #if ! __ALLOC
     98static inline request & remove( copy_queue & this ) with(this) {
    10499    if ( count > 0 ) {
    105100        count--;
    106         should_delete = false;
    107101        size_t old_idx = index;
    108102        index = count == 0 ? 0 : index + 1;
    109103        return buffer[old_idx];
    110104    }
    111     #endif
    112     should_delete = true;
    113     return try_pop_front( list );
    114 }
    115 
    116 static inline bool isEmpty( copy_queue & this ) with(this) {
    117     #if ! __ALLOC
    118     return count == 0 && list`isEmpty;
    119     #else
    120     return list`isEmpty;
    121     #endif
    122 }
      105    // queue is empty: return a null reference as a sentinel; callers check ( ! &req ) before using it
      106    return *0p;
     107}
     108
     109// try to reclaim some memory
     110static inline void reclaim( copy_queue & this ) with(this) {
     111    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
     112    utilized = 0;
     113    buffer_size--;
     114    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
     115}
     116
     117static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
    123118
    124119static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
     
    126121    __spinlock_t mutex_lock;
    127122    copy_queue owned_queue;
    128     copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
    129 
     123    copy_queue * c_queue;
     124    volatile bool being_processed;
    130125}; // work_queue
    131126static inline void ?{}( work_queue & this ) with(this) {
    132     // c_queue = alloc();
    133     // (*c_queue){ __buffer_size };
    134127    owned_queue{ __buffer_size };
    135128    c_queue = &owned_queue;
    136 }
    137 // static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
     129    being_processed = false;
     130}
    138131
    139132static inline void insert( work_queue & this, request & elem ) with(this) {
     
    143136} // insert
    144137
    145 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
     138static inline void transfer( work_queue & this, copy_queue ** transfer_to, work_queue ** queue_arr, unsigned int idx ) with(this) {
    146139    lock( mutex_lock __cfaabi_dbg_ctx2 );
     140    #if __STEAL
     141
     142    #if __STEAL_WORK
     143    if (  unlikely( being_processed ) )
     144    #else
     145    // check if queue has been stolen out from under us between
      146    // transfer() call and lock acquire. C_TODO: maybe just use a new queue!
     147    if ( unlikely( being_processed || queue_arr[idx] != &this ) )
     148    #endif // __STEAL_WORK
     149    {
     150        unlock( mutex_lock );
     151        return;
     152    }
     153
     154    being_processed = c_queue->count != 0;
     155    #endif // __STEAL
     156
     157    c_queue->utilized = c_queue->count;
     158
    147159    // swap copy queue ptrs
    148160    copy_queue * temp = *transfer_to;
     
    153165
    154166thread worker {
    155     copy_queue owned_queue;
    156     work_queue * request_queues;
     167    work_queue ** request_queues;
    157168    copy_queue * current_queue;
      169    worker ** worker_arr; // C_TODO: change n_workers, n_queues, worker_arr to just be pulled from a ptr to the executor
    158170        request & req;
    159     unsigned int start, range;
     171    unsigned int start, range, empty_count, n_workers, n_queues, id;
     172    #ifdef __STEAL_STATS
     173    unsigned int try_steal, stolen;
     174    #endif
    160175};
    161176
    162 static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
     177#ifdef __STEAL_STATS
     178unsigned int total_tries = 0, total_stolen = 0, total_workers;
     179#endif
     180static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,
     181        unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) {
    163182    ((thread &)this){ clu };
    164183    this.request_queues = request_queues;
    165     // this.current_queue = alloc();
    166     // (*this.current_queue){ __buffer_size };
    167     this.owned_queue{ __buffer_size };
    168     this.current_queue = &this.owned_queue;
     184    this.current_queue = current_queue;
    169185    this.start = start;
    170186    this.range = range;
    171 }
    172 // static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
     187    this.empty_count = 0;
     188    this.n_workers = n_workers;
     189    this.worker_arr = worker_arr;
     190    this.n_queues = n_queues;
     191    this.id = id;
     192    #ifdef __STEAL_STATS
     193    this.try_steal = 0;
     194    this.stolen = 0;
     195    total_workers = n_workers;
     196    #endif
     197}
     198static inline void ^?{}( worker & mutex this ) with(this) {
     199    // delete( current_queue );
     200    #ifdef __STEAL_STATS
     201    __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
     202    __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
     203    if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0)
     204        printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen);
     205    #endif
     206}
    173207
    174208struct executor {
    175209    cluster * cluster;                                                      // if workers execute on separate cluster
    176210        processor ** processors;                                            // array of virtual processors adding parallelism for workers
    177         work_queue * request_queues;                                // master list of work request queues
    178         worker ** workers;                                                              // array of workers executing work requests
     211        work_queue * request_queues;                                // master array of work request queues
     212    copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
     213        work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
     214    worker ** workers;                                                          // array of workers executing work requests
    179215        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    180216        bool seperate_clus;                                                             // use same or separate cluster for executor
     
    195231
    196232    request_queues = aalloc( nrqueues );
    197     for ( i; nrqueues )
     233    worker_req_queues = aalloc( nrqueues );
     234    for ( i; nrqueues ) {
    198235        request_queues[i]{};
     236        worker_req_queues[i] = &request_queues[i];
     237    }
    199238   
    200239    processors = aalloc( nprocessors );
     
    202241        (*(processors[i] = alloc())){ *cluster };
    203242
     243    local_queues = aalloc( nworkers );
    204244    workers = alloc( nworkers );
    205245    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    206246    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
     247        local_queues[i]{ buf_size };
    207248        range = reqPerWorker + ( i < extras ? 1 : 0 );
    208         (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
     249        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };
    209250    } // for
    210251}
     
    215256static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
    216257
     258// C_TODO: once stealing is implemented make sure shutdown still works
    217259static inline void ^?{}( executor & this ) with(this) {
     260    #if __STEAL
     261    request sentinels[nrqueues];
     262    for ( unsigned int i = 0; i < nrqueues; i++ ) {
      263        insert( request_queues[i], sentinels[i] );              // force eventual termination
     264    } // for
     265    #else
    218266    request sentinels[nworkers];
    219267    unsigned int reqPerWorker = nrqueues / nworkers;
     
     221269        insert( request_queues[step], sentinels[i] );           // force eventual termination
    222270    } // for
     271    #endif
    223272
    224273    for ( i; nworkers )
     
    230279
    231280    adelete( workers );
     281    adelete( local_queues );
    232282    adelete( request_queues );
     283    adelete( worker_req_queues );
    233284    adelete( processors );
    234285    if ( seperate_clus ) delete( cluster );
     
    306357}
    307358
     359// Couple of ways to approach work stealing
     360// 1: completely worker agnostic, just find a big queue and steal it
     361// 2: track some heuristic of worker's load and focus on that and then pick a queue from that worker
     362//   worker heuristics:
     363//     - how many queues have work?
     364//     - size of largest queue
     365//     - total # of messages
     366//     - messages currently servicing
     367//     - pick randomly
     368//     - pick from closer threads/workers (this can be combined with others)
     369
     370// lock free or global lock for queue stealing
     371#define __LOCK_SWP 0
     372
     373__spinlock_t swp_lock;
     374
      375// tries to atomically swap two queues and returns a bool indicating whether the swap succeeded
     376static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
     377    #if __LOCK_SWP
     378
     379    lock( swp_lock __cfaabi_dbg_ctx2 );
     380    work_queue * temp = request_queues[my_idx];
     381    request_queues[my_idx] = request_queues[victim_idx];
     382    request_queues[victim_idx] = temp;
     383    unlock( swp_lock );
     384   
     385    return true;
     386
     387    #else // __LOCK_SWP else
     388    work_queue * my_queue = request_queues[my_idx];
     389    work_queue * other_queue = request_queues[victim_idx];
     390    if ( other_queue == 0p || my_queue == 0p ) return false;
     391
     392    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
     393    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
     394        return false;
     395
     396    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
     397    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     398        /* paranoid */ verify( request_queues[my_idx] == 0p );
     399        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
     400        return false;
     401    }
     402
     403    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
     404    request_queues[my_idx] = other_queue; // last write does not need to be atomic
     405    return true;
     406
     407    #endif // __LOCK_SWP
     408}
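As a standalone illustration of the lock-free path above, the following C11 sketch shows the same two-CAS protocol: first park your own slot at a null pointer so no one else can move it, then CAS the victim's slot over to your queue, undoing the park if that second CAS fails. Names here are hypothetical (swap_slots, slots); the changeset itself operates on work_queue pointers with GCC __atomic builtins.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    /* slots[] plays the role of the worker request-queue pointer array */
    static bool swap_slots( _Atomic(void *) * slots, size_t mine, size_t victim ) {
        void * my_q    = atomic_load( &slots[mine] );
        void * other_q = atomic_load( &slots[victim] );
        if ( my_q == NULL || other_q == NULL ) return false;

        /* park my slot at NULL; if this fails someone already moved my queue */
        if ( ! atomic_compare_exchange_strong( &slots[mine], &my_q, NULL ) )
            return false;

        /* claim the victim slot; if it changed, restore my slot and give up */
        if ( ! atomic_compare_exchange_strong( &slots[victim], &other_q, my_q ) ) {
            atomic_store( &slots[mine], my_q );        /* only we could have set it to NULL */
            return false;
        }

        /* success: my slot is still NULL and only this thread will write it */
        atomic_store( &slots[mine], other_q );
        return true;
    }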
     409
      410// once a worker to steal from has been chosen, choose a queue to steal from
     411static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {
     412    #if __RAND_QUEUE
     413    unsigned int tries = 0;
     414    const unsigned int start_idx = prng( n_queues );
     415    work_queue * curr_steal_queue;
     416
     417    for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {
     418        tries++;
     419        curr_steal_queue = request_queues[i];
     420        #if __STEAL_WORK
     421
     422        // avoid empty queues and queues that are being operated on
     423        if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
     424            continue;
     425       
     426        // in this case we just return from transfer if this doesn't work
     427        transfer( *curr_steal_queue, &current_queue, request_queues, i );
     428        if ( isEmpty( *current_queue ) ) continue;
     429        last_idx = i;
     430
     431        #ifdef __STEAL_STATS
     432        stolen++;
     433        #endif // __STEAL_STATS
     434
     435        #else // __STEAL_WORK else
     436
     437        // avoid empty queues and queues that are being operated on
     438        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
     439            continue;
     440
     441        #ifdef __STEAL_STATS
     442        bool success = try_swap_queues( this, i, last_idx );
     443        if ( success ) stolen++;
     444        #else
     445        try_swap_queues( this, i, last_idx );
     446        #endif // __STEAL_STATS
     447
     448        // C_TODO: try transfer immediately
     449        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
     450        // if ( isEmpty( *current_queue ) ) return false;
     451        return false;
     452
     453        #endif // __STEAL_WORK
     454
     455        return true;
     456    } // for
     457    return false;
     458
     459    #elif __RAND_WORKER
     460
     461    // have to calculate victim start and range since victim may be deleted before us in shutdown
     462    const unsigned int queues_per_worker = n_queues / n_workers;
     463    const unsigned int extras = n_queues % n_workers;
     464    unsigned int vic_start, vic_range;
     465    if ( extras > victim_id  ) {
     466        vic_range = queues_per_worker + 1;
     467        vic_start = vic_range * victim_id;
     468    } else {
     469        vic_start = extras + victim_id * queues_per_worker;
     470        vic_range = queues_per_worker;
     471    }
     472    unsigned int start_idx = prng( vic_range );
     473    unsigned int tries = 0;
     474    work_queue * curr_steal_queue;
     475
     476    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
     477        tries++;
     478        curr_steal_queue = request_queues[ i + vic_start ];
     479        // avoid empty queues and queues that are being operated on
     480        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
     481            continue;
     482
     483        try_swap_queues( this, i, last_idx );
     484
     485        #ifdef __STEAL_STATS
     486        bool success = try_swap_queues( this, i, last_idx );
     487        if ( success ) stolen++;
     488        #else
     489        try_swap_queues( this, i, last_idx );
     490        #endif // __STEAL_STATS
     491
     492        // C_TODO: try transfer immediately
     493        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
     494        // if ( isEmpty( *current_queue ) ) return false;
     495        return false;
     496    }
     497    #endif
     498}
     499
     500// choose a worker to steal from
     501static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {
     502    #if __RAND_WORKER
     503    unsigned int victim = prng( n_workers );
     504    if ( victim == id ) victim = ( victim + 1 ) % n_workers;
     505    return choose_queue( this, victim, last_idx );
     506    #else
     507    return choose_queue( this, 0, last_idx );
     508    #endif
     509}
     510
     511// look for work to steal
     512// returns a bool: true => a queue was stolen, false => no work was stolen
     513static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur
      514    // to steal a queue, acquire both queues' locks in address order (maybe can do an atomic swap)
      515    // maybe use a flag to hint which queue is being processed
      516    // look at count to see if a queue is worth stealing (don't steal empty queues)
      517    // if we steal and the being-processed flag is up, don't process it and just keep looking at our own queues
      518    // (best-effort approach) it's ok if stealing isn't fruitful
      519    //          -> more important to not delay busy threads
     520
     521    return choose_victim( this, last_idx );
     522}
     523
    308524void main( worker & this ) with(this) {
    309     bool should_delete;
     525    // threshold of empty queues we see before we go stealing
     526    const unsigned int steal_threshold = 2 * n_queues;
     527    unsigned int curr_idx;
     528    work_queue * curr_work_queue;
    310529    Exit:
    311530    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
    312531        // C_TODO: potentially check queue count instead of immediately trying to transfer
    313         transfer( request_queues[i + start], &current_queue );
     532        curr_idx = i + start;
     533        curr_work_queue = request_queues[curr_idx];
     534        transfer( *curr_work_queue, &current_queue, request_queues, curr_idx );
     535        if ( isEmpty( *current_queue ) ) {
     536            #if __STEAL
     537            empty_count++;
     538            if ( empty_count < steal_threshold ) continue;
     539            empty_count = 0; // C_TODO: look into stealing backoff schemes
     540            #ifdef __STEAL_STATS
     541            try_steal++;
     542            #endif // __STEAL_STATS
     543
     544            if ( ! steal_work( this, curr_idx ) ) continue;
     545
     546            #else // __STEAL else
     547
     548            continue;
     549           
     550            #endif // __STEAL
     551        }
    314552        while ( ! isEmpty( *current_queue ) ) {
    315             &req = &remove( *current_queue, should_delete );
     553            &req = &remove( *current_queue );
    316554            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
    317555            if ( req.stop ) break Exit;
    318556            deliver_request( req );
    319 
    320             if ( should_delete ) delete( &req );
    321         } // while
     557        }
     558        #if __STEAL
     559        curr_work_queue->being_processed = false; // set done processing
     560        #endif
     561        empty_count = 0; // we found work so reset empty counter
     562        reclaim( *current_queue );
    322563    } // for
    323564}