source: libcfa/src/concurrency/actor.hfa @ 2ed94a9

ADT
Last change on this file since 2ed94a9 was ccf1d99, checked in by caparsons <caparson@…>, 18 months ago

intermediate push so I can move to nasus to performance test some stuff

  • Property mode set to 100644
File size: 12.3 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <limits.hfa>
5#include <list.hfa>
6#include <kernel.hfa>
7
// CFA_DEBUG( stmt ) expands to stmt when __CFA_DEBUG__ is defined and to nothing otherwise.
#ifndef __CFA_DEBUG__
#define CFA_DEBUG( stmt )
#else
#define CFA_DEBUG( stmt ) stmt
#endif // __CFA_DEBUG__
13
// Default number of processors the executor creates. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Default number of worker threads the executor creates. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Default number of executor request queues (mailboxes). Actors write requests into them and the
// executor's worker threads service them. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Default for whether the executor runs on its own, separate cluster.
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

// Selects the request-storage strategy used by copy_queue (0 => copy into a buffer first).
// When flipping this, also flip the matching flag in the compiler's Actors.cpp and rebuild the compiler.
#define __ALLOC 0
29
30// forward decls
31struct actor;
32struct message;
33
// Allocation status stored in actors and messages; examined by check_actor() / check_message()
// after a receive routine runs:
//   Nodelete - nothing is done
//   Delete   - the object is passed to delete()
//   Destroy  - only the object's destructor is run
//   Finished - no destructor/free; for an actor it still counts as terminated
enum Allocation {
    Nodelete,
    Delete,
    Destroy,
    Finished
};

// Receive routine: handles one message for one actor and returns the actor's new allocation status.
typedef Allocation (*__receive_fn)(actor &, message &);
// One unit of work for the executor: call fn( *receiver, *msg ).
// A request whose stop flag is set is a sentinel that makes a worker leave its service loop.
struct request {
    actor * receiver;           // actor the message is delivered to
    message * msg;              // message handed to the receive routine
    __receive_fn fn;            // receive routine to invoke
    bool stop;                  // true => termination sentinel, other fields unused
    inline dlink(request);      // embedded links so requests can be chained on a dlist
};
P9_EMBEDDED( request, dlink(request) )
45
// Default constructor: produces a sentinel request (stop set); receiver/msg/fn are not assigned.
static inline void ?{}( request & this ) {
    this.stop = true;
}

// Constructs a regular (non-sentinel) request delivering msg to receiver through fn.
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.stop = false;
    this.fn = fn;
    this.msg = msg;
    this.receiver = receiver;
}

// Copy constructor: duplicates the payload fields and the stop flag of copy.
static inline void ?{}( request & this, request & copy ) {
    this.stop = copy.stop;
    this.fn = copy.fn;
    this.msg = copy.msg;
    this.receiver = copy.receiver;
}
59
// Hybrid request queue. When __ALLOC is 0, requests are copied into a fixed-size buffer until it
// is full; further requests are heap-allocated and appended to the intrusive list.
struct copy_queue {
    dlist( request ) list;      // intrusive overflow list (the only storage when __ALLOC is set)
    #if ! __ALLOC
    request * buffer;           // array of buffer_size request slots
    size_t count;               // number of requests currently held in buffer
    size_t buffer_size;         // capacity of buffer
    size_t index;               // next buffer slot to hand out from remove()
    #endif
};
// Default constructor: builds an empty queue with no buffer.
// All bookkeeping fields are given defined values so that the destructor's adelete( buffer ) and
// the count/buffer_size tests in insert()/remove()/isEmpty() never read indeterminate memory.
// With buffer_size == 0 every insert() takes the heap-allocating list path.
static inline void ?{}( copy_queue & this ) with(this) {
    list{};
    #if ! __ALLOC
    buffer = 0p;
    buffer_size = 0;
    count = 0;
    index = 0;
    #endif
}
// Constructs an empty queue whose copy buffer has room for buf_size requests
// (buf_size is ignored when __ALLOC is set).
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    list{};
    #if ! __ALLOC
    count = 0;
    index = 0;
    buffer_size = buf_size;
    buffer = aalloc( buf_size );
    #endif
}

