source: libcfa/src/concurrency/actor.hfa @ c910709

Last change on this file since c910709 was 2d028039, checked in by caparson <caparson@…>, 18 months ago

added support for copy based envelopes

#pragma once

#include <locks.hfa>
#include <limits.hfa>
#include <list.hfa>
#include <kernel.hfa>
#include <vector2.hfa>

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define if executor is created in a separate cluster
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

#define __STEAL 1 // work-stealing toggle. Disjoint from the toggles above

// whether to steal work or to steal a queue. Only applicable if __STEAL == 1
#define __STEAL_WORK 0

// heuristic selection (only set one to be 1)
#define __RAND_QUEUE 1
#define __RAND_WORKER 0

// show stealing stats
// #define __STEAL_STATS

// forward decls
struct actor;
struct message;

enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status

typedef Allocation (*__receive_fn)(actor &, message &);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
    bool stop;
};

static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
    this.stop = false;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
    this.stop = copy.stop;
}
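
// Illustrative sketch (hypothetical names a, m, receive; not part of this header): a request
// bundles a receiver, a message, and a receive routine whose derived parameter types are cast
// to the base __receive_fn signature; the executor later calls fn( *receiver, *msg ) and acts
// on the returned Allocation.
//   Allocation receive( derived_actor & receiver, derived_msg & msg );   // user-defined handler
//   request req = { &a, &m, (__receive_fn)receive };                     // a : derived_actor, m : derived_msg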

// copy-based request buffer: requests are copied in and the buffer is doubled when full
struct copy_queue {
    request * buffer;
    size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    utilized = 0;
    index = 0;
    last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count >= buffer_size ) { // increase array size
        last_size = buffer_size;
        buffer_size = 2 * buffer_size;
        buffer = realloc( buffer, sizeof( request ) * buffer_size );
        /* paranoid */ verify( buffer );
    }
    buffer[count]{ elem }; // C_TODO: change to memcpy
    // memcpy( &buffer[count], &elem, sizeof(request) );
    count++;
}

// once removal starts, all elements must be removed before inserting again;
// calling insert() before the buffer is fully drained is unsupported
static inline request & remove( copy_queue & this ) with(this) {
    if ( count > 0 ) {
        count--;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    return *0p; // null reference signals an empty buffer (caller checks with &req)
}

// try to reclaim some memory
static inline void reclaim( copy_queue & this ) with(this) {
    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    utilized = 0;
    buffer_size--;
    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
}

static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
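
// Illustrative fill-then-drain sketch (hypothetical driver code, not part of this header):
// requests are inserted while the buffer fills, and once draining starts the buffer must be
// emptied completely before it is inserted into again.
//   copy_queue q = { 10 };
//   request r = { &some_actor, &some_msg, some_fn };  // hypothetical receiver/message/handler
//   insert( q, r );
//   while ( ! isEmpty( q ) ) {
//       request & next = remove( q );
//       if ( ! &next ) break;                         // null reference => buffer is empty
//       next.fn( *next.receiver, *next.msg );         // dispatch, as deliver_request() does below
//   }
//   reclaim( q );                                     // optionally shrink the buffer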

static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
struct work_queue {
    __spinlock_t mutex_lock;
    copy_queue owned_queue;
    copy_queue * c_queue;
    volatile bool being_processed;
}; // work_queue
static inline void ?{}( work_queue & this ) with(this) {
    owned_queue{ __buffer_size };
    c_queue = &owned_queue;
    being_processed = false;
}

static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to, work_queue ** queue_arr, unsigned int idx ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    #if __STEAL

    #if __STEAL_WORK
    if ( unlikely( being_processed ) )
    #else
    // check if the queue has been stolen out from under us between
    // the transfer() call and the lock acquire. C_TODO: maybe just use new queue!
    if ( unlikely( being_processed || queue_arr[idx] != &this ) )
    #endif // __STEAL_WORK
    {
        unlock( mutex_lock );
        return;
    }

    being_processed = c_queue->count != 0;
    #endif // __STEAL

    c_queue->utilized = c_queue->count;

    // swap copy queue ptrs
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer
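
// Note: transfer() is a double-buffering handoff: the caller trades its (drained) copy_queue
// for the work_queue's filled one under the lock, then processes the filled snapshot outside
// the lock while producers keep inserting into the queue it left behind.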

thread worker {
    work_queue ** request_queues;
    copy_queue * current_queue;
    worker ** worker_arr; // C_TODO: change n_workers, n_queues, worker_arr to just be pulled from ptr to executor
    request & req;
    unsigned int start, range, empty_count, n_workers, n_queues, id;
    #ifdef __STEAL_STATS
    unsigned int try_steal, stolen;
    #endif
};

#ifdef __STEAL_STATS
unsigned int total_tries = 0, total_stolen = 0, total_workers;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,
        unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;
    this.current_queue = current_queue;
    this.start = start;
    this.range = range;
    this.empty_count = 0;
    this.n_workers = n_workers;
    this.worker_arr = worker_arr;
    this.n_queues = n_queues;
    this.id = id;
    #ifdef __STEAL_STATS
    this.try_steal = 0;
    this.stolen = 0;
    total_workers = n_workers;
    #endif
}
static inline void ^?{}( worker & mutex this ) with(this) {
    // delete( current_queue );
    #ifdef __STEAL_STATS
    __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
    if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0)
        printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen);
    #endif
}

struct executor {
    cluster * cluster;                              // if workers execute on separate cluster
    processor ** processors;                        // array of virtual processors adding parallelism for workers
    work_queue * request_queues;                    // master array of work request queues
    copy_queue * local_queues;                      // array of all worker local queues to avoid deletion race
    work_queue ** worker_req_queues;                // secondary array of work queues to allow for swapping
    worker ** workers;                              // array of workers executing work requests
    unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    bool seperate_clus;                             // use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    __buffer_size = buf_size;
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    worker_req_queues = aalloc( nrqueues );
    for ( i; nrqueues ) {
        request_queues[i]{};
        worker_req_queues[i] = &request_queues[i];
    }

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    local_queues = aalloc( nworkers );
    workers = alloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        local_queues[i]{ buf_size };
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };
    } // for
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
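
// Illustrative construction sketch (hypothetical values): each convenience ctor above fills in
// the trailing parameters of the full ctor with the defaults defined at the top of this file.
//   executor e0;                              // all defaults
//   executor e1 = { 4 };                      // 4 processors, default workers/mailboxes
//   executor e2 = { 4, 4, 64, true, 32 };     // separate cluster, 64 mailboxes, initial buffer size 32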

// C_TODO: once stealing is implemented make sure shutdown still works
static inline void ^?{}( executor & this ) with(this) {
    #if __STEAL
    request sentinels[nrqueues];
    for ( unsigned int i = 0; i < nrqueues; i++ ) {
        insert( request_queues[i], sentinels[i] );              // force eventual termination
    } // for
    #else
    request sentinels[nworkers];
    unsigned int reqPerWorker = nrqueues / nworkers;
    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
        insert( request_queues[step], sentinels[i] );           // force eventual termination
    } // for
    #endif

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    adelete( workers );
    adelete( local_queues );
    adelete( request_queues );
    adelete( worker_req_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );
}

// conceptually a static field of executor, but must be forward declared here for get_next_ticket
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
} // tickets
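
// e.g., with nrqueues == 4, successive actor constructions receive tickets 0, 1, 2, 3, 0, 1, ...
// so actors (and hence their mailbox traffic) are spread round-robin across the request queues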

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static unsigned long int __num_actors_;             // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    unsigned long int ticket;           // executor-queue handle to provide FIFO message execution
    Allocation allocation_;             // allocation action
};

static inline void ?{}( actor & this ) {
    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    // member must be called to end it
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.allocation_ = Nodelete;
    this.ticket = get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
static inline void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
    if ( this.allocation_ != Nodelete ) {
        switch( this.allocation_ ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                break;
            default: ;                              // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    Allocation allocation_;             // allocation action
};

static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
static inline void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
    switch ( this.allocation_ ) {                   // analyze message status
        case Nodelete: break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}(this); break;
        case Finished: break;
    } // switch
}

static inline void deliver_request( request & this ) {
    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    this.receiver->allocation_ = actor_allocation;
    check_actor( *this.receiver );
    check_message( *this.msg );
}

// Couple of ways to approach work stealing
// 1: completely worker agnostic, just find a big queue and steal it
// 2: track some heuristic of each worker's load, focus on the busiest worker, and then pick one of its queues
//   worker heuristics:
//     - how many queues have work?
//     - size of largest queue
//     - total # of messages
//     - messages currently being serviced
//     - pick randomly
//     - pick from closer threads/workers (this can be combined with others)

// lock free or global lock for queue stealing
#define __LOCK_SWP 0

__spinlock_t swp_lock;

// tries to atomically swap two queues; returns true if the swap succeeded, false otherwise
static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    #if __LOCK_SWP

    lock( swp_lock __cfaabi_dbg_ctx2 );
    work_queue * temp = request_queues[my_idx];
    request_queues[my_idx] = request_queues[victim_idx];
    request_queues[victim_idx] = temp;
    unlock( swp_lock );

    return true;

    #else // __LOCK_SWP else
    work_queue * my_queue = request_queues[my_idx];
    work_queue * other_queue = request_queues[victim_idx];
    if ( other_queue == 0p || my_queue == 0p ) return false;

    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return false;

    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        /* paranoid */ verify( request_queues[my_idx] == 0p );
        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
        return false;
    }

    // we have successfully swapped, and since our queue is 0p no one will touch it, so write back the new queue ptr non-atomically
    request_queues[my_idx] = other_queue; // last write does not need to be atomic
    return true;

    #endif // __LOCK_SWP
}
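
// Note on the lock-free path above: the swap is a two-step CAS protocol. Our own slot is first
// CASed from my_queue to 0p (claiming it); the victim's slot is then CASed from other_queue to
// my_queue. If the second CAS fails, the claim is rolled back and the swap reports failure. The
// final publication of other_queue into our slot can be a plain store because no other worker
// will swap with a slot holding 0p.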

// once a worker to steal from has been chosen, choose a queue to steal from
static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {
    #if __RAND_QUEUE
    unsigned int tries = 0;
    const unsigned int start_idx = prng( n_queues );
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {
        tries++;
        curr_steal_queue = request_queues[i];
        #if __STEAL_WORK

        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
            continue;

        // in this case we just return from transfer if this doesn't work
        transfer( *curr_steal_queue, &current_queue, request_queues, i );
        if ( isEmpty( *current_queue ) ) continue;
        last_idx = i;

        #ifdef __STEAL_STATS
        stolen++;
        #endif // __STEAL_STATS

        #else // __STEAL_WORK else

        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef __STEAL_STATS
        bool success = try_swap_queues( this, i, last_idx );
        if ( success ) stolen++;
        #else
        try_swap_queues( this, i, last_idx );
        #endif // __STEAL_STATS

        // C_TODO: try transfer immediately
        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
        // if ( isEmpty( *current_queue ) ) return false;
        return false;

        #endif // __STEAL_WORK

        return true;
    } // for
    return false;

    #elif __RAND_WORKER

    // have to calculate victim start and range since victim may be deleted before us in shutdown
    const unsigned int queues_per_worker = n_queues / n_workers;
    const unsigned int extras = n_queues % n_workers;
    unsigned int vic_start, vic_range;
    if ( extras > victim_id ) {
        vic_range = queues_per_worker + 1;
        vic_start = vic_range * victim_id;
    } else {
        vic_start = extras + victim_id * queues_per_worker;
        vic_range = queues_per_worker;
    }
    unsigned int start_idx = prng( vic_range );
    unsigned int tries = 0;
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
        tries++;
        curr_steal_queue = request_queues[ i + vic_start ];
        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef __STEAL_STATS
        bool success = try_swap_queues( this, i + vic_start, last_idx );
        if ( success ) stolen++;
        #else
        try_swap_queues( this, i + vic_start, last_idx );
        #endif // __STEAL_STATS

        // C_TODO: try transfer immediately
        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
        // if ( isEmpty( *current_queue ) ) return false;
        return false;
    }
    return false;
    #endif
}

// choose a worker to steal from
static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {
    #if __RAND_WORKER
    unsigned int victim = prng( n_workers );
    if ( victim == id ) victim = ( victim + 1 ) % n_workers;
    return choose_queue( this, victim, last_idx );
    #else
    return choose_queue( this, 0, last_idx );
    #endif
}

// look for work to steal
// returns a bool: true => a queue was stolen, false => no work was stolen
static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur
    // to steal a queue, acquire both queues' locks in address order (maybe can do atomic swap)
    // maybe a flag to hint which queue is being processed
    // look at count to see if a queue is worth stealing (don't steal empty queues)
    // if we steal and then the flag is up, don't process and just continue looking at our own queues
    // (best-effort approach) it is OK if stealing isn't fruitful
    //          -> more important to not delay busy threads

    return choose_victim( this, last_idx );
}

void main( worker & this ) with(this) {
    // threshold of empty queues we see before we go stealing
    const unsigned int steal_threshold = 2 * n_queues;
    unsigned int curr_idx;
    work_queue * curr_work_queue;
    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        // C_TODO: potentially check queue count instead of immediately trying to transfer
        curr_idx = i + start;
        curr_work_queue = request_queues[curr_idx];
        transfer( *curr_work_queue, &current_queue, request_queues, curr_idx );
        if ( isEmpty( *current_queue ) ) {
            #if __STEAL
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            empty_count = 0; // C_TODO: look into stealing backoff schemes
            #ifdef __STEAL_STATS
            try_steal++;
            #endif // __STEAL_STATS

            if ( ! steal_work( this, curr_idx ) ) continue;

            #else // __STEAL else

            continue;

            #endif // __STEAL
        }
        while ( ! isEmpty( *current_queue ) ) {
            &req = &remove( *current_queue );
            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
            if ( req.stop ) break Exit;
            deliver_request( req );
        }
        #if __STEAL
        curr_work_queue->being_processed = false; // set done processing
        #endif
        empty_count = 0; // we found work so reset empty counter
        reclaim( *current_queue );
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
    send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park( ); // will receive signal when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
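
// Illustrative end-to-end sketch (hypothetical types and values, not part of this header),
// showing how the pieces above compose: define an actor and message type, a receive routine,
// and drive the system with start_actor_system()/send()/stop_actor_system().
//   struct hello_actor { inline actor; };         // inherits from actor via inline member
//   struct hello_msg { inline message; };
//   Allocation receive( hello_actor & receiver, hello_msg & msg ) {
//       printf( "hello\n" );
//       return Finished;                          // mark the actor terminated, but do not delete it
//   }
//   int main() {
//       start_actor_system();                     // one worker thread per processor on this cluster
//       hello_actor a;                            // ctor assigns a mailbox ticket and counts the actor
//       hello_msg m;
//       request req = { &a, &m, (__receive_fn)receive };
//       send( a, req );                           // enqueue on a's mailbox (request_queues[a.ticket])
//       stop_actor_system();                      // parks until every actor has terminated
//   }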