Changeset 2125443a for libcfa/src/concurrency/actor.hfa
- Timestamp: Feb 3, 2023, 1:28:43 PM (22 months ago)
- Branches: ADT, ast-experimental, master
- Children: 2f61765
- Parents: 8a97248 (diff), db9d7a9 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Files: 1 edited
Legend:
- Unmodified (context lines, no prefix)
- Added (prefixed with +)
- Removed (prefixed with -)
libcfa/src/concurrency/actor.hfa
--- r8a97248
+++ r2125443a

+#pragma once
+
 #include <locks.hfa>
 #include <limits.hfa>
 #include <list.hfa>
+#include <kernel.hfa>
 
 #ifdef __CFA_DEBUG__
…
 // Define if executor is created in a separate cluster
 #define __DEFAULT_EXECUTOR_SEPCLUS__ false
+
+// when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
+#define __ALLOC 0
 
 // forward decls
…
 P9_EMBEDDED( request, dlink(request) )
 
-void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
-void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
+static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
+static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
 	this.receiver = receiver;
 	this.msg = msg;
 	this.fn = fn;
 	this.stop = false;
 }
+static inline void ?{}( request & this, request & copy ) {
+	this.receiver = copy.receiver;
+	this.msg = copy.msg;
+	this.fn = copy.fn;
+	this.stop = copy.stop;
+}
+
+// hybrid data structure. Copies until buffer is full and then allocates for intrusive list
+struct copy_queue {
+	dlist( request ) list;
+	#if ! __ALLOC
+	request * buffer;
+	size_t count, buffer_size, index;
+	#endif
+};
+static inline void ?{}( copy_queue & this ) {}
+static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
+	list{};
+	#if ! __ALLOC
+	buffer_size = buf_size;
+	buffer = aalloc( buffer_size );
+	count = 0;
+	index = 0;
+	#endif
+}
+static inline void ^?{}( copy_queue & this ) with(this) {
+	#if ! __ALLOC
+	adelete(buffer);
+	#endif
+}
+
+static inline void insert( copy_queue & this, request & elem ) with(this) {
+	#if ! __ALLOC
+	if ( count < buffer_size ) { // fast path ( no alloc )
+		buffer[count]{ elem };
+		count++;
+		return;
+	}
+	request * new_elem = alloc();
+	(*new_elem){ elem };
+	insert_last( list, *new_elem );
+	#else
+	insert_last( list, elem );
+	#endif
+}
+
+// once you start removing you need to remove all elements
+// it is not supported to call insert() before the list is fully empty
+// should_delete is an output param
+static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
+	#if ! __ALLOC
+	if ( count > 0 ) {
+		count--;
+		should_delete = false;
+		size_t old_idx = index;
+		index = count == 0 ? 0 : index + 1;
+		return buffer[old_idx];
+	}
+	#endif
+	should_delete = true;
+	return try_pop_front( list );
+}
+
+static inline bool isEmpty( copy_queue & this ) with(this) {
+	#if ! __ALLOC
+	return count == 0 && list`isEmpty;
+	#else
+	return list`isEmpty;
+	#endif
+}
+
+static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
 struct work_queue {
-	futex_mutex mutex_lock;
-	dlist( request ) input;		// unbounded list of work requests
+	__spinlock_t mutex_lock;
+	copy_queue owned_queue;
+	copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
+
 }; // work_queue
-void ?{}( work_queue & this ) with(this) { input{}; mutex_lock{}; }
-
-void insert( work_queue & this, request & elem ) with(this) {
-	lock( mutex_lock );
-	insert_last( input, elem );
+static inline void ?{}( work_queue & this ) with(this) {
+	// c_queue = alloc();
+	// (*c_queue){ __buffer_size };
+	owned_queue{ __buffer_size };
+	c_queue = &owned_queue;
+}
+// static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
+
+static inline void insert( work_queue & this, request & elem ) with(this) {
+	lock( mutex_lock __cfaabi_dbg_ctx2 );
+	insert( *c_queue, elem );
 	unlock( mutex_lock );
 } // insert
 
-void transfer( work_queue & this, dlist(request) & transferTo ) with(this) {
-	lock( mutex_lock );
-
-	//C_TODO CHANGE
-	// transferTo->transfer( input );		// transfer input to output
-
-	// this is awfully inefficient but Ill use it until transfer is implemented
-	request * r;
-	while ( ! input`isEmpty ) {
-		r = &try_pop_front( input );
-		if ( r ) insert_last( transferTo, *r );
-	}
-
-	// transfer( input, transferTo );
-
+static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
+	lock( mutex_lock __cfaabi_dbg_ctx2 );
+	// swap copy queue ptrs
+	copy_queue * temp = *transfer_to;
+	*transfer_to = c_queue;
+	c_queue = temp;
 	unlock( mutex_lock );
 } // transfer
 
 thread worker {
+	copy_queue owned_queue;
 	work_queue * request_queues;
-	dlist( request ) current_queue;
+	copy_queue * current_queue;
 	request & req;
 	unsigned int start, range;
…
 	((thread &)this){ clu };
 	this.request_queues = request_queues;
-	this.current_queue{};
+	// this.current_queue = alloc();
+	// (*this.current_queue){ __buffer_size };
+	this.owned_queue{ __buffer_size };
+	this.current_queue = &this.owned_queue;
 	this.start = start;
 	this.range = range;
 }
+// static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
 
 struct executor {
…
 }; // executor
 
-static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) with(this) {
+static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
 	if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
+	__buffer_size = buf_size;
 	this.nprocessors = nprocessors;
 	this.nworkers = nworkers;
…
 	} // for
 }
+static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
…
 	} // for
 
-	delete( workers );
-	delete( request_queues );
-	delete( processors );
+	adelete( workers );
+	adelete( request_queues );
+	adelete( processors );
 	if ( seperate_clus ) delete( cluster );
 }
…
 };
 
-void ?{}( actor & this ) {
+static inline void ?{}( actor & this ) {
 	// Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
 	// member must be called to end it
…
 	__atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
 }
-void ^?{}( actor & this ) {}
+static inline void ^?{}( actor & this ) {}
 
 static inline void check_actor( actor & this ) {
…
 };
 
-void ?{}( message & this ) { this.allocation_ = Nodelete; }
-void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
-void ^?{}( message & this ) {}
+static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
+static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
+static inline void ^?{}( message & this ) {}
 
 static inline void check_message( message & this ) {
…
 }
 
-void deliver_request( request & this ) {
+static inline void deliver_request( request & this ) {
 	Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
 	this.receiver->allocation_ = actor_allocation;
…
 
 void main( worker & this ) with(this) {
+	bool should_delete;
 	Exit:
 	for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
-		transfer( request_queues[i + start], current_queue );
-		while ( ! current_queue`isEmpty ) {
-			&req = &try_pop_front( current_queue );
+		// C_TODO: potentially check queue count instead of immediately trying to transfer
+		transfer( request_queues[i + start], &current_queue );
+		while ( ! isEmpty( *current_queue ) ) {
+			&req = &remove( *current_queue, should_delete );
 			if ( !&req ) continue; // possibly add some work stealing/idle sleep here
 			if ( req.stop ) break Exit;
 			deliver_request( req );
 
-			delete( &req );
+			if ( should_delete ) delete( &req );
 		} // while
 	} // for
…
 	__actor_executor_thd = active_thread();
 	__actor_executor_ = alloc();
-	(*__actor_executor_){ 0, num_thds, num_thds * 16 };
+	(*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
…
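
The central change in this merge is the copy_queue: a hybrid queue that copies requests into a fixed buffer while space remains and spills to heap-allocated list nodes once the buffer fills, with remove() reporting through should_delete which kind of element it handed back so only heap nodes get freed. A minimal plain-C sketch of that idea follows; the CFA code above is the real implementation, and req_t, cq_t, and the cq_* names (including the simplified payload field) are hypothetical stand-ins invented for illustration.

/* Hypothetical plain-C analogue of the CFA copy_queue above. */
#include <stdbool.h>
#include <stdlib.h>

typedef struct req {
	int payload;          /* stand-in for the real request fields */
	struct req * next;    /* link used only by the overflow list */
} req_t;

typedef struct {
	req_t * buffer;       /* fixed array: elements are copied in (fast path) */
	size_t count, size, index;
	req_t * head, * tail; /* overflow list: used once the buffer is full */
} cq_t;

static void cq_init( cq_t * q, size_t size ) {
	q->buffer = malloc( size * sizeof(req_t) );
	q->size = size;
	q->count = 0;
	q->index = 0;
	q->head = q->tail = NULL;
}

/* Same precondition as the CFA insert(): inserts only resume after a full
 * drain, so buffer[count] is always a free slot. */
static void cq_insert( cq_t * q, req_t elem ) {
	if ( q->count < q->size ) {           /* fast path: copy, no allocation */
		q->buffer[q->count++] = elem;
		return;
	}
	req_t * n = malloc( sizeof(req_t) );  /* slow path: spill to a heap node */
	*n = elem;
	n->next = NULL;
	if ( q->tail ) q->tail->next = n; else q->head = n;
	q->tail = n;
}

/* should_delete reports whether the element is heap allocated, so the caller
 * frees list nodes but simply reuses buffer slots. */
static req_t * cq_remove( cq_t * q, bool * should_delete ) {
	if ( q->count > 0 ) {                 /* drain the buffer first */
		q->count--;
		*should_delete = false;
		size_t old_idx = q->index;
		q->index = q->count == 0 ? 0 : q->index + 1;
		return &q->buffer[old_idx];
	}
	*should_delete = true;                /* then fall back to the overflow list */
	req_t * n = q->head;
	if ( n ) {
		q->head = n->next;
		if ( ! q->head ) q->tail = NULL;
	}
	return n;                             /* NULL once both are empty */
}

int main() {
	cq_t q; bool del;
	cq_init( &q, 2 );
	for ( int i = 0; i < 4; i++ ) cq_insert( &q, (req_t){ i, NULL } ); /* 2 copied, 2 spilled */
	for ( req_t * r; ( r = cq_remove( &q, &del ) ); )
		if ( del ) free( r );             /* free only the spilled nodes */
	free( q.buffer );
	return 0;
}

As in the CFA version, the payoff is that the common case (a queue that never exceeds __buffer_size entries) does no allocation at all per request.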
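The other notable change is transfer(), which replaces the old drain-under-lock loop with a constant-time pointer swap: the worker trades its already-drained copy_queue for the producer-side one and drains it outside the critical section. A hedged plain-C sketch of that pattern, reusing the cq_t/req_t types from the previous sketch, with a pthread mutex standing in for CFA's __spinlock_t (wq_t and the wq_* names are hypothetical):

#include <pthread.h>

typedef struct {
	pthread_mutex_t lock;  /* stands in for CFA's __spinlock_t */
	cq_t owned;            /* backing storage for the producer-side queue */
	cq_t * current;        /* queue currently accepting inserts */
} wq_t;

static void wq_init( wq_t * w, size_t buf_size ) {
	pthread_mutex_init( &w->lock, NULL );
	cq_init( &w->owned, buf_size );
	w->current = &w->owned;
}

/* Producers copy a request in under the lock. */
static void wq_insert( wq_t * w, req_t elem ) {
	pthread_mutex_lock( &w->lock );
	cq_insert( w->current, elem );
	pthread_mutex_unlock( &w->lock );
}

/* The worker trades its drained queue for the producers' full one.  The
 * critical section is an O(1) pointer swap, so its length no longer grows
 * with the number of queued requests; draining happens outside the lock. */
static void wq_transfer( wq_t * w, cq_t ** transfer_to ) {
	pthread_mutex_lock( &w->lock );
	cq_t * temp = *transfer_to;
	*transfer_to = w->current;   /* worker takes the filled queue */
	w->current = temp;           /* producers refill the drained one */
	pthread_mutex_unlock( &w->lock );
}

A worker loop would then mirror main( worker & ) above: wq_transfer to grab the filled queue, drain it with cq_remove, and free only the elements flagged by should_delete.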