source: libcfa/src/concurrency/actor.hfa@ 7a24d76

Last change on this file since 7a24d76 was c042d79, checked in by caparsons <caparson@…>, 3 years ago

ported uC++-style actor system

#include <locks.hfa>
#include <limits.hfa>
#include <list.hfa>

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // __CFA_DEBUG__

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define whether the executor is created in a separate cluster.
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

// forward declarations
struct actor;
struct message;

// Allocation action after a receive: Nodelete (keep), Delete (destruct and free), Destroy (destruct only),
// Finished (no cleanup, just mark terminated).
enum Allocation { Nodelete, Delete, Destroy, Finished };

typedef Allocation (*__receive_fn)(actor &, message &);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
    bool stop;
    inline dlink(request);
};
P9_EMBEDDED( request, dlink(request) )

void ?{}( request & this ) { this.stop = true; } // default ctor makes a stop sentinel
void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
    this.stop = false;
}
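// Note: requests delivered through the executor must be heap allocated, because the worker
// thread deletes each request after delivering it (see main( worker & ) below).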

struct work_queue {
    futex_mutex mutex_lock;
    dlist( request ) input;                     // unbounded list of work requests
}; // work_queue
void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }

void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock );
    insert_last( input, elem );
    unlock( mutex_lock );
} // insert

void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
    lock( mutex_lock );

    // C_TODO: replace with a bulk list transfer, e.g. transfer( input, transferTo ),
    // once dlist transfer is implemented.

    // Inefficient fallback until then: pop requests one at a time and append them to transferTo.
    request * r;
    while ( ! input`isEmpty ) {
        r = &try_pop_front( input );
        if ( r ) insert_last( transferTo, *r );
    }

    unlock( mutex_lock );
} // transfer

thread worker {
    work_queue * request_queues;                // shared array of request queues
    dlist( request ) current_queue;             // requests currently being serviced
    request & req;                              // request currently being delivered
    unsigned int start, range;                  // subrange [start, start + range) of request_queues serviced by this worker
};

static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;
    this.current_queue{};
    this.start = start;
    this.range = range;
}

struct executor {
    cluster * cluster;                          // if workers execute on separate cluster
    processor ** processors;                    // array of virtual processors adding parallelism for workers
    work_queue * request_queues;                // master list of work request queues
    worker ** workers;                          // array of workers executing work requests
    unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
    bool separate_clus;                         // use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool separate_clus ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.separate_clus = separate_clus;

    if ( separate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    for ( i; nrqueues )
        request_queues[i]{};

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    workers = alloc( nworkers );
    // distribute request queues across workers as evenly as possible: the first "extras" workers get one extra queue
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
    } // for
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
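// For example (a sketch, not part of the original header), the delegating constructors above
// let any prefix of the configuration be given; an executor with 4 virtual processors, 4 worker
// threads, and 16 mailboxes on the current cluster:
//    executor e{ 4, 4, 16 };
// Note nrqueues must be >= nworkers or the first constructor aborts.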

static inline void ^?{}( executor & this ) with(this) {
    request sentinels[nworkers];
    // mirror the start/range distribution used by the constructor so every worker finds a
    // stop sentinel in one of the queues it actually services (plain i * reqPerWorker steps
    // miss workers when nrqueues is not a multiple of nworkers)
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        insert( request_queues[start], sentinels[i] ); // force eventual termination
    } // for

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    delete( workers );
    delete( request_queues );
    delete( processors );
    if ( separate_clus ) delete( cluster );
}

// this would be a static field of executor, but it must be forward declared for get_next_ticket
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
} // get_next_ticket
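// Each new actor takes the next ticket, round-robin over the nrqueues mailboxes. All of an
// actor's messages are sent through its one ticketed mailbox, giving FIFO execution per actor.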

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system?
static unsigned long int __num_actors_;             // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    unsigned long int ticket;                       // executor-queue handle providing FIFO message execution
    Allocation allocation_;                         // allocation action
};

void ?{}( actor & this ) {
    // Once an actor is constructed, it must be sent a message or the actor system cannot stop,
    // i.e., its receive routine must eventually return a non-Nodelete status to end it.
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.allocation_ = Nodelete;
    this.ticket = get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
    if ( this.allocation_ != Nodelete ) {
        switch( this.allocation_ ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                break;
            default: ;                              // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}
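// Termination protocol: every actor constructor increments __num_actors_; when a delivered
// message leaves an actor with a non-Nodelete status, check_actor decrements the count and,
// at zero, unparks the thread blocked in stop_actor_system.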

struct message {
    Allocation allocation_;                         // allocation action
};

void ?{}( message & this ) { this.allocation_ = Nodelete; }
void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
    switch ( this.allocation_ ) {                   // analyze message status
        case Nodelete: break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}(this); break;
        case Finished: break;
    } // switch
}

void deliver_request( request & this ) {
    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    this.receiver->allocation_ = actor_allocation;
    check_actor( *this.receiver );
    check_message( *this.msg );
}

void main( worker & this ) with(this) {
    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) {  // cycle through this worker's set of request queues
        transfer( request_queues[i + start], current_queue );
        while ( ! current_queue`isEmpty ) {
            &req = &try_pop_front( current_queue );
            if ( !&req ) continue;                  // possibly add some work stealing/idle sleep here
            if ( req.stop ) break Exit;
            deliver_request( req );

            delete( &req );
        } // while
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
    send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park( );                                        // will receive signal when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
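
// Usage sketch (illustrative only, not part of the original header): the types dummy_actor and
// dummy_msg, their receive routine, and the request wiring below are assumptions showing how
// the pieces fit together, using Plan-9-style "inline" embedding of the actor/message bases.
//
// struct dummy_actor { inline actor; };            // derived actor type
// struct dummy_msg { inline message; };            // derived message type
//
// Allocation receive( dummy_actor & receiver, dummy_msg & msg ) {
//     return Finished;                             // actor terminates after one message
// }
//
// Allocation deliver( actor & receiver, message & msg ) { // trampoline matching __receive_fn
//     return receive( (dummy_actor &)receiver, (dummy_msg &)msg );
// }
//
// int main() {
//     start_actor_system();                        // default executor, one worker per processor
//     dummy_actor a;                               // actors must be created after start_actor_system
//     dummy_msg m;
//     request * req = alloc();                     // heap allocate: the worker deletes delivered requests
//     (*req){ (actor *)&a, (message *)&m, deliver };
//     send( (actor &)a, *req );                    // enqueue on a's ticketed mailbox
//     stop_actor_system();                         // park until all actors have terminated
// }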