source: libcfa/src/concurrency/actor.hfa@ 46ab782

Last change on this file since 46ab782 was 2d028039, checked in by caparson <caparson@…>, 3 years ago

added support for copy based envelopes

#pragma once

#include <locks.hfa>
#include <limits.hfa>
#include <list.hfa>
#include <kernel.hfa>
#include <vector2.hfa>

#ifdef __CFA_DEBUG__
#define CFA_DEBUG( stmt ) stmt
#else
#define CFA_DEBUG( stmt )
#endif // CFA_DEBUG

// Define the default number of processors created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_PROCESSORS__ 2

// Define the default number of threads created in the executor. Must be greater than 0.
#define __DEFAULT_EXECUTOR_WORKERS__ 2

// Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
// actor-executor threads. Must be greater than 0.
#define __DEFAULT_EXECUTOR_RQUEUES__ 2

// Define whether the executor is created in a separate cluster
#define __DEFAULT_EXECUTOR_SEPCLUS__ false

#define __STEAL 1 // work-stealing toggle; disjoint from the toggles above

// whether to steal work or to steal a queue. Only applicable if __STEAL == 1
#define __STEAL_WORK 0

// heuristic selection (only set one to be 1)
#define __RAND_QUEUE 1
#define __RAND_WORKER 0

// show stealing stats
// #define __STEAL_STATS

// forward decls
struct actor;
struct message;

enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
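// Semantics of each Allocation value, as implemented by check_actor and check_message below:
//   Nodelete - no cleanup is performed; the actor/message remains live
//   Delete   - delete() is called on the actor/message (destructor run, then storage freed)
//   Destroy  - only the destructor is run; the storage is not freed
//   Finished - no destructor and no free; the actor is merely marked as terminated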

typedef Allocation (*__receive_fn)(actor &, message &);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
    bool stop;
};

static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
    this.stop = false;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
    this.stop = copy.stop;
}

// hybrid data structure: requests are copied into a flat buffer, which is doubled in size when it fills up
struct copy_queue {
    request * buffer;
    size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    utilized = 0;
    index = 0;
    last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count >= buffer_size ) { // increase arr size
        last_size = buffer_size;
        buffer_size = 2 * buffer_size;
        buffer = realloc( buffer, sizeof( request ) * buffer_size );
        /* paranoid */ verify( buffer );
    }
    buffer[count]{ elem }; // C_TODO: change to memcpy
    // memcpy( &buffer[count], &elem, sizeof(request) );
    count++;
}

// once removal starts, all elements must be removed before insert() may be called again,
// i.e. calling insert() before the queue is fully drained is not supported
static inline request & remove( copy_queue & this ) with(this) {
    if ( count > 0 ) {
        count--;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    return *0p; // empty queue: return a null reference; callers must check the address of the result
}

// try to reclaim some memory
static inline void reclaim( copy_queue & this ) with(this) {
    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    utilized = 0;
    buffer_size--;
    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
}

static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
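
// Illustrative sketch (not part of the interface) of the intended copy_queue cycle: fill with
// insert(), fully drain with remove() before inserting again, then optionally reclaim() unused
// capacity. All names used here are defined above; the buffer size 10 is an arbitrary example.
//
//   copy_queue q = { 10 };            // buffer with initial capacity 10
//   request sentinel;                 // default ctor => stop request
//   insert( q, sentinel );            // copies the request into the buffer
//   while ( ! isEmpty( q ) ) {
//       request & r = remove( q );    // requests come back in FIFO order
//       if ( ! &r ) break;            // null reference => queue was empty
//       // ... process r ...
//   }
//   reclaim( q );                     // shrink the buffer if it was under-utilized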
118
119static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
120struct work_queue {
121 __spinlock_t mutex_lock;
122 copy_queue owned_queue;
123 copy_queue * c_queue;
124 volatile bool being_processed;
125}; // work_queue
126static inline void ?{}( work_queue & this ) with(this) {
127 owned_queue{ __buffer_size };
128 c_queue = &owned_queue;
129 being_processed = false;
130}
131
132static inline void insert( work_queue & this, request & elem ) with(this) {
133 lock( mutex_lock __cfaabi_dbg_ctx2 );
134 insert( *c_queue, elem );
135 unlock( mutex_lock );
136} // insert
137
138static inline void transfer( work_queue & this, copy_queue ** transfer_to, work_queue ** queue_arr, unsigned int idx ) with(this) {
139 lock( mutex_lock __cfaabi_dbg_ctx2 );
140 #if __STEAL
141
142 #if __STEAL_WORK
143 if ( unlikely( being_processed ) )
144 #else
145 // check if queue has been stolen out from under us between
146 // transfer() call and lock acquire C_TODO: maybe just use new queue!
147 if ( unlikely( being_processed || queue_arr[idx] != &this ) )
148 #endif // __STEAL_WORK
149 {
150 unlock( mutex_lock );
151 return;
152 }
153
154 being_processed = c_queue->count != 0;
155 #endif // __STEAL
156
157 c_queue->utilized = c_queue->count;
158
159 // swap copy queue ptrs
160 copy_queue * temp = *transfer_to;
161 *transfer_to = c_queue;
162 c_queue = temp;
163 unlock( mutex_lock );
164} // transfer
165
166thread worker {
167 work_queue ** request_queues;
168 copy_queue * current_queue;
169 worker ** worker_arr; // C_TODO: change n_workers, n_queues,worker_arr to just be pulled from ptr to executor
170 request & req;
171 unsigned int start, range, empty_count, n_workers, n_queues, id;
172 #ifdef __STEAL_STATS
173 unsigned int try_steal, stolen;
174 #endif
175};
176
177#ifdef __STEAL_STATS
178unsigned int total_tries = 0, total_stolen = 0, total_workers;
179#endif
180static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start,
181 unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) {
182 ((thread &)this){ clu };
183 this.request_queues = request_queues;
184 this.current_queue = current_queue;
185 this.start = start;
186 this.range = range;
187 this.empty_count = 0;
188 this.n_workers = n_workers;
189 this.worker_arr = worker_arr;
190 this.n_queues = n_queues;
191 this.id = id;
192 #ifdef __STEAL_STATS
193 this.try_steal = 0;
194 this.stolen = 0;
195 total_workers = n_workers;
196 #endif
197}
198static inline void ^?{}( worker & mutex this ) with(this) {
199 // delete( current_queue );
200 #ifdef __STEAL_STATS
201 __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST);
202 __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST);
203 if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0)
204 printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen);
205 #endif
206}
207
208struct executor {
209 cluster * cluster; // if workers execute on separate cluster
210 processor ** processors; // array of virtual processors adding parallelism for workers
211 work_queue * request_queues; // master array of work request queues
212 copy_queue * local_queues; // array of all worker local queues to avoid deletion race
213 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping
214 worker ** workers; // array of workers executing work requests
215 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
216 bool seperate_clus; // use same or separate cluster for executor
217}; // executor
218
219static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
220 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
221 __buffer_size = buf_size;
222 this.nprocessors = nprocessors;
223 this.nworkers = nworkers;
224 this.nrqueues = nrqueues;
225 this.seperate_clus = seperate_clus;
226
227 if ( seperate_clus ) {
228 cluster = alloc();
229 (*cluster){};
230 } else cluster = active_cluster();
231
232 request_queues = aalloc( nrqueues );
233 worker_req_queues = aalloc( nrqueues );
234 for ( i; nrqueues ) {
235 request_queues[i]{};
236 worker_req_queues[i] = &request_queues[i];
237 }
238
239 processors = aalloc( nprocessors );
240 for ( i; nprocessors )
241 (*(processors[i] = alloc())){ *cluster };
242
243 local_queues = aalloc( nworkers );
244 workers = alloc( nworkers );
245 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
246 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
247 local_queues[i]{ buf_size };
248 range = reqPerWorker + ( i < extras ? 1 : 0 );
249 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i };
250 } // for
251}
252static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
253static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
254static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
255static inline void ?{}( executor & this, unsigned int nprocessors ) { this{ nprocessors, __DEFAULT_EXECUTOR_WORKERS__ }; }
256static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; }
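
// Illustrative sketch (not part of the interface): constructing an executor directly. All names
// are defined above; the argument values are arbitrary examples.
//
//   executor e = { 2, 2, 4 };   // 2 processors, 2 workers, 4 request queues, default cluster/buffer size
//   executor d;                 // all defaults: __DEFAULT_EXECUTOR_PROCESSORS__, etc.
//
// An executor is usually created and owned by start_actor_system( size_t ) below, or constructed
// explicitly and handed to start_actor_system( executor & ).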

// C_TODO: once stealing is implemented make sure shutdown still works
static inline void ^?{}( executor & this ) with(this) {
    #if __STEAL
    request sentinels[nrqueues];
    for ( unsigned int i = 0; i < nrqueues; i++ ) {
        insert( request_queues[i], sentinels[i] ); // force eventual termination
    } // for
    #else
    request sentinels[nworkers];
    unsigned int reqPerWorker = nrqueues / nworkers;
    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
        insert( request_queues[step], sentinels[i] ); // force eventual termination
    } // for
    #endif

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    adelete( workers );
    adelete( local_queues );
    adelete( request_queues );
    adelete( worker_req_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );
}

// conceptually a static field of executor, but it must be forward declared for get_next_ticket
static unsigned int __next_ticket = 0;

static inline unsigned int get_next_ticket( executor & this ) with(this) {
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST ) % nrqueues;
} // tickets

// C_TODO: update globals in this file to be static fields once the project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static unsigned long int __num_actors_;             // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    unsigned long int ticket; // executor-queue handle to provide FIFO message execution
    Allocation allocation_;   // allocation action
};

static inline void ?{}( actor & this ) {
    // Once an actor is allocated, it must be sent a message or the actor system cannot stop; hence,
    // its receive member must be called to end it.
    verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
    this.allocation_ = Nodelete;
    this.ticket = get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
}
static inline void ^?{}( actor & this ) {}

static inline void check_actor( actor & this ) {
    if ( this.allocation_ != Nodelete ) {
        switch( this.allocation_ ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; ); // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; ); // mark as terminated
                break;
            default: ; // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    Allocation allocation_; // allocation action
};

static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
static inline void ^?{}( message & this ) {}

static inline void check_message( message & this ) {
    switch ( this.allocation_ ) { // analyze message status
        case Nodelete: break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}(this); break;
        case Finished: break;
    } // switch
}

static inline void deliver_request( request & this ) {
    Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
    this.receiver->allocation_ = actor_allocation;
    check_actor( *this.receiver );
    check_message( *this.msg );
}
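
// Illustrative sketch (not part of the interface) of how a receive routine is bundled into a
// request. The routine my_receive and the objects a and m are hypothetical names; actor, message,
// Allocation, request, and send (defined below) come from this file.
//
//   Allocation my_receive( actor & receiver, message & msg ) {
//       // ... process msg, possibly updating state reachable from receiver ...
//       return Nodelete;                   // keep the actor alive; the message's own allocation_ governs its cleanup
//   }
//
//   actor a;                               // ctor assigns an executor ticket
//   message m;
//   request req = { &a, &m, my_receive };  // calls ?{}( request &, actor *, message *, __receive_fn )
//   send( a, req );                        // copied into the mailbox selected by a's ticket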

// Couple of ways to approach work stealing:
// 1: completely worker agnostic, just find a big queue and steal it
// 2: track some heuristic of each worker's load, focus on the busiest worker, and then pick one of its queues
// worker heuristics:
//   - how many queues have work?
//   - size of largest queue
//   - total # of messages
//   - messages currently being serviced
//   - pick randomly
//   - pick from closer threads/workers (this can be combined with others)

// lock free or global lock for queue stealing
#define __LOCK_SWP 0

__spinlock_t swp_lock;

// tries to atomically swap two queues; returns true if the swap succeeded and false otherwise
static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    #if __LOCK_SWP

    lock( swp_lock __cfaabi_dbg_ctx2 );
    work_queue * temp = request_queues[my_idx];
    request_queues[my_idx] = request_queues[victim_idx];
    request_queues[victim_idx] = temp;
    unlock( swp_lock );

    return true;

    #else // __LOCK_SWP else
    work_queue * my_queue = request_queues[my_idx];
    work_queue * other_queue = request_queues[victim_idx];
    if ( other_queue == 0p || my_queue == 0p ) return false;

    // try to set our queue ptr to be 0p. If it fails, someone moved our queue, so return false
    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return false;

    // try to set the other queue ptr to be our queue ptr. If it fails, someone moved the other queue, so fix up and then return false
    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        /* paranoid */ verify( request_queues[my_idx] == 0p );
        request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
        return false;
    }

    // we have successfully swapped, and since our queue is 0p no one will touch it, so write back the new queue ptr non-atomically
    request_queues[my_idx] = other_queue; // last write does not need to be atomic
    return true;

    #endif // __LOCK_SWP
}
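
// Worked illustration (comments only) of the lock-free branch above, for a worker stealing the
// queue in slot victim_idx into its own slot my_idx of request_queues:
//   1. snapshot both slots; give up if either already holds 0p (another swap is in flight)
//   2. CAS request_queues[my_idx]: my_queue -> 0p; failure means this worker's slot was swapped
//      away underneath it, so give up
//   3. CAS request_queues[victim_idx]: other_queue -> my_queue; on failure restore
//      request_queues[my_idx] = my_queue and give up
//   4. store request_queues[my_idx] = other_queue; no CAS is needed because the slot still holds
//      0p and, by step 1, no other worker will operate on a 0p slot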

// once a worker to steal from has been chosen, choose a queue to steal from
static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {
    #if __RAND_QUEUE
    unsigned int tries = 0;
    const unsigned int start_idx = prng( n_queues );
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {
        tries++;
        curr_steal_queue = request_queues[i];
        #if __STEAL_WORK

        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
            continue;

        // in this case we just return from transfer if this doesn't work
        transfer( *curr_steal_queue, &current_queue, request_queues, i );
        if ( isEmpty( *current_queue ) ) continue;
        last_idx = i;

        #ifdef __STEAL_STATS
        stolen++;
        #endif // __STEAL_STATS

        #else // __STEAL_WORK else

        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef __STEAL_STATS
        bool success = try_swap_queues( this, i, last_idx );
        if ( success ) stolen++;
        #else
        try_swap_queues( this, i, last_idx );
        #endif // __STEAL_STATS

        // C_TODO: try transfer immediately
        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
        // if ( isEmpty( *current_queue ) ) return false;
        return false;

        #endif // __STEAL_WORK

        return true;
    } // for
    return false;

    #elif __RAND_WORKER

    // have to calculate victim start and range since victim may be deleted before us in shutdown
    const unsigned int queues_per_worker = n_queues / n_workers;
    const unsigned int extras = n_queues % n_workers;
    unsigned int vic_start, vic_range;
    if ( extras > victim_id ) {
        vic_range = queues_per_worker + 1;
        vic_start = vic_range * victim_id;
    } else {
        vic_start = extras + victim_id * queues_per_worker;
        vic_range = queues_per_worker;
    }
    unsigned int start_idx = prng( vic_range );
    unsigned int tries = 0;
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
        tries++;
        curr_steal_queue = request_queues[ i + vic_start ];
        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef __STEAL_STATS
        bool success = try_swap_queues( this, i, last_idx );
        if ( success ) stolen++;
        #else
        try_swap_queues( this, i, last_idx );
        #endif // __STEAL_STATS

        // C_TODO: try transfer immediately
        // transfer( *request_queues[last_idx], &current_queue, request_queues, last_idx );
        // if ( isEmpty( *current_queue ) ) return false;
        return false;
    }
    return false; // no queue was stolen
    #endif
}

// choose a worker to steal from
static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {
    #if __RAND_WORKER
    unsigned int victim = prng( n_workers );
    if ( victim == id ) victim = ( victim + 1 ) % n_workers;
    return choose_queue( this, victim, last_idx );
    #else
    return choose_queue( this, 0, last_idx );
    #endif
}

// look for work to steal
// returns a bool: true => a queue was stolen, false => no work was stolen
static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur
    // design notes:
    // - to steal a queue, acquire both queues' locks in address order (maybe an atomic swap is possible)
    // - maybe a flag to hint which queue is being processed
    // - look at count to see if a queue is worth stealing (don't steal empty queues)
    // - if a queue is stolen and its in-progress flag is up, don't process it and just continue looking at own queues
    // - best-effort approach: it is ok if stealing isn't fruitful; it is more important not to delay busy threads

    return choose_victim( this, last_idx );
}

void main( worker & this ) with(this) {
    // threshold of empty queues we see before we go stealing
    const unsigned int steal_threshold = 2 * n_queues;
    unsigned int curr_idx;
    work_queue * curr_work_queue;
    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        // C_TODO: potentially check queue count instead of immediately trying to transfer
        curr_idx = i + start;
        curr_work_queue = request_queues[curr_idx];
        transfer( *curr_work_queue, &current_queue, request_queues, curr_idx );
        if ( isEmpty( *current_queue ) ) {
            #if __STEAL
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            empty_count = 0; // C_TODO: look into stealing backoff schemes
            #ifdef __STEAL_STATS
            try_steal++;
            #endif // __STEAL_STATS

            if ( ! steal_work( this, curr_idx ) ) continue;

            #else // __STEAL else

            continue;

            #endif // __STEAL
        }
        while ( ! isEmpty( *current_queue ) ) {
            &req = &remove( *current_queue );
            if ( !&req ) continue; // possibly add some work stealing/idle sleep here
            if ( req.stop ) break Exit;
            deliver_request( req );
        }
        #if __STEAL
        curr_work_queue->being_processed = false; // set done processing
        #endif
        empty_count = 0; // we found work so reset empty counter
        reclaim( *current_queue );
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
    send( *__actor_executor_, req, this.ticket );
}

static inline void start_actor_system( size_t num_thds ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }

static inline void start_actor_system( executor & this ) {
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park( ); // will receive signal when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
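
// Illustrative end-to-end sketch (not part of the interface). The receive routine my_receive and
// the objects a and m are hypothetical names; everything else is defined in this file.
//
//   Allocation my_receive( actor & receiver, message & msg ) {
//       printf( "hello from an actor\n" );
//       return Finished;             // mark the actor as terminated so the system can shut down
//   }
//
//   int main() {
//       start_actor_system();        // one worker thread per existing processor on the current cluster
//       actor a;                     // actors must be created after start_actor_system()
//       message m;
//       request req = { &a, &m, my_receive };
//       send( a, req );              // delivered FIFO through the mailbox chosen by a's ticket
//       stop_actor_system();         // parks until all actors have terminated
//   }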