Changeset b110bcc for libcfa/src/concurrency/actor.hfa
- Timestamp: Apr 21, 2023, 5:36:12 PM (3 years ago)
- Branches: ADT, master
- Children: 28f8f15, 6e4c44d
- Parents: 2ed94a9 (diff), 699a97d (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- File: 1 edited
  - libcfa/src/concurrency/actor.hfa (modified) (17 diffs)
Legend:
- Unmodified (no prefix)
- Added (prefixed with +)
- Removed (prefixed with -)
libcfa/src/concurrency/actor.hfa
--- r2ed94a9
+++ rb110bcc

  #include <locks.hfa>
  #include <limits.hfa>
- #include <list.hfa>
  #include <kernel.hfa>
+ #include <iofwd.hfa>
+ #include <virtual_dtor.hfa>

  #ifdef __CFA_DEBUG__
…
  // Define the default number of executor request-queues (mailboxes) written to by actors and serviced by the
  // actor-executor threads. Must be greater than 0.
- #define __DEFAULT_EXECUTOR_RQUEUES__ 2
+ #define __DEFAULT_EXECUTOR_RQUEUES__ 4

  // Define if executor is created in a separate cluster
  #define __DEFAULT_EXECUTOR_SEPCLUS__ false

- // when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp
- #define __ALLOC 0
+ #define __DEFAULT_EXECUTOR_BUFSIZE__ 10
+
+ #define __STEAL 0 // workstealing toggle. Disjoint from toggles above
+
+ // workstealing heuristic selection (only set one to be 1)
+ // #define RAND 0
+ #define SEARCH 1
+
+ // show stats
+ // #define ACTOR_STATS

  // forward decls
  struct actor;
  struct message;
+ struct executor;

  enum Allocation { Nodelete, Delete, Destroy, Finished }; // allocation status
…
      __receive_fn fn;
      bool stop;
-     inline dlink(request);
  };
- P9_EMBEDDED( request, dlink(request) )

  static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel
…
  }

- // hybrid data structure. Copies until buffer is full and then allocates for intrusive list
+ // Vector-like data structure that supports O(1) queue operations with no bound on size
+ // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert)
  struct copy_queue {
-     dlist( request ) list;
-     #if ! __ALLOC
      request * buffer;
-     size_t count, buffer_size, index;
-     #endif
+     size_t count, buffer_size, index, utilized, last_size;
  };
  static inline void ?{}( copy_queue & this ) {}
  static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
-     list{};
-     #if ! __ALLOC
      buffer_size = buf_size;
      buffer = aalloc( buffer_size );
      count = 0;
+     utilized = 0;
      index = 0;
-     #endif
- }
- static inline void ^?{}( copy_queue & this ) with(this) {
-     #if ! __ALLOC
-     adelete(buffer);
-     #endif
- }
+     last_size = 0;
+ }
+ static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); }

  static inline void insert( copy_queue & this, request & elem ) with(this) {
-     #if ! __ALLOC
-     if ( count < buffer_size ) { // fast path ( no alloc )
-         buffer[count]{ elem };
-         count++;
-         return;
-     }
-     request * new_elem = alloc();
-     (*new_elem){ elem };
-     insert_last( list, *new_elem );
-     #else
-     insert_last( list, elem );
-     #endif
+     if ( count >= buffer_size ) { // increase arr size
+         last_size = buffer_size;
+         buffer_size = 2 * buffer_size;
+         buffer = realloc( buffer, sizeof( request ) * buffer_size );
+         /* paranoid */ verify( buffer );
+     }
+     memcpy( &buffer[count], &elem, sizeof(request) );
+     count++;
  }

  // once you start removing you need to remove all elements
- // it is not supported to call insert() before the list is fully empty
- // should_delete is an output param
- static inline request & remove( copy_queue & this, bool & should_delete ) with(this) {
-     #if ! __ALLOC
+ // it is not supported to call insert() before the array is fully empty
+ static inline request & remove( copy_queue & this ) with(this) {
      if ( count > 0 ) {
          count--;
-         should_delete = false;
          size_t old_idx = index;
          index = count == 0 ? 0 : index + 1;
          return buffer[old_idx];
      }
-     #endif
-     should_delete = true;
-     return try_pop_front( list );
- }
-
- static inline bool isEmpty( copy_queue & this ) with(this) {
-     #if ! __ALLOC
-     return count == 0 && list`isEmpty;
-     #else
-     return list`isEmpty;
-     #endif
- }
-
- static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global)
+     request * ret = 0p;
+     return *0p;
+ }
+
+ // try to reclaim some memory if less than half of buffer is utilized
+ static inline void reclaim( copy_queue & this ) with(this) {
+     if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
+     utilized = 0;
+     buffer_size--;
+     buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
+ }
+
+ static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; }
+
  struct work_queue {
      __spinlock_t mutex_lock;
-     copy_queue owned_queue;
-     copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling
-
+     copy_queue * owned_queue;      // copy queue allocated and cleaned up by this work_queue
+     copy_queue * c_queue;          // current queue
+     volatile bool being_processed; // flag to prevent concurrent processing
+     #ifdef ACTOR_STATS
+     unsigned int id;
+     size_t missed;                 // transfers skipped due to being_processed flag being up
+     #endif
  }; // work_queue
- static inline void ?{}( work_queue & this ) with(this) {
-     // c_queue = alloc();
-     // (*c_queue){ __buffer_size };
-     owned_queue{ __buffer_size };
-     c_queue = &owned_queue;
- }
- // static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); }
+ static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
+     owned_queue = alloc();         // allocated separately to avoid false sharing
+     (*owned_queue){ buf_size };
+     c_queue = owned_queue;
+     being_processed = false;
+     #ifdef ACTOR_STATS
+     id = i;
+     missed = 0;
+     #endif
+ }
+
+ // clean up copy_queue owned by this work_queue
+ static inline void ^?{}( work_queue & this ) with(this) { delete( owned_queue ); }

  static inline void insert( work_queue & this, request & elem ) with(this) {
…
  static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
      lock( mutex_lock __cfaabi_dbg_ctx2 );
+     #ifdef __STEAL
+
+     // check if queue is being processed elsewhere
+     if ( unlikely( being_processed ) ) {
+         #ifdef ACTOR_STATS
+         missed++;
+         #endif
+         unlock( mutex_lock );
+         return;
+     }
+
+     being_processed = c_queue->count != 0;
+     #endif // __STEAL
+
+     c_queue->utilized = c_queue->count;
+
      // swap copy queue ptrs
      copy_queue * temp = *transfer_to;
…
  } // transfer

+ // needed since some info needs to persist past worker lifetimes
+ struct worker_info {
+     volatile unsigned long long stamp;
+     #ifdef ACTOR_STATS
+     size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
+     unsigned long long processed;
+     size_t gulps;
+     #endif
+ };
+ static inline void ?{}( worker_info & this ) {
+     #ifdef ACTOR_STATS
+     this.stolen_from = 0;
+     this.try_steal = 0;    // attempts to steal
+     this.stolen = 0;       // successful steals
+     this.processed = 0;    // requests processed
+     this.gulps = 0;        // number of gulps
+     this.failed_swaps = 0; // steal swap failures
+     this.msgs_stolen = 0;  // number of messages stolen
+     #endif
+     this.stamp = rdtscl();
+ }
+
+ // #ifdef ACTOR_STATS
+ // unsigned int * stolen_arr;
+ // unsigned int * replaced_queue;
+ // #endif
  thread worker {
-     copy_queue owned_queue;
-     work_queue * request_queues;
+     work_queue ** request_queues;
      copy_queue * current_queue;
-     request & req;
+     executor * executor_;
      unsigned int start, range;
+     int id;
  };

- static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) {
+ #ifdef ACTOR_STATS
+ // aggregate counters for statistics
+ size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
+     __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
+ #endif
+ static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
+     unsigned int start, unsigned int range, int id ) {
      ((thread &)this){ clu };
-     this.request_queues = request_queues;
-     // this.current_queue = alloc();
-     // (*this.current_queue){ __buffer_size };
-     this.owned_queue{ __buffer_size };
-     this.current_queue = &this.owned_queue;
-     this.start = start;
-     this.range = range;
- }
- // static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); }
-
+     this.request_queues = request_queues; // array of all queues
+     this.current_queue = current_queue;   // currently gulped queue (start with empty queue to use in swap later)
+     this.executor_ = executor_;           // pointer to current executor
+     this.start = start;                   // start of worker's subrange of request_queues
+     this.range = range;                   // size of worker's subrange of request_queues
+     this.id = id;                         // worker's id and index in array of workers
+ }
+
+ static bool no_steal = false;
  struct executor {
      cluster * cluster;       // if workers execute on separate cluster
      processor ** processors; // array of virtual processors adding parallelism for workers
-     work_queue * request_queues; // master list of work request queues
-     worker ** workers;           // array of workers executing work requests
+     work_queue * request_queues;     // master array of work request queues
+     copy_queue * local_queues;       // array of all worker local queues to avoid deletion race
+     work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping
+     worker ** workers;               // array of workers executing work requests
+     worker_info * w_infos;           // array of info about each worker
      unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
      bool seperate_clus; // use same or separate cluster for executor
  }; // executor

+ // #ifdef ACTOR_STATS
+ // __spinlock_t out_lock;
+ // #endif
+ static inline void ^?{}( worker & mutex this ) with(this) {
+     #ifdef ACTOR_STATS
+     __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
+     __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
+     __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
+     __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
+     __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
+     __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
+
+     // per worker steal stats (uncomment alongside the lock above this routine to print)
+     // lock( out_lock __cfaabi_dbg_ctx2 );
+     // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
+     // int count = 0;
+     // int count2 = 0;
+     // for ( i; range ) {
+     //     if ( replaced_queue[start + i] > 0 ){
+     //         count++;
+     //         // printf("%d: %u, ",i, replaced_queue[i]);
+     //     }
+     //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
+     //         count2++;
+     // }
+     // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
+     // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
+     // unlock( out_lock );
+     #endif
+ }
+
  static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
      if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
-     __buffer_size = buf_size;
      this.nprocessors = nprocessors;
      this.nworkers = nworkers;
…
      this.seperate_clus = seperate_clus;

+     if ( nworkers == nrqueues )
+         no_steal = true;
+
+     #ifdef ACTOR_STATS
+     // stolen_arr = aalloc( nrqueues );
+     // replaced_queue = aalloc( nrqueues );
+     __total_workers = nworkers;
+     #endif
+
      if ( seperate_clus ) {
          cluster = alloc();
…

      request_queues = aalloc( nrqueues );
-     for ( i; nrqueues )
-         request_queues[i]{};
+     worker_req_queues = aalloc( nrqueues );
+     for ( i; nrqueues ) {
+         request_queues[i]{ buf_size, i };
+         worker_req_queues[i] = &request_queues[i];
+     }

      processors = aalloc( nprocessors );
…
          (*(processors[i] = alloc())){ *cluster };

-     workers = alloc( nworkers );
+     local_queues = aalloc( nworkers );
+     workers = aalloc( nworkers );
+     w_infos = aalloc( nworkers );
      unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
+
+     for ( i; nworkers ) {
+         w_infos[i]{};
+         local_queues[i]{ buf_size };
+     }
+
      for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
          range = reqPerWorker + ( i < extras ? 1 : 0 );
-         (*(workers[i] = alloc())){ *cluster, request_queues, start, range };
+         (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
      } // for
  }
- static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __buffer_size }; }
+ static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }
  static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues ) { this{ nprocessors, nworkers, nrqueues, __DEFAULT_EXECUTOR_SEPCLUS__ }; }
  static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers ) { this{ nprocessors, nworkers, __DEFAULT_EXECUTOR_RQUEUES__ }; }
…

  static inline void ^?{}( executor & this ) with(this) {
+     #ifdef __STEAL
+     request sentinels[nrqueues];
+     for ( unsigned int i = 0; i < nrqueues; i++ ) {
+         insert( request_queues[i], sentinels[i] ); // force eventually termination
+     } // for
+     #else
      request sentinels[nworkers];
-     unsigned int reqPerWorker = nrqueues / nworkers;
-     for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
+     unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
+     for ( unsigned int i = 0, step = 0, range; i < nworkers; i += 1, step += range ) {
+         range = reqPerWorker + ( i < extras ? 1 : 0 );
          insert( request_queues[step], sentinels[i] ); // force eventually termination
      } // for
+     #endif

      for ( i; nworkers )
…
      } // for

+     #ifdef ACTOR_STATS
+     size_t misses = 0;
+     for ( i; nrqueues ) {
+         misses += worker_req_queues[i]->missed;
+     }
+     // adelete( stolen_arr );
+     // adelete( replaced_queue );
+     #endif
+
      adelete( workers );
+     adelete( w_infos );
+     adelete( local_queues );
      adelete( request_queues );
+     adelete( worker_req_queues );
      adelete( processors );
      if ( seperate_clus ) delete( cluster );
+
+     #ifdef ACTOR_STATS // print formatted stats
+     printf(" Actor System Stats:\n");
+     printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
+     size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
+     printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
+     printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\n",
+         __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps);
+     size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
+     printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
+     #endif
+
  }

  // this is a static field of executor but have to forward decl for get_next_ticket
- static unsigned int __next_ticket = 0;
-
- static inline unsigned int get_next_ticket( executor & this ) with(this) {
-     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
+ static size_t __next_ticket = 0;
+
+ static inline size_t __get_next_ticket( executor & this ) with(this) {
+     #ifdef __CFA_DEBUG__
+     size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
+
+     // reserve MAX for dead actors
+     if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
+     return temp;
+     #else
+     return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
+     #endif
  } // tickets

- // C_TODO: update globals in this file to be static fields once the project is done
+ // TODO: update globals in this file to be static fields once the static fields project is done
  static executor * __actor_executor_ = 0p;
- static bool __actor_executor_passed = false; // was an executor passed to start_actor_system
- static unsigned long int __num_actors_;      // number of actor objects in system
+ static bool __actor_executor_passed = false; // was an executor passed to start_actor_system
+ static size_t __num_actors_ = 0;             // number of actor objects in system
  static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish
  struct actor {
-     unsigned long int ticket; // executor-queue handle to provide FIFO message execution
-     Allocation allocation_;   // allocation action
+     size_t ticket;            // executor-queue handle
+     Allocation allocation_;   // allocation action
+     inline virtual_dtor;
  };

- static inline void ?{}( actor & this ) {
+ static inline void ?{}( actor & this ) with(this) {
      // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
      // member must be called to end it
-     verifyf( __actor_executor_, "Creating actor before calling start_actor_system()." );
-     this.allocation_ = Nodelete;
-     this.ticket = get_next_ticket( *__actor_executor_ );
-     __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_SEQ_CST );
- }
- static inline void ^?{}( actor & this ) {}
+     verifyf( __actor_executor_, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
+     allocation_ = Nodelete;
+     ticket = __get_next_ticket( *__actor_executor_ );
+     __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
+     #ifdef ACTOR_STATS
+     __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
+     #endif
+ }

  static inline void check_actor( actor & this ) {
…
      }

-     if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_SEQ_CST ) == 0 ) ) { // all actors have terminated
+     if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
          unpark( __actor_executor_thd );
      }
…
  struct message {
      Allocation allocation_; // allocation action
+     inline virtual_dtor;
  };

- static inline void ?{}( message & this ) { this.allocation_ = Nodelete; }
- static inline void ?{}( message & this, Allocation allocation ) { this.allocation_ = allocation; }
- static inline void ^?{}( message & this ) {}
+ static inline void ?{}( message & this ) {
+     this.allocation_ = Nodelete;
+ }
+ static inline void ?{}( message & this, Allocation allocation ) {
+     memcpy( &this.allocation_, &allocation, sizeof(allocation) ); // optimization to elide ctor
+     verifyf( this.allocation_ != Finished, "The Finished Allocation status is not supported for message types.\n");
+ }
+ static inline void ^?{}( message & this ) with(this) {
+     CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); )
+ }

  static inline void check_message( message & this ) {
      switch ( this.allocation_ ) { // analyze message status
-         case Nodelete: break;
+         case Nodelete: CFA_DEBUG(this.allocation_ = Finished); break;
          case Delete: delete( &this ); break;
          case Destroy: ^?{}(this); break;
…
      } // switch
  }
+ static inline void set_allocation( message & this, Allocation state ) {
+     this.allocation_ = state;
+ }

  static inline void deliver_request( request & this ) {
-     Allocation actor_allocation = this.fn( *this.receiver, *this.msg );
-     this.receiver->allocation_ = actor_allocation;
+     this.receiver->allocation_ = this.fn( *this.receiver, *this.msg );
+     check_message( *this.msg );
      check_actor( *this.receiver );
-     check_message( *this.msg );
+ }
+
+ // tries to atomically swap two queues and returns 0p if the swap failed
+ // returns ptr to newly owned queue if swap succeeds
+ static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
+     work_queue * my_queue = request_queues[my_idx];
+     work_queue * other_queue = request_queues[victim_idx];
+
+     // if either queue is 0p then they are in the process of being stolen
+     if ( other_queue == 0p ) return 0p;
+
+     // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
+     if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
+         return 0p;
+
+     // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
+     if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
+         /* paranoid */ verify( request_queues[my_idx] == 0p );
+         request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val
+         return 0p;
+     }
+
+     // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
+     request_queues[my_idx] = other_queue; // last write does not need to be atomic
+     return other_queue;
+ }
+
+ // once a worker to steal from has been chosen, choose queue to steal from
+ static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
+     // have to calculate victim start and range since victim may be deleted before us in shutdown
+     const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
+     const unsigned int extras = executor_->nrqueues % executor_->nworkers;
+     unsigned int vic_start, vic_range;
+     if ( extras > victim_id ) {
+         vic_range = queues_per_worker + 1;
+         vic_start = vic_range * victim_id;
+     } else {
+         vic_start = extras + victim_id * queues_per_worker;
+         vic_range = queues_per_worker;
+     }
+     unsigned int start_idx = prng( vic_range );
+
+     unsigned int tries = 0;
+     work_queue * curr_steal_queue;
+
+     for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
+         tries++;
+         curr_steal_queue = request_queues[ i + vic_start ];
+         // avoid empty queues and queues that are being operated on
+         if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )
+             continue;
+
+         #ifdef ACTOR_STATS
+         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
+         if ( curr_steal_queue ) {
+             executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
+             executor_->w_infos[id].stolen++;
+             // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
+             // replaced_queue[swap_idx]++;
+             // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
+         } else {
+             executor_->w_infos[id].failed_swaps++;
+         }
+         #else
+         curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
+         #endif // ACTOR_STATS
+
+         return;
+     }
+
+     return;
+ }
+
+ // choose a worker to steal from
+ static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
+     #if RAND
+     unsigned int victim = prng( executor_->nworkers );
+     if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
+     choose_queue( this, victim, swap_idx );
+     #elif SEARCH
+     unsigned long long min = MAX; // smaller timestamp means longer since service
+     int min_id = 0; // use ints not uints to avoid integer underflow without hacky math
+     int n_workers = executor_->nworkers;
+     unsigned long long curr_stamp;
+     int scount = 1;
+     for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
+         curr_stamp = executor_->w_infos[i].stamp;
+         if ( curr_stamp < min ) {
+             min = curr_stamp;
+             min_id = i;
+         }
+     }
+     choose_queue( this, min_id, swap_idx );
+     #endif
  }

  void main( worker & this ) with(this) {
-     bool should_delete;
+     // #ifdef ACTOR_STATS
+     // for ( i; executor_->nrqueues ) {
+     //     replaced_queue[i] = 0;
+     //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
+     // }
+     // #endif
+
+     // threshold of empty queues we see before we go stealing
+     const unsigned int steal_threshold = 2 * range;
+
+     // Store variable data here instead of worker struct to avoid any potential false sharing
+     unsigned int empty_count = 0;
+     request & req;
+     work_queue * curr_work_queue;
+
      Exit:
      for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
-         // C_TODO: potentially check queue count instead of immediately trying to transfer
-         transfer( request_queues[i + start], &current_queue );
+         curr_work_queue = request_queues[i + start];
+
+         // check if queue is empty before trying to gulp it
+         if ( isEmpty( *curr_work_queue->c_queue ) ) {
+             #ifdef __STEAL
+             empty_count++;
+             if ( empty_count < steal_threshold ) continue;
+             #else
+             continue;
+             #endif
+         }
+         transfer( *curr_work_queue, &current_queue );
+         #ifdef ACTOR_STATS
+         executor_->w_infos[id].gulps++;
+         #endif // ACTOR_STATS
+         #ifdef __STEAL
+         if ( isEmpty( *current_queue ) ) {
+             if ( unlikely( no_steal ) ) continue;
+             empty_count++;
+             if ( empty_count < steal_threshold ) continue;
+             empty_count = 0;
+
+             __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
+
+             #ifdef ACTOR_STATS
+             executor_->w_infos[id].try_steal++;
+             #endif // ACTOR_STATS
+
+             steal_work( this, start + prng( range ) );
+             continue;
+         }
+         #endif // __STEAL
          while ( ! isEmpty( *current_queue ) ) {
-             &req = &remove( *current_queue, should_delete );
-             if ( !&req ) continue; // possibly add some work stealing/idle sleep here
+             #ifdef ACTOR_STATS
+             executor_->w_infos[id].processed++;
+             #endif
+             &req = &remove( *current_queue );
+             if ( !&req ) continue;
              if ( req.stop ) break Exit;
              deliver_request( req );
-
-             if ( should_delete ) delete( &req );
-         } // while
+         }
+         #ifdef __STEAL
+         curr_work_queue->being_processed = false; // set done processing
+         empty_count = 0; // we found work so reset empty counter
+         #endif
+
+         // potentially reclaim some of the current queue's vector space if it is unused
+         reclaim( *current_queue );
      } // for
  }
…

  static inline void send( actor & this, request & req ) {
+     verifyf( this.ticket != (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
      send( *__actor_executor_, req, this.ticket );
  }

+ static inline void __reset_stats() {
+     #ifdef ACTOR_STATS
+     __total_tries = 0;
+     __total_stolen = 0;
+     __all_gulps = 0;
+     __total_failed_swaps = 0;
+     __all_processed = 0;
+     __num_actors_stats = 0;
+     __all_msgs_stolen = 0;
+     #endif
+ }
+
  static inline void start_actor_system( size_t num_thds ) {
+     __reset_stats();
      __actor_executor_thd = active_thread();
      __actor_executor_ = alloc();
…
  }

- static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
+ // TODO: potentially revisit getting number of processors
+ //       ( currently the value stored in active_cluster()->procs.total is often stale
+ //         and doesn't reflect how many procs are allocated )
+ // static inline void start_actor_system() { start_actor_system( active_cluster()->procs.total ); }
+ static inline void start_actor_system() { start_actor_system( 1 ); }

  static inline void start_actor_system( executor & this ) {
+     __reset_stats();
      __actor_executor_thd = active_thread();
      __actor_executor_ = &this;
…
      __actor_executor_passed = false;
  }
+
+ // Default messages to send to any actor to change status
+ // assigned at creation to __base_msg_finished to avoid unused message warning
+ message __base_msg_finished @= { .allocation_ : Finished };
+ struct __DeleteMsg { inline message; } DeleteMsg = __base_msg_finished;
+ struct __DestroyMsg { inline message; } DestroyMsg = __base_msg_finished;
+ struct __FinishedMsg { inline message; } FinishedMsg = __base_msg_finished;
+
+ Allocation receive( actor & this, __DeleteMsg & msg ) { return Delete; }
+ Allocation receive( actor & this, __DestroyMsg & msg ) { return Destroy; }
+ Allocation receive( actor & this, __FinishedMsg & msg ) { return Finished; }
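For orientation, a minimal usage sketch of the interface above (not part of the changeset). It assumes the cfa actor transformation pass generates the send machinery for user-defined receive routines; the talker/hello_msg names and the `|` send syntax are illustrative assumptions rather than something this changeset defines:

#include <actor.hfa>
#include <fstream.hfa>

struct talker { inline actor; };        // user actor, inherits from actor via inline
struct hello_msg { inline message; };   // user message, inherits from message

// invoked by an executor worker thread when the message is delivered;
// the return value becomes the receiving actor's allocation status
Allocation receive( talker & this, hello_msg & msg ) {
    sout | "hello from an actor";
    return Finished;                    // actor is done after this message
}

int main() {
    start_actor_system();               // default: one processor, default executor sizes
    talker t;
    hello_msg m;
    t | m;                              // send (assumed operator generated by the actor pass)
    stop_actor_system();                // blocks until all actors have terminated
}

Note that the message's own allocation status defaults to Nodelete, so stack-allocated messages like the one above need no further action; Delete/Destroy are only needed for heap-allocated or explicitly managed messages.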