source: libcfa/src/concurrency/actor.hfa @ ecfe574

ADTast-experimental
Last change on this file since ecfe574 was ecfe574, checked in by caparsons <caparson@…>, 20 months ago

added envelope copying to avoid allocations

  • Property mode set to 100644
File size: 11.6 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <limits.hfa>
5#include <list.hfa>
6#include <kernel.hfa>
7
// CFA_DEBUG( stmt ) expands to stmt only in debug builds (__CFA_DEBUG__ defined)
// and to nothing otherwise; used below for debug-only bookkeeping (e.g. marking
// terminated actors).
#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG
13
14// Define the default number of processors created in the executor. Must be greater than 0.
15#define __DEFAULT_EXECUTOR_PROCESSORS__ 2
16
17// Define the default number of threads created in the executor. Must be greater than 0.
18#define __DEFAULT_EXECUTOR_WORKERS__ 2
19
20// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
21// actor-executor threads. Must be greater than 0.
22#define __DEFAULT_EXECUTOR_RQUEUES__ 2
23
24// Define if executor is created in a separate cluster
25#define __DEFAULT_EXECUTOR_SEPCLUS__ false
26
// forward decls
struct actor;
struct message;

// Post-receive action for an actor or message:
//   Nodelete => still live, leave it alone
//   Delete   => run destructor and free the storage
//   Destroy  => run destructor only (storage owned/reused elsewhere)
//   Finished => done, but no destruction performed here
enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status

// Signature of the generated receive routine: deliver a message to an actor
// and report what to do with the actor afterward.
typedef Allocation (*__receive_fn)(actor &, message &);
// One unit of work for a worker: deliver *msg to *receiver via fn.
struct request {
    actor * receiver;          // target actor
    message * msg;             // message to deliver
    __receive_fn fn;           // typed receive routine to invoke
    bool stop;                 // true => sentinel telling a worker to terminate
    inline dlink(request);     // intrusive link for copy_queue's overflow list
};
P9_EMBEDDED( request, dlink(request) )
42
// Default ctor builds a sentinel (stop == true); a worker terminates when it
// dequeues one (see main( worker & )).
static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
// Regular request: deliver *msg to *receiver by calling fn.
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
    this.stop = false;
}
// Copy ctor: used by copy_queue to copy requests into its inline buffer,
// avoiding a heap allocation per request on the fast path.
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
    this.stop = copy.stop;
}
56
// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
struct copy_queue {
    dlist( request ) list;        // overflow: heap-allocated requests, drained after buffer
    request * buffer;             // inline array of copied requests (no-alloc fast path)
    size_t count, buffer_size;    // elements currently in buffer / buffer capacity
};
// Default ctor: build an empty, unallocated copy_queue so the destructor
// (adelete of buffer) and isEmpty are safe on a default-constructed instance.
// The previous empty body left buffer/count/buffer_size indeterminate, so
// ^?{} would free a garbage pointer.
static inline void ?{}( copy_queue & this ) with(this) { list{}; buffer = 0p; buffer_size = 0; count = 0; }
// Sized ctor: pre-allocate an inline buffer of buf_size requests for the
// copy (no-allocation) fast path.
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    list{};
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
}
// NOTE(review): frees only the inline buffer; any requests still on the
// intrusive overflow list would leak — callers drain the queue before dtor.
static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }
71
// Enqueue a request: copy-construct into the inline buffer while space
// remains, otherwise heap-allocate a copy and append it to the intrusive list.
static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count < buffer_size ) { // fast path ( no alloc )
        buffer[count]{ elem };   // in-place copy construction
        count++;
        return;
    }
    request * new_elem = alloc();   // slow path: buffer full
    (*new_elem){ elem };
    insert_last( list, *new_elem );
}
82
// once you start removing you need to remove all elements
// it is not supported to call insert() before the list is fully empty
// should_delete is an output param: true when the returned request was heap
// allocated (overflow list) and must be delete()d by the caller after use.
static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
    if ( count > 0 ) {
        // drain the inline buffer first (LIFO); these copies are not freed individually
        count--;
        should_delete = false;
        return buffer[count];
    }
    should_delete = true;
    // may yield a null reference when the list is also empty; callers check &result
    return try_pop_front( list );
}

