Changes in / [747a7c1:8fa77eb]
- Files:
-
- 4 edited
-
libcfa/src/concurrency/actor.hfa (modified) (13 diffs)
-
src/Common/ScopedMap.h (modified) (1 diff)
-
src/Concurrency/Actors.cpp (modified) (6 diffs)
-
src/GenPoly/Box.cc (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
r747a7c1 r8fa77eb 5 5 #include <list.hfa> 6 6 #include <kernel.hfa> 7 #include <vector2.hfa>8 7 9 8 #ifdef __CFA_DEBUG__ … … 26 25 #define __DEFAULT_EXECUTOR_SEPCLUS__ false 27 26 28 #define __STEAL 1 // workstealing toggle. Disjoint from toggles above 29 30 // whether to steal work or to steal a queue Only applicable if __STEAL == 1 31 #define __STEAL_WORK 0 32 33 // heuristic selection (only set one to be 1) 34 #define __RAND_QUEUE 1 35 #define __RAND_WORKER 0 36 37 // show stealing stats 38 // #define __STEAL_STATS 27 // when you flip this make sure to recompile compiler and flip the appropriate flag there too in Actors.cpp 28 #define __ALLOC 0 39 29 40 30 // forward decls … … 50 40 __receive_fn fn; 51 41 bool stop; 52 }; 42 inline dlink(request); 43 }; 44 P9_EMBEDDED( request, dlink(request) ) 53 45 54 46 static inline void ?{}( request & this ) { this.stop = true; } // default ctor makes a sentinel … … 68 60 // hybrid data structure. Copies until buffer is full and then allocates for intrusive list 69 61 struct copy_queue { 62 dlist( request ) list; 63 #if ! __ALLOC 70 64 request * buffer; 71 size_t count, buffer_size, index, utilized, last_size; 65 size_t count, buffer_size, index; 66 #endif 72 67 }; 73 68 static inline void ?{}( copy_queue & this ) {} 74 69 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { 70 list{}; 71 #if ! __ALLOC 75 72 buffer_size = buf_size; 76 73 buffer = aalloc( buffer_size ); 77 74 count = 0; 78 utilized = 0;79 75 index = 0; 80 last_size = 0; 81 } 82 static inline void ^?{}( copy_queue & this ) with(this) { adelete(buffer); } 76 #endif 77 } 78 static inline void ^?{}( copy_queue & this ) with(this) { 79 #if ! __ALLOC 80 adelete(buffer); 81 #endif 82 } 83 83 84 84 static inline void insert( copy_queue & this, request & elem ) with(this) { 85 if ( count >= buffer_size ) { // increase arr size86 last_size = buffer_size;87 buffer _size = 2 * buffer_size;88 buffer = realloc( buffer, sizeof( request ) * buffer_size );89 /* paranoid */ verify( buffer );85 #if ! __ALLOC 86 if ( count < buffer_size ) { // fast path ( no alloc ) 87 buffer[count]{ elem }; 88 count++; 89 return; 90 90 } 91 buffer[count]{ elem }; // C_TODO: change to memcpy 92 // memcpy( &buffer[count], &elem, sizeof(request) ); 93 count++; 91 request * new_elem = alloc(); 92 (*new_elem){ elem }; 93 insert_last( list, *new_elem ); 94 #else 95 insert_last( list, elem ); 96 #endif 94 97 } 95 98 96 99 // once you start removing you need to remove all elements 97 100 // it is not supported to call insert() before the list is fully empty 98 static inline request & remove( copy_queue & this ) with(this) { 101 // should_delete is an output param 102 static inline request & remove( copy_queue & this, bool & should_delete ) with(this) { 103 #if ! __ALLOC 99 104 if ( count > 0 ) { 100 105 count--; 106 should_delete = false; 101 107 size_t old_idx = index; 102 108 index = count == 0 ? 0 : index + 1; 103 109 return buffer[old_idx]; 104 110 } 105 request * ret = 0p; 106 return *0p; 107 } 108 109 // try to reclaim some memory 110 static inline void reclaim( copy_queue & this ) with(this) { 111 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; } 112 utilized = 0; 113 buffer_size--; 114 buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory 115 } 116 117 static inline bool isEmpty( copy_queue & this ) with(this) { return count == 0; } 111 #endif 112 should_delete = true; 113 return try_pop_front( list ); 114 } 115 116 static inline bool isEmpty( copy_queue & this ) with(this) { 117 #if ! __ALLOC 118 return count == 0 && list`isEmpty; 119 #else 120 return list`isEmpty; 121 #endif 122 } 118 123 119 124 static size_t __buffer_size = 10; // C_TODO: rework this to be passed from executor through ctors (no need for global) … … 121 126 __spinlock_t mutex_lock; 122 127 copy_queue owned_queue; 123 copy_queue * c_queue; 124 volatile bool being_processed; 128 copy_queue * c_queue; // C_TODO: try putting this on the stack with ptr juggling 129 125 130 }; // work_queue 126 131 static inline void ?{}( work_queue & this ) with(this) { 132 // c_queue = alloc(); 133 // (*c_queue){ __buffer_size }; 127 134 owned_queue{ __buffer_size }; 128 135 c_queue = &owned_queue; 129 being_processed = false; 130 }136 } 137 // static inline void ^?{}( work_queue & this ) with(this) { delete( c_queue ); } 131 138 132 139 static inline void insert( work_queue & this, request & elem ) with(this) { … … 136 143 } // insert 137 144 138 static inline void transfer( work_queue & this, copy_queue ** transfer_to , work_queue ** queue_arr, unsigned int idx) with(this) {145 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 139 146 lock( mutex_lock __cfaabi_dbg_ctx2 ); 140 #if __STEAL141 142 #if __STEAL_WORK143 if ( unlikely( being_processed ) )144 #else145 // check if queue has been stolen out from under us between146 // transfer() call and lock acquire C_TODO: maybe just use new queue!147 if ( unlikely( being_processed || queue_arr[idx] != &this ) )148 #endif // __STEAL_WORK149 {150 unlock( mutex_lock );151 return;152 }153 154 being_processed = c_queue->count != 0;155 #endif // __STEAL156 157 c_queue->utilized = c_queue->count;158 159 147 // swap copy queue ptrs 160 148 copy_queue * temp = *transfer_to; … … 165 153 166 154 thread worker { 167 work_queue ** request_queues; 155 copy_queue owned_queue; 156 work_queue * request_queues; 168 157 copy_queue * current_queue; 169 worker ** worker_arr; // C_TODO: change n_workers, n_queues,worker_arr to just be pulled from ptr to executor170 158 request & req; 171 unsigned int start, range, empty_count, n_workers, n_queues, id; 172 #ifdef __STEAL_STATS 173 unsigned int try_steal, stolen; 174 #endif 175 }; 176 177 #ifdef __STEAL_STATS 178 unsigned int total_tries = 0, total_stolen = 0, total_workers; 179 #endif 180 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, unsigned int start, 181 unsigned int range, worker ** worker_arr, unsigned int n_workers, unsigned int n_queues, unsigned int id ) { 159 unsigned int start, range; 160 }; 161 162 static inline void ?{}( worker & this, cluster & clu, work_queue * request_queues, unsigned int start, unsigned int range ) { 182 163 ((thread &)this){ clu }; 183 164 this.request_queues = request_queues; 184 this.current_queue = current_queue; 165 // this.current_queue = alloc(); 166 // (*this.current_queue){ __buffer_size }; 167 this.owned_queue{ __buffer_size }; 168 this.current_queue = &this.owned_queue; 185 169 this.start = start; 186 170 this.range = range; 187 this.empty_count = 0; 188 this.n_workers = n_workers; 189 this.worker_arr = worker_arr; 190 this.n_queues = n_queues; 191 this.id = id; 192 #ifdef __STEAL_STATS 193 this.try_steal = 0; 194 this.stolen = 0; 195 total_workers = n_workers; 196 #endif 197 } 198 static inline void ^?{}( worker & mutex this ) with(this) { 199 // delete( current_queue ); 200 #ifdef __STEAL_STATS 201 __atomic_add_fetch(&total_tries, try_steal, __ATOMIC_SEQ_CST); 202 __atomic_add_fetch(&total_stolen, stolen, __ATOMIC_SEQ_CST); 203 if (__atomic_sub_fetch(&total_workers, 1, __ATOMIC_SEQ_CST) == 0) 204 printf("steal attempts: %u, steals: %u\n", total_tries, total_stolen); 205 #endif 206 } 171 } 172 // static inline void ^?{}( worker & mutex this ) with(this) { delete( current_queue ); } 207 173 208 174 struct executor { 209 175 cluster * cluster; // if workers execute on separate cluster 210 176 processor ** processors; // array of virtual processors adding parallelism for workers 211 work_queue * request_queues; // master array of work request queues 212 copy_queue * local_queues; // array of all worker local queues to avoid deletion race 213 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping 214 worker ** workers; // array of workers executing work requests 177 work_queue * request_queues; // master list of work request queues 178 worker ** workers; // array of workers executing work requests 215 179 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 216 180 bool seperate_clus; // use same or separate cluster for executor … … 231 195 232 196 request_queues = aalloc( nrqueues ); 233 worker_req_queues = aalloc( nrqueues ); 234 for ( i; nrqueues ) { 197 for ( i; nrqueues ) 235 198 request_queues[i]{}; 236 worker_req_queues[i] = &request_queues[i];237 }238 199 239 200 processors = aalloc( nprocessors ); … … 241 202 (*(processors[i] = alloc())){ *cluster }; 242 203 243 local_queues = aalloc( nworkers );244 204 workers = alloc( nworkers ); 245 205 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 246 206 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 247 local_queues[i]{ buf_size };248 207 range = reqPerWorker + ( i < extras ? 1 : 0 ); 249 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], start, range, workers, nworkers, nrqueues, i};208 (*(workers[i] = alloc())){ *cluster, request_queues, start, range }; 250 209 } // for 251 210 } … … 256 215 static inline void ?{}( executor & this ) { this{ __DEFAULT_EXECUTOR_PROCESSORS__ }; } 257 216 258 // C_TODO: once stealing is implemented make sure shutdown still works259 217 static inline void ^?{}( executor & this ) with(this) { 260 #if __STEAL261 request sentinels[nrqueues];262 for ( unsigned int i = 0; i < nrqueues; i++ ) {263 insert( request_queues[i], sentinels[i] ); // force eventually termination264 } // for265 #else266 218 request sentinels[nworkers]; 267 219 unsigned int reqPerWorker = nrqueues / nworkers; … … 269 221 insert( request_queues[step], sentinels[i] ); // force eventually termination 270 222 } // for 271 #endif272 223 273 224 for ( i; nworkers ) … … 279 230 280 231 adelete( workers ); 281 adelete( local_queues );282 232 adelete( request_queues ); 283 adelete( worker_req_queues );284 233 adelete( processors ); 285 234 if ( seperate_clus ) delete( cluster ); … … 357 306 } 358 307 359 // Couple of ways to approach work stealing360 // 1: completely worker agnostic, just find a big queue and steal it361 // 2: track some heuristic of worker's load and focus on that and then pick a queue from that worker362 // worker heuristics:363 // - how many queues have work?364 // - size of largest queue365 // - total # of messages366 // - messages currently servicing367 // - pick randomly368 // - pick from closer threads/workers (this can be combined with others)369 370 // lock free or global lock for queue stealing371 #define __LOCK_SWP 0372 373 __spinlock_t swp_lock;374 375 // tries to atomically swap two queues and returns a bool indicating if the swap failed376 static inline bool try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {377 #if __LOCK_SWP378 379 lock( swp_lock __cfaabi_dbg_ctx2 );380 work_queue * temp = request_queues[my_idx];381 request_queues[my_idx] = request_queues[victim_idx];382 request_queues[victim_idx] = temp;383 unlock( swp_lock );384 385 return true;386 387 #else // __LOCK_SWP else388 work_queue * my_queue = request_queues[my_idx];389 work_queue * other_queue = request_queues[victim_idx];390 if ( other_queue == 0p || my_queue == 0p ) return false;391 392 // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false393 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )394 return false;395 396 // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false397 if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {398 /* paranoid */ verify( request_queues[my_idx] == 0p );399 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val400 return false;401 }402 403 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically404 request_queues[my_idx] = other_queue; // last write does not need to be atomic405 return true;406 407 #endif // __LOCK_SWP408 }409 410 // once a worker to steal from has been chosen, choose queue to steal from411 static inline bool choose_queue( worker & this, unsigned int victim_id, unsigned int & last_idx ) with(this) {412 #if __RAND_QUEUE413 unsigned int tries = 0;414 const unsigned int start_idx = prng( n_queues );415 work_queue * curr_steal_queue;416 417 for ( unsigned int i = start_idx; tries < n_queues; i = (i + 1) % n_queues ) {418 tries++;419 curr_steal_queue = request_queues[i];420 #if __STEAL_WORK421 422 // avoid empty queues and queues that are being operated on423 if ( curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )424 continue;425 426 // in this case we just return from transfer if this doesn't work427 transfer( *curr_steal_queue, ¤t_queue, request_queues, i );428 if ( isEmpty( *current_queue ) ) continue;429 last_idx = i;430 431 #ifdef __STEAL_STATS432 stolen++;433 #endif // __STEAL_STATS434 435 #else // __STEAL_WORK else436 437 // avoid empty queues and queues that are being operated on438 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )439 continue;440 441 #ifdef __STEAL_STATS442 bool success = try_swap_queues( this, i, last_idx );443 if ( success ) stolen++;444 #else445 try_swap_queues( this, i, last_idx );446 #endif // __STEAL_STATS447 448 // C_TODO: try transfer immediately449 // transfer( *request_queues[last_idx], ¤t_queue, request_queues, last_idx );450 // if ( isEmpty( *current_queue ) ) return false;451 return false;452 453 #endif // __STEAL_WORK454 455 return true;456 } // for457 return false;458 459 #elif __RAND_WORKER460 461 // have to calculate victim start and range since victim may be deleted before us in shutdown462 const unsigned int queues_per_worker = n_queues / n_workers;463 const unsigned int extras = n_queues % n_workers;464 unsigned int vic_start, vic_range;465 if ( extras > victim_id ) {466 vic_range = queues_per_worker + 1;467 vic_start = vic_range * victim_id;468 } else {469 vic_start = extras + victim_id * queues_per_worker;470 vic_range = queues_per_worker;471 }472 unsigned int start_idx = prng( vic_range );473 unsigned int tries = 0;474 work_queue * curr_steal_queue;475 476 for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {477 tries++;478 curr_steal_queue = request_queues[ i + vic_start ];479 // avoid empty queues and queues that are being operated on480 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || isEmpty( *curr_steal_queue->c_queue ) )481 continue;482 483 try_swap_queues( this, i, last_idx );484 485 #ifdef __STEAL_STATS486 bool success = try_swap_queues( this, i, last_idx );487 if ( success ) stolen++;488 #else489 try_swap_queues( this, i, last_idx );490 #endif // __STEAL_STATS491 492 // C_TODO: try transfer immediately493 // transfer( *request_queues[last_idx], ¤t_queue, request_queues, last_idx );494 // if ( isEmpty( *current_queue ) ) return false;495 return false;496 }497 #endif498 }499 500 // choose a worker to steal from501 static inline bool choose_victim( worker & this, unsigned int & last_idx ) with(this) {502 #if __RAND_WORKER503 unsigned int victim = prng( n_workers );504 if ( victim == id ) victim = ( victim + 1 ) % n_workers;505 return choose_queue( this, victim, last_idx );506 #else507 return choose_queue( this, 0, last_idx );508 #endif509 }510 511 // look for work to steal512 // returns a bool: true => a queue was stolen, false => no work was stolen513 static inline bool steal_work( worker & this, unsigned int & last_idx ) with(this) { // C_TODO: add debug tracking of how many steals occur514 // to steal queue acquire both queue's locks in address ordering (maybe can do atomic swap)515 // maybe a flag to hint which queue is being processed516 // look at count to see if queue is worth stealing (dont steal empty queues)517 // if steal and then flag is up then dont process and just continue looking at own queues518 // (best effort approach) its ok if stealing isn't fruitful519 // -> more important to not delay busy threads520 521 return choose_victim( this, last_idx );522 }523 524 308 void main( worker & this ) with(this) { 525 // threshold of empty queues we see before we go stealing 526 const unsigned int steal_threshold = 2 * n_queues; 527 unsigned int curr_idx; 528 work_queue * curr_work_queue; 309 bool should_delete; 529 310 Exit: 530 311 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 531 312 // C_TODO: potentially check queue count instead of immediately trying to transfer 532 curr_idx = i + start; 533 curr_work_queue = request_queues[curr_idx]; 534 transfer( *curr_work_queue, ¤t_queue, request_queues, curr_idx ); 535 if ( isEmpty( *current_queue ) ) { 536 #if __STEAL 537 empty_count++; 538 if ( empty_count < steal_threshold ) continue; 539 empty_count = 0; // C_TODO: look into stealing backoff schemes 540 #ifdef __STEAL_STATS 541 try_steal++; 542 #endif // __STEAL_STATS 543 544 if ( ! steal_work( this, curr_idx ) ) continue; 545 546 #else // __STEAL else 547 548 continue; 549 550 #endif // __STEAL 551 } 313 transfer( request_queues[i + start], ¤t_queue ); 552 314 while ( ! isEmpty( *current_queue ) ) { 553 &req = &remove( *current_queue );315 &req = &remove( *current_queue, should_delete ); 554 316 if ( !&req ) continue; // possibly add some work stealing/idle sleep here 555 317 if ( req.stop ) break Exit; 556 318 deliver_request( req ); 557 } 558 #if __STEAL 559 curr_work_queue->being_processed = false; // set done processing 560 #endif 561 empty_count = 0; // we found work so reset empty counter 562 reclaim( *current_queue ); 319 320 if ( should_delete ) delete( &req ); 321 } // while 563 322 } // for 564 323 } -
src/Common/ScopedMap.h
r747a7c1 r8fa77eb 170 170 size_type erase( const Key & key ) { 171 171 for ( auto it = scopes.rbegin() ; it != scopes.rend() ; ++it ) { 172 size_type i = it->map.erase( key ); 173 if ( 0 != i ) return i; 172 if ( size_type i = it->map.erase( key ) ; 0 != i ) return i; 174 173 } 175 174 return 0; -
src/Concurrency/Actors.cpp
r747a7c1 r8fa77eb 180 180 }; 181 181 182 #define __ALLOC 0 // C_TODO: complete swap to no-alloc version 183 182 184 struct GenReceiveDecls : public ast::WithDeclsToAdd<> { 183 185 unordered_set<const StructDecl *> & actorStructDecls; … … 216 218 /* 217 219 static inline derived_actor & ?|?( derived_actor & receiver, derived_msg & msg ) { 218 request new_req;220 request * new_req = alloc(); 219 221 Allocation (*my_work_fn)( derived_actor &, derived_msg & ) = receive; 220 222 __receive_fn fn = (__receive_fn)my_work_fn; 221 new_req{ &receiver, &msg, fn };222 send( receiver, new_req );223 (*new_req){ &receiver, &msg, fn }; 224 send( receiver, *new_req ); 223 225 return receiver; 224 226 } … … 226 228 CompoundStmt * sendBody = new CompoundStmt( decl->location ); 227 229 230 #if __ALLOC 231 // Generates: request * new_req = alloc(); 232 sendBody->push_back( new DeclStmt( 233 decl->location, 234 new ObjectDecl( 235 decl->location, 236 "new_req", 237 new PointerType( new StructInstType( *requestDecl ) ), 238 new SingleInit( decl->location, new UntypedExpr( decl->location, new NameExpr( decl->location, "alloc" ), {} ) ) 239 ) 240 )); 241 #else 228 242 // Generates: request new_req; 229 243 sendBody->push_back( new DeclStmt( … … 235 249 ) 236 250 )); 251 #endif 237 252 238 253 // Function type is: Allocation (*)( derived_actor &, derived_msg & ) … … 275 290 )); 276 291 292 #if __ALLOC 293 // Generates: (*new_req){ &receiver, &msg, fn }; 294 sendBody->push_back( new ExprStmt( 295 decl->location, 296 new UntypedExpr ( 297 decl->location, 298 new NameExpr( decl->location, "?{}" ), 299 { 300 new UntypedExpr( decl->location, new NameExpr( decl->location, "*?" ), { new NameExpr( decl->location, "new_req" ) } ), 301 new AddressExpr( new NameExpr( decl->location, "receiver" ) ), 302 new AddressExpr( new NameExpr( decl->location, "msg" ) ), 303 new NameExpr( decl->location, "fn" ) 304 } 305 ) 306 )); 307 308 // Generates: send( receiver, *new_req ); 309 sendBody->push_back( new ExprStmt( 310 decl->location, 311 new UntypedExpr ( 312 decl->location, 313 new NameExpr( decl->location, "send" ), 314 { 315 { 316 new NameExpr( decl->location, "receiver" ), 317 new UntypedExpr( decl->location, new NameExpr( decl->location, "*?" ), { new NameExpr( decl->location, "new_req" ) } ) 318 } 319 } 320 ) 321 )); 322 #else 277 323 // Generates: new_req{ &receiver, &msg, fn }; 278 324 sendBody->push_back( new ExprStmt( … … 304 350 ) 305 351 )); 352 #endif 306 353 307 354 // Generates: return receiver; -
src/GenPoly/Box.cc
r747a7c1 r8fa77eb 80 80 CallAdapter(); 81 81 82 void premutate( Declaration * declaration );83 82 void premutate( FunctionDecl * functionDecl ); 84 83 void premutate( TypeDecl * typeDecl ); … … 455 454 456 455 CallAdapter::CallAdapter() : tempNamer( "_temp" ) {} 457 458 void CallAdapter::premutate( Declaration * ) {459 // Prevent type declaration information from leaking out.460 GuardScope( scopeTyVars );461 }462 456 463 457 void CallAdapter::premutate( FunctionDecl *functionDecl ) {
Note:
See TracChangeset
for help on using the changeset viewer.