// Destructor: releases the copy buffer. Nodes still on the overflow list are not freed here.
static inline void ^?{}( copy_queue & this ) with(this) {
    #if ! __ALLOC
    adelete( buffer );
    #endif
}
83
// Appends elem to the queue.
// __ALLOC == 0: elem is copied, into the next free buffer slot if one exists, otherwise into a
//               freshly allocated node appended to the overflow list.
// __ALLOC != 0: elem itself is linked onto the list (no copy).
static inline void insert( copy_queue & this, request & elem ) with(this) {
    #if ! __ALLOC
    if ( count >= buffer_size ) {               // slow path: buffer full, allocate a list node
        request * node = alloc();
        (*node){ elem };
        insert_last( list, *node );
        return;
    }
    buffer[count]{ elem };                      // fast path: copy-construct in place
    count += 1;
    #else
    insert_last( list, elem );
    #endif
}
98
// Removes and returns the next request.
// Buffered requests are returned first, in insertion order; after that requests come from the front
// of the overflow list (the result of try_pop_front is returned unchanged).
// Once removal starts, every element must be removed: calling insert() before the queue is
// completely empty is not supported.
// should_delete is an output parameter: false for a buffer slot, true for a request taken from the list.
static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
    #if ! __ALLOC
    if ( count != 0 ) {
        request & front = buffer[index];
        count -= 1;
        if ( count == 0 ) index = 0;            // buffer drained: rewind for the next fill
        else index += 1;
        should_delete = false;
        return front;
    }
    #endif
    should_delete = true;
    return try_pop_front( list );
}
115
// True when neither the copy buffer (if compiled in) nor the overflow list holds a request.
static inline bool isEmpty( copy_queue & this ) with(this) {
    #if ! __ALLOC
    if ( count != 0 ) return false;
    #endif
    return list`isEmpty;
}
123
// Buffer capacity used for every copy_queue built by work_queue/worker constructors.
// C_TODO: rework this to be passed from executor through ctors (no need for global)
static size_t __buffer_size = 10;

// A lock-protected mailbox: producers insert() requests, a worker swaps the filled queue out with transfer().
struct work_queue {
    __spinlock_t mutex_lock;    // guards c_queue and the queue it points to
    copy_queue owned_queue;     // storage initially used as the current queue
    copy_queue * c_queue;       // queue currently receiving inserts; C_TODO: try putting this on the stack with ptr juggling
}; // work_queue
// Constructs the mailbox: the embedded queue is built with __buffer_size slots and becomes the
// current queue. No separate allocation is made, so no user-defined destructor is needed.
static inline void ?{}( work_queue & this ) {
    this.owned_queue{ __buffer_size };
    this.c_queue = &this.owned_queue;
}
138
// Adds elem to the mailbox's current queue while holding the mailbox lock.
static inline void insert( work_queue & this, request & elem ) {
    lock( this.mutex_lock __cfaabi_dbg_ctx2 );
    insert( *this.c_queue, elem );
    unlock( this.mutex_lock );
} // insert
144
// Exchanges the mailbox's current queue with the caller's queue, under the mailbox lock.
// On return *transfer_to is the queue that was receiving inserts and the mailbox holds the
// queue the caller passed in.
static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    copy_queue * filled = c_queue;
    c_queue = *transfer_to;
    *transfer_to = filled;
    unlock( mutex_lock );
} // transfer
153
// Executor thread servicing the mailboxes request_queues[start .. start + range - 1].
thread worker {
    copy_queue owned_queue;         // queue storage this worker starts out with
    work_queue * request_queues;    // the executor's array of mailboxes
    copy_queue * current_queue;     // queue being drained; swapped with a mailbox's queue by transfer()
    request & req;                  // request currently being processed
    unsigned int start;             // index of the first mailbox owned by this worker
    unsigned int range;             // number of consecutive mailboxes owned by this worker
};
161
// Constructs a worker thread on cluster clu that services range mailboxes of request_queues
// beginning at index start. Its private queue is built with __buffer_size slots and is
// embedded in the worker, so no user-defined destructor is needed.
static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
    ((thread &)this){ clu };
    this.start = start;
    this.range = range;
    this.request_queues = request_queues;
    this.owned_queue{ __buffer_size };
    this.current_queue = &this.owned_queue;
}
173
// Owns the processors, worker threads and mailboxes that run actor receive routines.
struct executor {
    cluster * cluster;              // cluster the workers execute on (separately allocated if seperate_clus)
    processor ** processors;        // array of virtual processors adding parallelism for workers
    work_queue * request_queues;    // master list of work request queues
    worker ** workers;              // array of workers executing work requests
    unsigned int nprocessors;       // number of processors
    unsigned int nworkers;          // number of worker threads
    unsigned int nrqueues;          // number of request queues
    bool seperate_clus;             // use same or separate cluster for executor
}; // executor
182
// Builds an executor.
//   nprocessors   - number of processors created on the executor's cluster (may be 0)
//   nworkers      - number of worker threads; must be > 0
//   nrqueues      - number of request queues (mailboxes); must be >= nworkers
//   seperate_clus - true => allocate a new cluster, false => use the active cluster
//   buf_size      - stored in the global __buffer_size, which sizes every queue's copy buffer
// Aborts if nworkers is 0 (it is used as a divisor below) or if nrqueues < nworkers.
// Mailboxes are split into contiguous ranges, one per worker: each worker gets
// nrqueues / nworkers of them and the first nrqueues % nworkers workers get one extra.
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nworkers == 0 ) abort( "nworkers needs to be > 0\n" );
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    __buffer_size = buf_size;                   // must be set before any queue is constructed
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    for ( i; nrqueues )
        request_queues[i]{};

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    workers = alloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );  // spread the remainder over the first workers
        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
    } // for
}
// Convenience constructors: each supplies one more default and delegates to the next larger overload.

// Uses the current value of __buffer_size as the buffer size.
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) {
    this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size };
}

// Uses __DEFAULT_EXECUTOR_SEPCLUS__ for the cluster choice.
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) {
    this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ };
}

// Uses __DEFAULT_EXECUTOR_RQUEUES__ request queues.
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) {
    this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ };
}

// Uses __DEFAULT_EXECUTOR_WORKERS__ workers.
static inline void ?{}( executor & this, unsigned int nprocessors ) {
    this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ };
}

// Uses __DEFAULT_EXECUTOR_PROCESSORS__ processors.
static inline void ?{}( executor & this ) {
    this{ __DEFAULT_EXECUTOR_PROCESSORS__ };
}
216
// Shuts the executor down and releases everything the constructor allocated.
// One sentinel (stop) request is inserted into the FIRST mailbox of every worker's range so that
// each worker eventually sees it and leaves its service loop. The range starts are recomputed
// exactly as in the constructor (the first nrqueues % nworkers workers own one extra mailbox);
// stepping by nrqueues / nworkers alone would put sentinels into the wrong worker's range
// whenever nrqueues is not a multiple of nworkers, leaving some workers running forever.
static inline void ^?{}( executor & this ) with(this) {
    request sentinels[nworkers];                // default-constructed => stop requests
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );      // must mirror the constructor's partitioning
        insert( request_queues[step], sentinels[i] );       // force eventual termination
    } // for

    // Workers are destroyed before the sentinels go out of scope and before the mailboxes are freed.
    // NOTE(review): deleting a worker presumably waits for its thread to finish - confirm.
    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    adelete( workers );
    adelete( request_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );     // only a cluster allocated by the constructor is freed
}
236
// Conceptually a static field of executor; declared here so get_next_ticket can use it.
static unsigned int __next_ticket = 0;

// Atomically takes the next ticket number and maps it onto one of the executor's request queues.
// Returns an index in [0, nrqueues).
static inline unsigned int get_next_ticket( executor & this ) with(this) {
    unsigned int ticket = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST );
    return ticket % nrqueues;
} // tickets
243
// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;           // executor currently running the actor system
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static unsigned long int __num_actors_;             // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish

// Base actor state used by the executor.
struct actor {
    unsigned long int ticket;   // executor-queue handle to provide FIFO message execution
    Allocation allocation_;     // allocation action
};
253
// Constructs an actor: assigns it a request-queue ticket and counts it in __num_actors_.
// Requires start_actor_system() to have been called first.
// Once an actor is allocated it must be sent a message or the actor system cannot stop; its
// receive routine has to run in order to end it.
static inline void ?{}( actor & this ) {
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.ticket = get_next_ticket( *__actor_executor_ );
    this.allocation_ = Nodelete;
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}

// Destructor: nothing to release.
static inline void ^?{}( actor & this ) {}
263
// Applies the actor's allocation action after a receive routine has run.
// Nodelete leaves the actor alive and does nothing. Any other status ends the actor
// (delete, destructor only, or nothing for Finished), decrements __num_actors_, and, when the
// count reaches zero, unparks the thread recorded in __actor_executor_thd.
static inline void check_actor( actor & this ) {
    if ( this.allocation_ == Nodelete ) return;

    switch ( this.allocation_ ) {
        case Delete:
            delete( &this );
            break;
        case Destroy:
            CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
            ^?{}(this);
            break;
        case Finished:
            CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
            break;
        default: ;                              // stop warning
    }

    // all actors have terminated => wake the thread waiting in stop_actor_system
    if ( unlikely( __atomic_sub_fetch( &__num_actors_, 1, __ATOMIC_SEQ_CST ) == 0 ) ) {
        unpark( __actor_executor_thd );
    }
}
283
// Base message state used by the executor.
struct message {
    Allocation allocation_;     // allocation action
};

// Default constructor: the message is left alone after delivery (Nodelete).
static inline void ?{}( message & this ) {
    this.allocation_ = Nodelete;
}

// Constructs a message with an explicit allocation action.
static inline void ?{}( message & this, Allocation allocation ) {
    this.allocation_ = allocation;
}

// Destructor: nothing to release.
static inline void ^?{}( message & this ) {}
291
// Applies the message's allocation action after delivery: Delete => delete the message,
// Destroy => run its destructor only, Nodelete / Finished => leave it untouched.
static inline void check_message( message & this ) {
    if ( this.allocation_ == Delete ) {
        delete( &this );
    } else if ( this.allocation_ == Destroy ) {
        ^?{}(this);
    } // if
}
300
// Executes one request: runs the receive routine on the receiver with the message, records the
// returned allocation status in the actor, then applies the actor's and the message's
// allocation actions (in that order).
static inline void deliver_request( request & this ) {
    actor & target = *this.receiver;
    message & payload = *this.msg;
    target.allocation_ = this.fn( target, payload );
    check_actor( target );
    check_message( payload );
}
307
// Worker thread body. Cycles forever over the worker's mailboxes
// request_queues[start .. start + range - 1]: each mailbox's queue is swapped with the worker's
// (empty) current queue and then drained, delivering every request. The loop ends when a
// sentinel (stop) request is removed.
void main( worker & this ) with(this) {
    bool should_delete;                         // set by remove(): true => req came from the overflow list
    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        // C_TODO: potentially check queue count instead of immediately trying to transfer
        transfer( request_queues[i + start], &current_queue );
        while ( ! isEmpty( *current_queue ) ) {
            &req = &remove( *current_queue, should_delete );
            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
            if ( req.stop ) {
                #if ! __ALLOC
                // In copy mode a request taken from the overflow list is a heap copy made by
                // insert(); free the sentinel's copy instead of leaking it. (With __ALLOC set the
                // sentinel is the caller's own object and must not be deleted here.)
                if ( should_delete ) delete( &req );
                #endif
                break Exit;
            }
            deliver_request( req );

            if ( should_delete ) delete( &req );
        } // while
    } // for
}
324
// Places req in the executor mailbox selected by ticket.
static inline void send( executor & this, request & req, unsigned long int ticket ) {
    insert( this.request_queues[ticket], req );
}

// Places req in the mailbox that belongs to this actor's ticket on the active executor.
static inline void send( actor & this, request & req ) {
    executor & exec = *__actor_executor_;
    send( exec, req, this.ticket );
}
332
// Starts the actor system with an internally allocated executor: no extra processors,
// num_thds workers, and 16 request queues per worker (a single queue when num_thds is 1).
// The calling thread is recorded as the one to wake when all actors have finished.
static inline void start_actor_system( size_t num_thds ) {
    size_t num_queues = num_thds * 16;
    if ( num_thds == 1 ) num_queues = 1;

    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_queues };
}

// Starts the actor system with one worker per processor counted on the active cluster.
static inline void start_actor_system() {
    start_actor_system( active_cluster()->procs.total );
}
340
// Starts the actor system on a caller-supplied executor. The executor is flagged as "passed" so
// stop_actor_system() will not delete it; the calling thread is recorded as the one to wake.
static inline void start_actor_system( executor & this ) {
    __actor_executor_passed = true;
    __actor_executor_ = &this;
    __actor_executor_thd = active_thread();
}
346
// Blocks the caller until the actor system signals completion (check_actor() unparks the recorded
// thread when the actor count reaches zero), then tears the system down: an internally
// allocated executor is deleted, and all global state is reset for a later restart.
static inline void stop_actor_system() {
    park( ); // will receive signal when actor system is finished

    if ( ! __actor_executor_passed ) {
        delete( __actor_executor_ );            // only an executor created by start_actor_system is owned here
    } // if
    __actor_executor_passed = false;
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
}
Note: See TracBrowser for help on using the repository browser.