static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0 && list`isEmpty; }
97
static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
// One mailbox: a mutex-protected copy_queue that producers (actor sends)
// insert into and a single worker periodically swaps out and drains.
struct work_queue {
    futex_mutex mutex_lock;   // guards c_queue pointer and its contents
    copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
}; // work_queue
static inline void ?{}( work_queue & this ) with(this) {
    c_queue = alloc();
    (*c_queue){ __buffer_size }; // C_TODO: support passing copy buff size as arg to executor
}
static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
108
// Producer side: lock the mailbox and copy the request in.
static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

// Consumer side: O(1) handoff — swap the mailbox's copy_queue with the
// worker's (already drained) private queue so the worker can process the
// requests without holding the lock.
static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock );
    // swap copy queue ptrs
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer
123
// Executor worker thread: services the contiguous slice of request queues
// [start, start+range) of the executor's master queue array.
thread worker {
    work_queue * request_queues;   // shared master array (owned by the executor)
    copy_queue * current_queue;    // private queue, swapped with a mailbox via transfer()
        request & req;             // request being processed (null reference when none)
    unsigned int start, range;     // this worker's slice of request_queues
};
130
static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
    ((thread &)this){ clu };   // construct the underlying thread on the target cluster
    this.request_queues = request_queues;
    this.current_queue = alloc();
    (*this.current_queue){ __buffer_size };
    this.start = start;
    this.range = range;
}
// NOTE(review): mutex on this appears to serialize destruction with the
// thread's execution — confirm against CFA thread-destruction semantics.
static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
140
// Owns the processors, worker threads, and mailboxes of one actor system.
// (Field name "seperate_clus" [sic] is an established identifier — not renamed.)
struct executor {
    cluster * cluster;                                                      // if workers execute on separate cluster
        processor ** processors;                                            // array of virtual processors adding parallelism for workers
        work_queue * request_queues;                                // master list of work request queues
        worker ** workers;                                                              // array of workers executing work requests
        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
        bool seperate_clus;                                                             // use same or separate cluster for executor
}; // executor
149
// Main ctor: build nrqueues mailboxes, nprocessors virtual processors, and
// nworkers worker threads. Queues are divided among workers as evenly as
// possible: the first (nrqueues % nworkers) workers each get one extra queue.
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );   // every worker needs >= 1 queue
    __buffer_size = buf_size;
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;

    if ( seperate_clus ) {
        cluster = alloc();       // executor owns the cluster; freed in ^?{}
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    for ( i; nrqueues )
        request_queues[i]{};
   
    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    workers = alloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );   // spread the remainder over the first workers
        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
    } // for
}
// Delegating ctors: fill in defaults from the most specific to the least.
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
183
// Dtor: send one sentinel (stop) request per worker, join the workers, then
// free all executor-owned resources.
//
// BUG FIX: the sentinel must land in a queue owned by worker i, so the queue
// index must advance with the SAME stride used when the workers were built:
// reqPerWorker plus one extra for the first (nrqueues % nworkers) workers.
// The old flat `step += reqPerWorker` stride meant that whenever nrqueues was
// not a multiple of nworkers, some sentinel fell into an earlier worker's
// slice, one worker never received a stop request, and the join below
// (delete of the worker thread) deadlocked.
static inline void ^?{}( executor & this ) with(this) {
    request sentinels[nworkers];   // default-constructed requests are sentinels (stop == true)
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );   // mirror the ctor's slice layout
        insert( request_queues[step], sentinels[i] );           // force eventually termination
    } // for

    for ( i; nworkers )
        delete( workers[i] );      // joins each worker thread

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    adelete( workers );
    adelete( request_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );   // only if we created the cluster
}
203
// this is a static field of executor but have to forward decl for get_next_ticket
static unsigned int __next_ticket = 0;

// Round-robin ticket dispenser: maps each new actor to one request queue so
// its messages execute FIFO. Atomic because actors may be created from any
// thread concurrently.
static inline unsigned int get_next_ticket( executor & this ) with(this) {
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
} // tickets
210
// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;           // executor servicing all actors in the system
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static unsigned long int __num_actors_;                         // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
struct actor {
    unsigned long int ticket;           // executor-queue handle to provide FIFO message execution
    Allocation allocation_;                     // allocation action
};
220
static inline void ?{}( actor & this ) {
    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    // member must be called to end it
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.allocation_ = Nodelete;
    // pin this actor to one request queue so its messages execute FIFO
    this.ticket = get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
// Dtor is a no-op: the live-actor count is decremented by check_actor() when
// the actor finishes, not here.
static inline void ^?{}( actor & this ) {}
230
// Apply the allocation action chosen by the actor's last receive, and when
// the global live-actor count reaches zero, wake the thread parked in
// stop_actor_system().
static inline void check_actor( actor & this ) {
    if ( this.allocation_ != Nodelete ) {
        switch( this.allocation_ ) {
            case Delete: delete( &this ); break;          // dtor + free
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
                ^?{}(this);                             // dtor only; storage owned elsewhere
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
                break;
            default: ;                                                          // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}
250
// Base type for all actor messages; carries only its post-delivery action.
struct message {
    Allocation allocation_;                     // allocation action
};

// Default: message is caller-owned and outlives delivery (Nodelete).
static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
static inline void ^?{}( message & this ) {}
258
// Apply the message's allocation action after it has been delivered.
static inline void check_message( message & this ) {
    switch ( this.allocation_ ) {                                               // analyze message status
        case Nodelete: break;                // caller owns the message
        case Delete: delete( &this ); break; // dtor + free
        case Destroy: ^?{}(this); break;     // dtor only; storage owned elsewhere
        case Finished: break;                // done, no destruction here
    } // switch
}
267
// Execute one request: run the typed receive routine, then apply the
// resulting allocation actions to the receiver and the message.
static inline void deliver_request( request & this ) {
    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    this.receiver->allocation_ = actor_allocation;
    check_actor( *this.receiver );   // may delete/destroy the receiver
    check_message( *this.msg );      // may delete/destroy the message
}
274
// Worker thread body: cycle round-robin over this worker's slice of request
// queues [start, start+range), swap each mailbox's copy_queue for the private
// (drained) one, then deliver every request without holding a lock. Exits
// when a sentinel (stop) request — inserted by the executor dtor — is seen.
void main( worker & this ) with(this) {
    bool should_delete;
    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        // C_TODO: potentially check queue count instead of immediately trying to transfer
        transfer( request_queues[i + start], &current_queue );
        while ( ! isEmpty( *current_queue ) ) {
            &req = &remove( *current_queue, should_delete );
            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
            if ( req.stop ) break Exit;   // sentinel: terminate this worker
            deliver_request( req );

            if ( should_delete ) delete( &req );   // overflow-list elements were heap allocated
        } // while
    } // for
}
291
// Route a request to the mailbox selected by an actor's ticket.
static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req);
}

// Send through the global executor using the receiving actor's own ticket,
// preserving per-actor FIFO delivery.
static inline void send( actor & this, request & req ) {
    send( *__actor_executor_, req, this.ticket );
}
299
// Start the actor system with an internally allocated executor of num_thds
// workers (0 extra processors; 16 queues per worker unless single-threaded).
static inline void start_actor_system( size_t num_thds ) {
    __actor_executor_thd = active_thread();   // remembered so the last actor can wake us
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

// Default: one worker per processor on the current cluster.
static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

// Caller-supplied executor: caller retains ownership (not deleted on stop).
static inline void start_actor_system( executor & this ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}
313
// Block until all actors terminate (unpark comes from check_actor when the
// count hits zero), then tear down the executor — only if we allocated it —
// and reset global state so the actor system can be started again.
static inline void stop_actor_system() {
    park( ); // will receive signal when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
Note: See TracBrowser for help on using the repository browser.