source: libcfa/src/concurrency/actor.hfa @ c042d79

ADTast-experimental
Last change on this file since c042d79 was c042d79, checked in by caparsons <caparson@…>, 23 months ago

ported uC++-style actor system

  • Property mode set to 100644
File size: 9.4 KB
Line 
1#include <locks.hfa>
2#include <limits.hfa>
3#include <list.hfa>
4
5#ifdef __CFA_DEBUG__
6#define CFA_DEBUG( stmt ) stmt
7#else
8#define CFA_DEBUG( stmt )
9#endif // CFA_DEBUG
10
11// Define the default number of processors created in the executor. Must be greater than 0.
12#define __DEFAULT_EXECUTOR_PROCESSORS__ 2
13
14// Define the default number of threads created in the executor. Must be greater than 0.
15#define __DEFAULT_EXECUTOR_WORKERS__ 2
16
17// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
18// actor-executor threads. Must be greater than 0.
19#define __DEFAULT_EXECUTOR_RQUEUES__ 2
20
21// Define if executor is created in a separate cluster
22#define __DEFAULT_EXECUTOR_SEPCLUS__ false
23
24// forward decls
25struct actor;
26struct message;
27
28enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
29
30typedef Allocation (*__receive_fn)(actor &, message &);
31struct request {
32    actor * receiver;
33    message * msg;
34    __receive_fn fn;
35    bool stop;
36    inline dlink(request);
37};
38P9_EMBEDDED( request, dlink(request) )
39
40void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
41void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
42    this.receiver = receiver;
43    this.msg = msg;
44    this.fn = fn;
45    this.stop = false;
46}
47
48struct work_queue {
49    futex_mutex mutex_lock;
50    dlist( request ) input;                                             // unbounded list of work requests
51}; // work_queue
52void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }
53
54void insert( work_queue & this, request & elem ) with(this) {
55    lock( mutex_lock );
56    insert_last( input, elem );
57    unlock( mutex_lock );
58} // insert
59
60void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
61    lock( mutex_lock );
62
63    //C_TODO CHANGE
64    // transferTo->transfer( input );              // transfer input to output
65
66    // this is awfully inefficient but Ill use it until transfer is implemented
67    request * r;
68    while ( ! input`isEmpty ) {
69        r = &try_pop_front( input );
70        if ( r ) insert_last( transferTo, *r );
71    }
72
73    // transfer( input, transferTo );
74
75    unlock( mutex_lock );
76} // transfer
77
78thread worker {
79    work_queue * request_queues;
80    dlist( request ) current_queue;
81        request & req;
82    unsigned int start, range;
83};
84
85static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
86    ((thread &)this){ clu };
87    this.request_queues = request_queues;
88    this.current_queue{};
89    this.start = start;
90    this.range = range;
91}
92
93struct executor {
94    cluster * cluster;                                                      // if workers execute on separate cluster
95        processor ** processors;                                            // array of virtual processors adding parallelism for workers
96        work_queue * request_queues;                                // master list of work request queues
97        worker ** workers;                                                              // array of workers executing work requests
98        unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
99        bool seperate_clus;                                                             // use same or separate cluster for executor
100}; // executor
101
102static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) with(this) {
103    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
104    this.nprocessors = nprocessors;
105    this.nworkers = nworkers;
106    this.nrqueues = nrqueues;
107    this.seperate_clus = seperate_clus;
108
109    if ( seperate_clus ) {
110        cluster = alloc();
111        (*cluster){};
112    } else cluster = active_cluster();
113
114    request_queues = aalloc( nrqueues );
115    for ( i; nrqueues )
116        request_queues[i]{};
117   
118    processors = aalloc( nprocessors );
119    for ( i; nprocessors )
120        (*(processors[i] = alloc())){ *cluster };
121
122    workers = alloc( nworkers );
123    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
124    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
125        range = reqPerWorker + ( i < extras ? 1 : 0 );
126        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
127    } // for
128}
129
130static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
131static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
132static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
133static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
134
135static inline void ^?{}( executor & this ) with(this) {
136    request sentinels[nworkers];
137    unsigned int reqPerWorker = nrqueues / nworkers;
138    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
139        insert( request_queues[step], sentinels[i] );           // force eventually termination
140    } // for
141
142    for ( i; nworkers )
143        delete( workers[i] );
144
145    for ( i; nprocessors ) {
146        delete( processors[i] );
147    } // for
148
149    delete( workers );
150    delete( request_queues );
151    delete( processors );
152    if ( seperate_clus ) delete( cluster );
153}
154
155// this is a static field of executor but have to forward decl for get_next_ticket
156static unsigned int __next_ticket = 0;
157
158static inline unsigned int get_next_ticket( executor & this ) with(this) {
159    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
160} // tickets
161
162// C_TODO: update globals in this file to be static fields once the project is done
163static executor * __actor_executor_ = 0p;
164static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
165static unsigned long int __num_actors_;                         // number of actor objects in system
166static struct thread$ * __actor_executor_thd = 0p;              // used to wake executor after actors finish
167struct actor {
168    unsigned long int ticket;           // executor-queue handle to provide FIFO message execution
169    Allocation allocation_;                     // allocation action
170};
171
172void ?{}( actor & this ) {
173    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
174    // member must be called to end it
175    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
176    this.allocation_ = Nodelete;
177    this.ticket = get_next_ticket( *__actor_executor_ );
178    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
179}
180void ^?{}( actor & this ) {}
181
182static inline void check_actor( actor & this ) {
183    if ( this.allocation_ != Nodelete ) {
184        switch( this.allocation_ ) {
185            case Delete: delete( &this ); break;
186            case Destroy:
187                CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
188                ^?{}(this);
189                break;
190            case Finished:
191                CFA_DEBUG( this.ticket = MAX; );        // mark as terminated
192                break;
193            default: ;                                                          // stop warning
194        }
195
196        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
197            unpark( __actor_executor_thd );
198        }
199    }
200}
201
202struct message {
203    Allocation allocation_;                     // allocation action
204};
205
206void ?{}( message & this ) { this.allocation_ = Nodelete; }
207void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
208void ^?{}( message & this ) {}
209
210static inline void check_message( message & this ) {
211    switch ( this.allocation_ ) {                                               // analyze message status
212        case Nodelete: break;
213        case Delete: delete( &this ); break;
214        case Destroy: ^?{}(this); break;
215        case Finished: break;
216    } // switch
217}
218
219void deliver_request( request & this ) {
220    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
221    this.receiver->allocation_ = actor_allocation;
222    check_actor( *this.receiver );
223    check_message( *this.msg );
224}
225
226void main( worker & this ) with(this) {
227    Exit:
228    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
229        transfer( request_queues[i + start], current_queue );
230        while ( ! current_queue`isEmpty ) {
231            &req = &try_pop_front( current_queue );
232            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
233            if ( req.stop ) break Exit;
234            deliver_request( req );
235
236            delete( &req );
237        } // while
238    } // for
239}
240
241static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
242    insert( request_queues[ticket], req);
243}
244
245static inline void send( actor & this, request & req ) {
246    send( *__actor_executor_, req, this.ticket );
247}
248
249static inline void start_actor_system( size_t num_thds ) {
250    __actor_executor_thd = active_thread();
251    __actor_executor_ = alloc();
252    (*__actor_executor_){ 0, num_thds, num_thds * 16 };
253}
254
255static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
256
257static inline void start_actor_system( executor & this ) {
258    __actor_executor_thd = active_thread();
259    __actor_executor_ = &this;
260    __actor_executor_passed = true;
261}
262
263static inline void stop_actor_system() {
264    park( ); // will receive signal when actor system is finished
265
266    if ( !__actor_executor_passed ) delete( __actor_executor_ );
267    __actor_executor_ = 0p;
268    __actor_executor_thd = 0p;
269    __next_ticket = 0;
270    __actor_executor_passed = false;
271}
Note: See TracBrowser for help on using the repository browser.