Changeset 884f3f67
- Timestamp: Mar 14, 2022, 2:24:51 PM (3 years ago)
- Branches: ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children: bfb9bf5
- Parents: c42b8a1
- Location: libcfa/src/concurrency
- Files: 5 edited
Legend: lines prefixed "-" were removed, lines prefixed "+" were added, unprefixed lines are unchanged context, and "…" marks omitted unchanged lines.
libcfa/src/concurrency/kernel.hfa
diff c42b8a1 → 884f3f67

  void ^?{}(__intrusive_lane_t & this);

- // Aligned timestamps which are used by the relaxed ready queue
+ // Aligned timestamps which are used by the ready queue and io subsystem
  struct __attribute__((aligned(128))) __timestamp_t {
      volatile unsigned long long tv;
…
  };

+ static inline void ?{}(__timestamp_t & this) { this.tv = 0; this.ma = 0; }
+ static inline void ^?{}(__timestamp_t &) {}
+
+
  struct __attribute__((aligned(16))) __cache_id_t {
      volatile unsigned id;
  };

- // Aligned timestamps which are used by the relaxed ready queue
- struct __attribute__((aligned(128))) __help_cnts_t {
-     volatile unsigned long long src;
-     volatile unsigned long long dst;
-     volatile unsigned long long tri;
- };
-
- static inline void ?{}(__timestamp_t & this) { this.tv = 0; this.ma = 0; }
- static inline void ^?{}(__timestamp_t &) {}
-
- struct __attribute__((aligned(128))) __ready_queue_caches_t;
- void ?{}(__ready_queue_caches_t & this);
- void ^?{}(__ready_queue_caches_t & this);
-
- //TODO adjust cache size to ARCHITECTURE
- // Structure holding the ready queue
- struct __ready_queue_t {
-     // Data tracking the actual lanes
-     // On a seperate cacheline from the used struct since
-     // used can change on each push/pop but this data
-     // only changes on shrink/grow
-     struct {
-         // Arary of lanes
-         __intrusive_lane_t * volatile data;
-
-         // Array of times
-         __timestamp_t * volatile tscs;
-
-         __cache_id_t * volatile caches;
-
-         // Array of stats
-         __help_cnts_t * volatile help;
-
-         // Number of lanes (empty or not)
-         volatile size_t count;
-     } lanes;
- };
-
- void ?{}(__ready_queue_t & this);
- void ^?{}(__ready_queue_t & this);
+ // //TODO adjust cache size to ARCHITECTURE
+ // // Structure holding the ready queue
+ // struct __ready_queue_t {
+ //     // Data tracking the actual lanes
+ //     // On a seperate cacheline from the used struct since
+ //     // used can change on each push/pop but this data
+ //     // only changes on shrink/grow
+ //     struct {
+ //         // Arary of lanes
+ //         __intrusive_lane_t * volatile data;
+
+ //         __cache_id_t * volatile caches;
+
+ //         // Number of lanes (empty or not)
+ //         volatile size_t count;
+ //     } lanes;
+ // };
+
+ // void ?{}(__ready_queue_t & this);
+ // void ^?{}(__ready_queue_t & this);

  // Idle Sleep
…
  // Cluster
  struct __attribute__((aligned(128))) cluster {
-     // Ready queue for threads
-     __ready_queue_t ready_queue;
+     struct {
+         struct {
+             // Arary of subqueues
+             __intrusive_lane_t * volatile data;
+
+             // Time since subqueues were processed
+             __timestamp_t * volatile tscs;
+
+             // Number of subqueue / timestamps
+             size_t count;
+         } readyQ;
+
+         struct {
+             // Number of I/O subqueues
+             volatile size_t count;
+
+             // Time since subqueues were processed
+             __timestamp_t * volatile tscs;
+         } io;
+
+         // Cache each kernel thread belongs to
+         __cache_id_t * volatile caches;
+     } sched;
+
+     // // Ready queue for threads
+     // __ready_queue_t ready_queue;

      // Name of the cluster
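Both __timestamp_t and the cluster struct are padded to 128 bytes. A plausible reading (not stated in the changeset) is that this keeps each per-subqueue timestamp in its own pair of 64-byte cache lines, so updates to one shard's timestamp do not false-share with its neighbours. A minimal C sketch of that property, with hypothetical names:

    /* Hypothetical C analogue of __timestamp_t: aligned(128) both aligns the
     * struct and rounds its size up, so an array of these places each entry
     * in its own 128-byte slot (two 64-byte cache lines). */
    struct __attribute__((aligned(128))) timestamp {
        volatile unsigned long long tv;   /* raw time of the last push */
        volatile unsigned long long ma;   /* moving average used by the help heuristic */
    };

    _Static_assert(_Alignof(struct timestamp) == 128, "entries start on 128-byte slots");
    _Static_assert(sizeof(struct timestamp) == 128, "entries are padded to fill the slot");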
libcfa/src/concurrency/kernel/cluster.cfa
diff c42b8a1 → 884f3f67

  //-----------------------------------------------------------------------
  // Check that all the intrusive queues in the data structure are still consistent
- static void check( __ready_queue_t & q ) with (q) {
+ static void check_readyQ( cluster * cltr ) with (cltr->sched) {
      #if defined(__CFA_WITH_VERIFY__)
          {
-             for( idx ; lanes.count ) {
-                 __intrusive_lane_t & sl = lanes.data[idx];
-                 assert(!lanes.data[idx].lock);
+             const unsigned lanes_count = readyQ.count;
+             for( idx ; lanes_count ) {
+                 __intrusive_lane_t & sl = readyQ.data[idx];
+                 assert(!readyQ.data[idx].lock);

                  if(is_empty(sl)) {
…
          it->rdq.id = value;
          it->rdq.target = MAX;
-         value += __readyq_shard_factor;
+         value += __shard_factor.readyq;
          it = &(*it)`next;
      }
…
  }

- static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
-     lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
-     for(i; lanes.count) {
-         lanes.tscs[i].tv = rdtscl();
-         lanes.tscs[i].ma = 0;
+ static void fix_times( __timestamp_t * volatile & tscs, unsigned count ) {
+     tscs = alloc(count, tscs`realloc);
+     for(i; count) {
+         tscs[i].tv = rdtscl();
+         tscs[i].ma = 0;
      }
  }
…
  // Grow the ready queue
  void ready_queue_grow(struct cluster * cltr) {
-     size_t ncount;
      int target = cltr->procs.total;
…
      // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
-
-     // grow the ready queue
-     with( cltr->ready_queue ) {
-         // Find new count
-         // Make sure we always have atleast 1 list
-         if(target >= 2) {
-             ncount = target * __readyq_shard_factor;
-         } else {
-             ncount = __readyq_single_shard;
-         }
-
-         // Allocate new array (uses realloc and memcpies the data)
-         lanes.data = alloc( ncount, lanes.data`realloc );
-
-         // Fix the moved data
-         for( idx; (size_t)lanes.count ) {
-             fix(lanes.data[idx]);
-         }
-
-         // Construct new data
-         for( idx; (size_t)lanes.count ~ ncount) {
-             (lanes.data[idx]){};
-         }
-
-         // Update original
-         lanes.count = ncount;
-
-         lanes.caches = alloc( target, lanes.caches`realloc );
-     }
-
-     fix_times(cltr);
-
+     /* paranoid */ check_readyQ( cltr );
+
+
+     // Find new count
+     // Make sure we always have atleast 1 list
+     size_t ocount = cltr->sched.readyQ.count;
+     size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
+
+     // Do we have to do anything?
+     if( ocount != ncount ) {
+
+         // grow the ready queue
+         with( cltr->sched ) {
+
+             // Allocate new array (uses realloc and memcpies the data)
+             readyQ.data = alloc( ncount, readyQ.data`realloc );
+
+             // Fix the moved data
+             for( idx; ocount ) {
+                 fix(readyQ.data[idx]);
+             }
+
+             // Construct new data
+             for( idx; ocount ~ ncount) {
+                 (readyQ.data[idx]){};
+             }
+
+             // Update original count
+             readyQ.count = ncount;
+         }
+
+
+         fix_times(cltr->sched.readyQ.tscs, cltr->sched.readyQ.count);
+     }
+
+     // realloc the caches
+     cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );
+
+     // reassign the clusters.
      reassign_cltr_id(cltr);

      // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
+     /* paranoid */ check_readyQ( cltr );
+     /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );

      __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
…
      // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
+     /* paranoid */ check_readyQ( cltr );

      int target = cltr->procs.total;

-     with( cltr->ready_queue ) {
+     with( cltr->sched ) {
          // Remember old count
-         size_t ocount = lanes.count;
+         size_t ocount = readyQ.count;

          // Find new count
          // Make sure we always have atleast 1 list
-         lanes.count = target >= 2 ? target * __readyq_shard_factor: __readyq_single_shard;
-         /* paranoid */ verify( ocount >= lanes.count );
-         /* paranoid */ verify( lanes.count == target * __readyq_shard_factor || target < 2 );
+         size_t ncount = max(target * __shard_factor.readyq, __readyq_single_shard);
+         /* paranoid */ verifyf( ocount >= ncount, "Error in shrinking size calculation, %zu >= %zu", ocount, ncount );
+         /* paranoid */ verifyf( ncount == target * __shard_factor.readyq || ncount == __readyq_single_shard,
+         /* paranoid */          "Error in shrinking size calculation, expected %u or %u, got %zu", target * __shard_factor.readyq, ncount );
+
+         readyQ.count = ncount;

          // for printing count the number of displaced threads
…
          // redistribute old data
-         for( idx; (size_t)lanes.count ~ ocount) {
+         for( idx; ncount ~ ocount) {
              // Lock is not strictly needed but makes checking invariants much easier
-             __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
+             __attribute__((unused)) bool locked = __atomic_try_acquire(&readyQ.data[idx].lock);
              verify(locked);

              // As long as we can pop from this lane to push the threads somewhere else in the queue
-             while(!is_empty(lanes.data[idx])) {
+             while(!is_empty(readyQ.data[idx])) {
                  struct thread$ * thrd;
                  unsigned long long _;
-                 [thrd, _] = pop(lanes.data[idx]);
+                 [thrd, _] = pop(readyQ.data[idx]);

                  push(cltr, thrd, true);
…
              // Unlock the lane
-             __atomic_unlock(&lanes.data[idx].lock);
+             __atomic_unlock(&readyQ.data[idx].lock);

              // TODO print the queue statistics here

-             ^(lanes.data[idx]){};
+             ^(readyQ.data[idx]){};
          }
…
          // Allocate new array (uses realloc and memcpies the data)
-         lanes.data = alloc( lanes.count, lanes.data`realloc );
+         readyQ.data = alloc( ncount, readyQ.data`realloc );

          // Fix the moved data
-         for( idx; (size_t)lanes.count ) {
-             fix(lanes.data[idx]);
-         }
-
-         lanes.caches = alloc( target, lanes.caches`realloc );
-     }
-
-     fix_times(cltr);
+         for( idx; ncount ) {
+             fix(readyQ.data[idx]);
+         }
+
+         fix_times(readyQ.tscs, ncount);
+     }
+     cltr->sched.caches = alloc( target, cltr->sched.caches`realloc );
+

…
      // Make sure that everything is consistent
-     /* paranoid */ check( cltr->ready_queue );
+     /* paranoid */ verify( (target == 0) == (cltr->sched.caches == 0p) );
+     /* paranoid */ check_readyQ( cltr );

      __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
      /* paranoid */ verify( ready_mutate_islocked() );
+ }
+
+ void ready_queue_close(struct cluster * cltr) {
+     free( cltr->sched.readyQ.data );
+     free( cltr->sched.readyQ.tscs );
+     cltr->sched.readyQ.data = 0p;
+     cltr->sched.readyQ.tscs = 0p;
+     cltr->sched.readyQ.count = 0;
+
+     free( cltr->sched.io.tscs );
+     free( cltr->sched.caches );
  }
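The grow path now sizes the subqueue array as max(target * __shard_factor.readyq, __readyq_single_shard) and only reallocates when that count actually changes. A rough stand-alone C sketch of that grow pattern follows (invented names; it assumes the single-shard fallback is one subqueue, elides error handling, and skips the intrusive-list fix-up that the real fix() performs after realloc moves the array):

    #include <stdlib.h>

    typedef struct { void *head, *tail; } lane_t;   /* stand-in for __intrusive_lane_t */

    enum { SHARD_FACTOR = 2, SINGLE_SHARD = 1 };     /* assumed values */

    /* Grow (never shrink) the subqueue array to match the processor count. */
    static size_t grow_lanes(lane_t **data, size_t ocount, unsigned nprocs) {
        size_t ncount = (size_t)nprocs * SHARD_FACTOR;
        if (ncount < SINGLE_SHARD) ncount = SINGLE_SHARD;  /* always keep at least one subqueue */
        if (ncount == ocount) return ocount;               /* nothing to do */

        *data = realloc(*data, ncount * sizeof(lane_t));   /* old entries are copied by realloc */
        for (size_t i = ocount; i < ncount; i++)           /* construct only the new tail */
            (*data)[i] = (lane_t){ 0 };
        return ncount;
    }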
libcfa/src/concurrency/kernel/startup.cfa
diff c42b8a1 → 884f3f67

      this.rdq.its = 0;
      this.rdq.itr = 0;
-     this.rdq.id = MAX;
+     this.rdq.id = 0;
      this.rdq.target = MAX;
      this.rdq.last = MAX;
…
      this.name = name;
      this.preemption_rate = preemption_rate;
-     ready_queue{};
+     this.sched.readyQ.data = 0p;
+     this.sched.readyQ.tscs = 0p;
+     this.sched.readyQ.count = 0;
+     this.sched.io.tscs = 0p;
+     this.sched.caches = 0p;

      #if !defined(__CFA_NO_STATISTICS__)
…
      // Unlock the RWlock
      ready_mutate_unlock( last_size );
+
+     ready_queue_close( &this );
+     /* paranoid */ verify( this.sched.readyQ.data == 0p );
+     /* paranoid */ verify( this.sched.readyQ.tscs == 0p );
+     /* paranoid */ verify( this.sched.readyQ.count == 0 );
+     /* paranoid */ verify( this.sched.io.tscs == 0p );
+     /* paranoid */ verify( this.sched.caches == 0p );
+
      enable_interrupts( false ); // Don't poll, could be in main cluster
+

      #if !defined(__CFA_NO_STATISTICS__)
libcfa/src/concurrency/kernel_private.hfa
diff c42b8a1 → 884f3f67


  //-----------------------------------------------------------------------
+ // Decrease the width of the ready queue (number of lanes) by 4
+ void ready_queue_close(struct cluster * cltr);
+
+ //-----------------------------------------------------------------------
  // Calc moving average based on existing average, before and current time.
  static inline unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
…
  }

- static const unsigned __readyq_shard_factor = 2;
+ //-----------------------------------------------------------------------
+ // Calc age a timestamp should be before needing help.
+ forall(Data_t * | { unsigned long long ts(Data_t & this); })
+ static inline unsigned long long calc_cutoff(
+     const unsigned long long ctsc,
+     const processor * proc,
+     size_t count,
+     Data_t * data,
+     __timestamp_t * tscs,
+     const unsigned shard_factor
+ ) {
+     unsigned start = proc->rdq.id;
+     unsigned long long max = 0;
+     for(i; shard_factor) {
+         unsigned long long ptsc = ts(data[start + i]);
+         if(ptsc != -1ull) {
+             /* paranoid */ verify( start + i < count );
+             unsigned long long tsc = moving_average(ctsc, ptsc, tscs[start + i].ma);
+             if(tsc > max) max = tsc;
+         }
+     }
+     return (max + 2 * max) / 2;
+ }
+
+ static struct {
+     const unsigned readyq;
+ } __shard_factor = { 2 };

  // Local Variables: //
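calc_cutoff returns (max + 2 * max) / 2, i.e. 1.5 times the largest moving-average age among the processor's own shard of subqueues, so a processor only helps a foreign subqueue once it looks noticeably staler than anything it already owns. A stand-alone C sketch of the same arithmetic, with names invented here:

    /* Cutoff = 1.5 * (largest moving-average age in this processor's own shard). */
    static unsigned long long help_cutoff(const unsigned long long *ma,
                                          unsigned shard_start, unsigned shard_factor) {
        unsigned long long max = 0;
        for (unsigned i = 0; i < shard_factor; i++)
            if (ma[shard_start + i] > max) max = ma[shard_start + i];
        return (max + 2 * max) / 2;   /* integer form of 1.5 * max */
    }
    /* Example: own shards aged {400, 600} cycles give a cutoff of 900, so a target
     * subqueue is only helped once its moving-average age exceeds 900 cycles. */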
libcfa/src/concurrency/ready_queue.cfa
diff c42b8a1 → 884f3f67

  // Cforall Ready Queue used for scheduling
  //=======================================================================
- void ?{}(__ready_queue_t & this) with (this) {
-     lanes.data = 0p;
-     lanes.tscs = 0p;
-     lanes.caches = 0p;
-     lanes.help = 0p;
-     lanes.count = 0;
- }
-
- void ^?{}(__ready_queue_t & this) with (this) {
-     free(lanes.data);
-     free(lanes.tscs);
-     free(lanes.caches);
-     free(lanes.help);
- }
-
- //-----------------------------------------------------------------------
- __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
+ // void ?{}(__ready_queue_t & this) with (this) {
+ //     lanes.data = 0p;
+ //     lanes.tscs = 0p;
+ //     lanes.caches = 0p;
+ //     lanes.count = 0;
+ // }
+
+ // void ^?{}(__ready_queue_t & this) with (this) {
+ //     free(lanes.data);
+ //     free(lanes.tscs);
+ //     free(lanes.caches);
+ // }
+
+ //-----------------------------------------------------------------------
+ __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->sched) {
      processor * const proc = kernelTLS().this_processor;
      const bool external = (!proc) || (cltr != proc->cltr);
      const bool remote = hint == UNPARK_REMOTE;
+     const size_t lanes_count = readyQ.count;
+
+     /* paranoid */ verify( __shard_factor.readyq > 0 );
+     /* paranoid */ verify( lanes_count > 0 );

      unsigned i;
…
          // Figure out where thread was last time and make sure it's valid
          /* paranoid */ verify(thrd->preferred >= 0);
-         if(thrd->preferred * __readyq_shard_factor < lanes.count) {
-             /* paranoid */ verify(thrd->preferred * __readyq_shard_factor < lanes.count);
-             unsigned start = thrd->preferred * __readyq_shard_factor;
+         unsigned start = thrd->preferred * __shard_factor.readyq;
+         if(start < lanes_count) {
              do {
                  unsigned r = __tls_rand();
-                 i = start + (r % __readyq_shard_factor);
-                 /* paranoid */ verify( i < lanes.count );
+                 i = start + (r % __shard_factor.readyq);
+                 /* paranoid */ verify( i < lanes_count );
                  // If we can't lock it retry
-             } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+             } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
          } else {
              do {
-                 i = __tls_rand() % lanes.count;
-             } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+                 i = __tls_rand() % lanes_count;
+             } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
          }
      } else {
          do {
              unsigned r = proc->rdq.its++;
-             i = proc->rdq.id + (r % __readyq_shard_factor);
-             /* paranoid */ verify( i < lanes.count );
+             i = proc->rdq.id + (r % __shard_factor.readyq);
+             /* paranoid */ verify( i < lanes_count );
              // If we can't lock it retry
-         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
+         } while( !__atomic_try_acquire( &readyQ.data[i].lock ) );
      }

      // Actually push it
-     push(lanes.data[i], thrd);
+     push(readyQ.data[i], thrd);

      // Unlock and return
-     __atomic_unlock( &lanes.data[i].lock );
+     __atomic_unlock( &readyQ.data[i].lock );

      #if !defined(__CFA_NO_STATISTICS__)
…
  }

- static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
-     unsigned start = proc->rdq.id;
-     unsigned long long max = 0;
-     for(i; __readyq_shard_factor) {
-         unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
-         if(ptsc != -1ull) {
-             /* paranoid */ verify( start + i < rdq.lanes.count );
-             unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
-             if(tsc > max) max = tsc;
-         }
-     }
-     return (max + 2 * max) / 2;
- }
-
- __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
-     /* paranoid */ verify( lanes.count > 0 );
+ __attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->sched) {
+     const size_t lanes_count = readyQ.count;
+
+     /* paranoid */ verify( __shard_factor.readyq > 0 );
+     /* paranoid */ verify( lanes_count > 0 );
      /* paranoid */ verify( kernelTLS().this_processor );
-     /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
+     /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes_count );

      processor * const proc = kernelTLS().this_processor;
      unsigned this = proc->rdq.id;
-     /* paranoid */ verify( this < lanes.count );
+     /* paranoid */ verify( this < lanes_count );
      __cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);
…
      // Super important: don't write the same value over and over again
      // We want to maximise our chances that his particular values stays in cache
-     if(lanes.caches[this / __readyq_shard_factor].id != this_cache)
-         __atomic_store_n(&lanes.caches[this / __readyq_shard_factor].id, this_cache, __ATOMIC_RELAXED);
+     if(caches[this / __shard_factor.readyq].id != this_cache)
+         __atomic_store_n(&caches[this / __shard_factor.readyq].id, this_cache, __ATOMIC_RELAXED);

      const unsigned long long ctsc = rdtscl();
…
          uint64_t chaos = __tls_rand();
          unsigned ext = chaos & 0xff;
-         unsigned other = (chaos >> 8) % (lanes.count);
-
-         if(ext < 3 || __atomic_load_n(&lanes.caches[other / __readyq_shard_factor].id, __ATOMIC_RELAXED) == this_cache) {
+         unsigned other = (chaos >> 8) % (lanes_count);
+
+         if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.readyq].id, __ATOMIC_RELAXED) == this_cache) {
              proc->rdq.target = other;
          }
      }
      else {
          const unsigned target = proc->rdq.target;
-         __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
-         /* paranoid */ verify( lanes.tscs[target].tv != MAX );
-         if(target < lanes.count) {
-             const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
-             const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
+         __cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, readyQ.tscs[target].tv);
+         /* paranoid */ verify( readyQ.tscs[target].tv != MAX );
+         if(target < lanes_count) {
+             const unsigned long long cutoff = calc_cutoff(ctsc, proc, lanes_count, cltr->sched.readyQ.data, cltr->sched.readyQ.tscs, __shard_factor.readyq);
+             const unsigned long long age = moving_average(ctsc, readyQ.tscs[target].tv, readyQ.tscs[target].ma);
              __cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
              if(age > cutoff) {
…
      }

-     for(__readyq_shard_factor) {
-         unsigned i = this + (proc->rdq.itr++ % __readyq_shard_factor);
+     for(__shard_factor.readyq) {
+         unsigned i = this + (proc->rdq.itr++ % __shard_factor.readyq);
          if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
      }
…

  }
- __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
-     unsigned i = __tls_rand() % lanes.count;
+ __attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) {
+     unsigned i = __tls_rand() % (cltr->sched.readyQ.count);
      return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
  }
…
  //-----------------------------------------------------------------------
  // try to pop from a lane given by index w
- static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
-     /* paranoid */ verify( w < lanes.count );
+ static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
+     const size_t lanes_count = readyQ.count;
+     /* paranoid */ verify( w < lanes_count );
      __STATS( stats.attempt++; )

      // Get relevant elements locally
-     __intrusive_lane_t & lane = lanes.data[w];
+     __intrusive_lane_t & lane = readyQ.data[w];

      // If list looks empty retry
…
      if (tsv != MAX) {
          unsigned long long now = rdtscl();
-         unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
-         __atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
-         __atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
-     }
-
-     thrd->preferred = w / __readyq_shard_factor;
+         unsigned long long pma = __atomic_load_n(&readyQ.tscs[w].ma, __ATOMIC_RELAXED);
+         __atomic_store_n(&readyQ.tscs[w].tv, tsv, __ATOMIC_RELAXED);
+         __atomic_store_n(&readyQ.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
+     }
+
+     thrd->preferred = w / __shard_factor.readyq;

      // return the popped thread
…
  // try to pop from any lanes making sure you don't miss any threads push
  // before the start of the function
- static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) {
-     /* paranoid */ verify( lanes.count > 0 );
-     unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
+ static inline struct thread$ * search(struct cluster * cltr) {
+     const size_t lanes_count = cltr->sched.readyQ.count;
+     /* paranoid */ verify( lanes_count > 0 );
+     unsigned count = __atomic_load_n( &lanes_count, __ATOMIC_RELAXED );
      unsigned offset = __tls_rand();
      for(i; count) {
…
  //-----------------------------------------------------------------------
  // Given 2 indexes, pick the list with the oldest push an try to pop from it
- static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
+ static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->sched) {
      // Pick the bet list
      int w = i;
-     if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
-         w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
+     if( __builtin_expect(!is_empty(readyQ.data[j]), true) ) {
+         w = (ts(readyQ.data[i]) < ts(readyQ.data[j])) ? i : j;
      }