source: libcfa/src/concurrency/actor.hfa@ 757099e

Last change on this file since 757099e was 9d0ff30, checked in by caparsons <caparson@…>, 3 years ago: added missing header

#pragma once

#include <locks.hfa>
#include <limits.hfa>
#include <list.hfa>
#include <kernel.hfa>

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define if executor is created in a separate cluster
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

// forward decls
struct actor;
struct message;

enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status

typedef Allocation (*__receive_fn)(actor &, message &);
struct request {
	actor * receiver;
	message * msg;
	__receive_fn fn;
	bool stop;
	inline dlink(request);
};
P9_EMBEDDED( request, dlink(request) )

void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
	this.receiver = receiver;
	this.msg = msg;
	this.fn = fn;
	this.stop = false;
}
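
// Illustrative sketch (not part of the original file): the two constructors
// above give a request a dual role. A default-constructed request is a stop
// sentinel, which the executor destructor below uses to shut workers down;
// the three-argument form packages real work. Because a worker deletes each
// request after delivering it, work requests are typically heap-allocated.
// The names a, m, and my_receive are hypothetical.
//
//   request sentinel;                // stop == true; terminates a worker
//   request * work = alloc();
//   (*work){ &a, &m, my_receive };   // stop == false; deleted by the worker after delivery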

struct work_queue {
	futex_mutex mutex_lock;
	dlist( request ) input;		// unbounded list of work requests
}; // work_queue
void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }

void insert( work_queue & this, request & elem ) with(this) {
	lock( mutex_lock );
	insert_last( input, elem );
	unlock( mutex_lock );
} // insert

void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
	lock( mutex_lock );

	// C_TODO CHANGE
	// transferTo->transfer( input ); // transfer input to output

	// this is awfully inefficient, but I'll use it until transfer is implemented
	request * r;
	while ( ! input`isEmpty ) {
		r = &try_pop_front( input );
		if ( r ) insert_last( transferTo, *r );
	}

	// transfer( input, transferTo );

	unlock( mutex_lock );
} // transfer

thread worker {
	work_queue * request_queues;
	dlist( request ) current_queue;
	request & req;
	unsigned int start, range;
};

static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
	((thread &)this){ clu };
	this.request_queues = request_queues;
	this.current_queue{};
	this.start = start;
	this.range = range;
}

struct executor {
	cluster * cluster;				// if workers execute on separate cluster
	processor ** processors;		// array of virtual processors adding parallelism for workers
	work_queue * request_queues;	// master list of work request queues
	worker ** workers;				// array of workers executing work requests
	unsigned int nprocessors, nworkers, nrqueues;	// number of processors/threads/request queues
	bool separate_clus;				// use same or separate cluster for executor
}; // executor

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool separate_clus ) with(this) {
	if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
	this.nprocessors = nprocessors;
	this.nworkers = nworkers;
	this.nrqueues = nrqueues;
	this.separate_clus = separate_clus;

	if ( separate_clus ) {
		cluster = alloc();
		(*cluster){};
	} else cluster = active_cluster();

	request_queues = aalloc( nrqueues );
	for ( i; nrqueues )
		request_queues[i]{};

	processors = aalloc( nprocessors );
	for ( i; nprocessors )
		(*(processors[i] = alloc())){ *cluster };

	workers = alloc( nworkers );
	unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
	for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
		range = reqPerWorker + ( i < extras ? 1 : 0 );
		(*(workers[i] = alloc())){ *cluster, request_queues, start, range };
	} // for
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
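
// Illustrative sketch (not part of the original file): the delegating
// constructors above fill in defaults right-to-left, so an executor can be
// built at any level of detail.
//
//   executor e0{};				// all defaults: 2 processors, 2 workers, 2 request queues
//   executor e1{ 4 };			// 4 processors, default workers and queues
//   executor e2{ 4, 4, 16 };	// explicit sizes; nrqueues must be >= nworkers
//   executor e3{ 4, 4, 16, true };	// as e2, but on a freshly created cluster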

static inline void ^?{}( executor & this ) with(this) {
	request sentinels[nworkers];
	unsigned int reqPerWorker = nrqueues / nworkers;
	for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
		insert( request_queues[step], sentinels[i] );	// force eventual termination
	} // for

	for ( i; nworkers )
		delete( workers[i] );

	for ( i; nprocessors ) {
		delete( processors[i] );
	} // for

	delete( workers );
	delete( request_queues );
	delete( processors );
	if ( separate_clus ) delete( cluster );
}

// logically a static field of executor, but it must be forward declared for get_next_ticket
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
	return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
} // tickets
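
// Illustrative sketch (not part of the original file): tickets are assigned
// round-robin, so actor mailboxes spread evenly over the request queues while
// each actor's messages stay FIFO on its own queue. With nrqueues == 4:
//
//   executor e{ 2, 2, 4 };
//   get_next_ticket( e );	// 0
//   get_next_ticket( e );	// 1
//   get_next_ticket( e );	// 2
//   get_next_ticket( e );	// 3
//   get_next_ticket( e );	// 0 again, wrapping around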

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;		// was an executor passed to start_actor_system
static unsigned long int __num_actors_;				// number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;	// used to wake executor after actors finish
struct actor {
	unsigned long int ticket;	// executor-queue handle to provide FIFO message execution
	Allocation allocation_;		// allocation action
};

void ?{}( actor & this ) {
	// Once an actor is allocated, it must be sent a message or the actor system cannot stop. Hence, its receive
	// member must be called to end it.
	verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
	this.allocation_ = Nodelete;
	this.ticket = get_next_ticket( *__actor_executor_ );
	__atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
	if ( this.allocation_ != Nodelete ) {
		switch( this.allocation_ ) {
			case Delete: delete( &this ); break;
			case Destroy:
				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
				^?{}(this);
				break;
			case Finished:
				CFA_DEBUG( this.ticket = MAX; );	// mark as terminated
				break;
			default: ;	// stop warning
		}

		if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) {	// all actors have terminated
			unpark( __actor_executor_thd );
		}
	}
}
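
// Illustrative note (not part of the original file): the Allocation value a
// receive routine returns drives check_actor (and, for messages, check_message):
//
//   Nodelete - keep alive; for an actor, it also keeps the system running
//   Delete   - delete: run the destructor and free (heap-allocated objects)
//   Destroy  - run the destructor but leave storage (stack/embedded objects)
//   Finished - no cleanup, but the actor counts as terminated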

struct message {
	Allocation allocation_;		// allocation action
};

void ?{}( message & this ) { this.allocation_ = Nodelete; }
void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
	switch ( this.allocation_ ) {	// analyze message status
		case Nodelete: break;
		case Delete: delete( &this ); break;
		case Destroy: ^?{}(this); break;
		case Finished: break;
	} // switch
}

void deliver_request( request & this ) {
	Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
	this.receiver->allocation_ = actor_allocation;
	check_actor( *this.receiver );
	check_message( *this.msg );
}
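
// Illustrative sketch (not part of the original file): deliver_request invokes
// the stored __receive_fn and uses its return value to decide the actor's
// fate, while the message's own allocation_ field decides the message's. A
// user-side receive routine therefore has this shape (hello is hypothetical):
//
//   Allocation hello( actor & receiver, message & msg ) {
//       // ... act on the derived actor/message types ...
//       return Nodelete;		// keep the actor alive for further messages
//   }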

void main( worker & this ) with(this) {
	Exit:
	for ( unsigned int i = 0;; i = (i + 1) % range ) {	// cycle through set of request buffers
		transfer( request_queues[i + start], current_queue );
		while ( ! current_queue`isEmpty ) {
			&req = &try_pop_front( current_queue );
			if ( !&req ) continue;	// possibly add some work stealing/idle sleep here
			if ( req.stop ) break Exit;
			deliver_request( req );

			delete( &req );
		} // while
	} // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
	insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
	send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
	__actor_executor_thd = active_thread();
	__actor_executor_ = alloc();
	(*__actor_executor_){ 0, num_thds, num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
	__actor_executor_thd = active_thread();
	__actor_executor_ = &this;
	__actor_executor_passed = true;
}

static inline void stop_actor_system() {
	park( );	// will receive signal when actor system is finished

	if ( !__actor_executor_passed ) delete( __actor_executor_ );
	__actor_executor_ = 0p;
	__actor_executor_thd = 0p;
	__next_ticket = 0;
	__actor_executor_passed = false;
}
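
// Illustrative sketch (not part of the original file): a minimal program built
// on this header. The derived types d_actor and d_msg, the receive routine,
// and main are hypothetical; the request is heap-allocated because a worker
// deletes each request after delivering it.
//
//   struct d_actor { inline actor; };	// derived actor via plan-9 style inheritance
//   struct d_msg { inline message; };	// derived message
//
//   Allocation receive( actor & a, message & m ) {
//       return Finished;				// last message: mark the actor terminated
//   }
//
//   int main() {
//       start_actor_system();			// one worker thread per processor on this cluster
//       d_actor a;
//       d_msg m;
//       request * req = alloc();
//       (*req){ &a, &m, receive };
//       send( a, *req );				// enqueued FIFO on a's ticketed mailbox
//       stop_actor_system();			// parks until all actors have terminated
//   }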