source: libcfa/src/concurrency/actor.hfa @ a64137f

Last change on this file since a64137f was 5c473c9, checked in by caparsons <caparson@…>, 18 months ago

fixed bug where I used buffer as a stack not a queue

#pragma once

#include <locks.hfa>
#include <limits.hfa>
#include <list.hfa>
#include <kernel.hfa>

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define if executor is created in a separate cluster
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

// forward declarations
struct actor;
struct message;

// allocation status: Nodelete => keep alive; Delete => run dtor and free;
// Destroy => run dtor only; Finished => terminated, no cleanup performed
enum Allocation { Nodelete, Delete, Destroy, Finished };

typedef Allocation (*__receive_fn)(actor &, message &);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
    bool stop;
    inline dlink(request);
};
P9_EMBEDDED( request, dlink(request) )

static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
    this.stop = false;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
    this.stop = copy.stop;
}
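
// Illustrative sketch (not part of this header): how a typed receive routine and its
// operands are type-erased into a request. hello_actor, hello_msg, and hello_send are
// hypothetical names; the real user-facing send glue lives outside this file.
/*
struct hello_actor { inline actor; };               // Plan-9 style inheritance from actor
struct hello_msg { inline message; };               // Plan-9 style inheritance from message

Allocation receive( hello_actor & receiver, hello_msg & msg ) {
    return Finished;                                // terminate the actor after one message
}

void hello_send( hello_actor & receiver, hello_msg & msg ) {
    request req = { &receiver, &msg, (__receive_fn)receive }; // erase the concrete types
    send( receiver, req );                          // send() is defined later in this file
}
*/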

// hybrid data structure: copies into a fixed buffer until full, then falls back to
// heap-allocated nodes on an intrusive list
struct copy_queue {
    dlist( request ) list;
    request * buffer;
    size_t count, buffer_size, index;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    list{};
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    index = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count < buffer_size ) { // fast path ( no alloc )
        buffer[count]{ elem };
        count++;
        return;
    }
    request * new_elem = alloc();
    (*new_elem){ elem };
    insert_last( list, *new_elem );
}

// Once removal begins, all elements must be removed before inserting again;
// calling insert() before the queue is fully drained is unsupported.
// should_delete is an output parameter: true => the returned request was heap allocated
static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
    if ( count > 0 ) {
        count--;
        should_delete = false;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    should_delete = true;
    return try_pop_front( list );
}

static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; }
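
// Usage sketch (illustrative, with a hypothetical buffer size of 2): the first two
// inserts are copied into the fixed buffer; the third falls back to a heap node, which
// the consumer must free when should_delete comes back true.
/*
copy_queue q = { 2 };
request r;                                      // stop sentinels, purely for illustration
insert( q, r );                                 // copied into buffer[0]
insert( q, r );                                 // copied into buffer[1]
insert( q, r );                                 // buffer full => heap node appended to list
bool should_delete;
while ( ! isEmpty( q ) ) {                      // drain fully before inserting again
    request & next = remove( q, should_delete );
    if ( should_delete ) delete( &next );       // free the heap-allocated node
}
*/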

static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
struct work_queue {
    futex_mutex mutex_lock;
    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
}; // work_queue
static inline void ?{}( work_queue & this ) with(this) {
    c_queue = alloc();
    (*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor
}
static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }

static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock );
    // swap the shared queue with the worker's drained local queue, so the worker can
    // process the requests without holding the lock
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer

thread worker {
    work_queue * request_queues;
    copy_queue * current_queue;
    request & req;
    unsigned int start, range;
};

static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;
    this.current_queue = alloc();
    (*this.current_queue){ __buffer_size };
    this.start = start;
    this.range = range;
}
static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }

struct executor {
    cluster * cluster;                            // if workers execute on separate cluster
    processor ** processors;                      // array of virtual processors adding parallelism for workers
    work_queue * request_queues;                  // master list of work request queues
    worker ** workers;                            // array of workers executing work requests
    unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
    bool seperate_clus;                           // use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    __buffer_size = buf_size;
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    for ( i; nrqueues )
        request_queues[i]{};

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    workers = alloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
    } // for
}
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
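
// Construction sketch: each delegating ctor above fills in the defaults for the omitted
// trailing parameters, e.g. (illustrative):
/*
executor e1;                      // 2 processors, 2 workers, 2 request queues, shared cluster
executor e2 = { 4, 4, 64 };       // 4 processors, 4 workers, 64 request queues
executor e3 = { 4, 4, 64, true }; // as e2, but workers run on their own cluster
*/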

static inline void ^?{}( executor & this ) with(this) {
    request sentinels[nworkers];
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    // mirror the ctor's queue distribution so each worker's starting queue receives
    // exactly one stop sentinel, forcing eventual termination
    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        insert( request_queues[step], sentinels[i] );
    } // for

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    adelete( workers );
    adelete( request_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );
}

// conceptually a static field of executor, but must be declared at file scope so
// get_next_ticket can reference it
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues; // round-robin
} // tickets

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;       // was an executor passed to start_actor_system
static unsigned long int __num_actors_;            // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish

struct actor {
    unsigned long int ticket; // executor-queue handle to provide FIFO message execution
    Allocation allocation_;   // allocation action
};

static inline void ?{}( actor & this ) {
    // Once an actor is allocated, it must eventually be sent a message whose receive
    // routine terminates it (returns Delete, Destroy, or Finished); otherwise the actor
    // count never reaches zero and the actor system cannot stop.
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.allocation_ = Nodelete;
    this.ticket = get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
static inline void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
    if ( this.allocation_ != Nodelete ) {
        switch( this.allocation_ ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; ); // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; ); // mark as terminated
                break;
            default: ;                           // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    Allocation allocation_; // allocation action
};

static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
static inline void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
    switch ( this.allocation_ ) { // analyze message status
        case Nodelete: break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}(this); break;
        case Finished: break;
    } // switch
}

static inline void deliver_request( request & this ) {
    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    this.receiver->allocation_ = actor_allocation;
    check_actor( *this.receiver );
    check_message( *this.msg );
}
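
// The Allocation returned by a receive routine drives the receiver's lifetime, while a
// message's own allocation_ drives the message's. A sketch (counter_actor and count_msg
// are hypothetical types):
/*
Allocation receive( counter_actor & receiver, count_msg & msg ) {
    receiver.count++;
    if ( receiver.count == receiver.limit ) return Finished; // terminated, user owns storage
    return Nodelete;                                         // stay alive for more messages
}
*/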

void main( worker & this ) with(this) {
    bool should_delete;
    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        // C_TODO: potentially check queue count instead of immediately trying to transfer
        transfer( request_queues[i + start], &current_queue );
        while ( ! isEmpty( *current_queue ) ) {
            &req = &remove( *current_queue, should_delete );
            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
            if ( req.stop ) break Exit;
            deliver_request( req );

            if ( should_delete ) delete( &req );
        } // while
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
    send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    // no dedicated processors; num_thds workers over 16 request queues per worker
    // (a single queue when single-threaded)
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park( ); // will receive signal when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
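
// Typical lifecycle (illustrative; hello_actor, hello_msg, and hello_send as sketched earlier):
/*
int main() {
    start_actor_system();  // one worker per processor on the current cluster
    hello_actor a;         // actors must be created after the system starts
    hello_msg m;
    hello_send( a, m );    // receive returns Finished => actor count reaches zero
    stop_actor_system();   // parks until all actors have terminated, then cleans up
}
*/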