- Timestamp: Dec 10, 2019, 4:23:09 PM (5 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: f80f840
- Parents: 0f9ceacb
- Location: libcfa/src
- Files: 8 edited
libcfa/src/bits/defs.hfa
```diff
 }
 
-#define __CFA_NO_BIT_TEST_AND_SET__
+// #define __CFA_NO_BIT_TEST_AND_SET__
 
 static inline bool bts(volatile unsigned long long int * target, unsigned long long int bit ) {
…
 	asm volatile(
 		"LOCK btsq %[bit], %[target]\n\t"
-		: "=@ccc" (result)
+		: "=@ccc" (result)
 		: [target] "m" (*target), [bit] "r" (bit)
 	);
```
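The `bts` helper issues an x86 `LOCK btsq` and reads the carry flag through the `"=@ccc"` output constraint; the now commented-out `__CFA_NO_BIT_TEST_AND_SET__` guard presumably selects a portable fallback path instead of the inline assembly. A minimal sketch of what such a fallback could look like with GCC/Clang atomic builtins (function names here are illustrative, not the library's):

```c
#include <stdbool.h>

// Portable bit test-and-set / test-and-reset on a 64-bit word.
// Returns the previous value of the bit, like the x86 bts/btr instructions.
static inline bool bts_fallback(volatile unsigned long long *target, unsigned long long bit) {
	unsigned long long mask = 1ull << bit;
	unsigned long long old = __atomic_fetch_or(target, mask, __ATOMIC_SEQ_CST);
	return (old & mask) != 0;
}

static inline bool btr_fallback(volatile unsigned long long *target, unsigned long long bit) {
	unsigned long long mask = 1ull << bit;
	unsigned long long old = __atomic_fetch_and(target, ~mask, __ATOMIC_SEQ_CST);
	return (old & mask) != 0;
}
```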
libcfa/src/concurrency/invoke.h
```diff
 };
 
+// Link lists fields
+// instrusive link field for threads
+struct __thread_desc_link {
+	struct thread_desc * next;
+	struct thread_desc * prev;
+	unsigned long long ts;
+};
+
 struct thread_desc {
 	// Core threading fields
…
 	// Link lists fields
 	// instrusive link field for threads
-	struct thread_desc * next;
-	struct thread_desc * prev;
-	unsigned long long ts;
+	struct __thread_desc_link link;
 
 	struct {
…
 extern "Cforall" {
 	static inline thread_desc *& get_next( thread_desc & this ) {
-		return this.next;
+		return this.link.next;
```
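The per-thread link fields are factored into a `__thread_desc_link` so that the ready-queue sentinels can embed exactly the same layout as a real thread. The usual trick with such intrusive links is to recover the owning structure from a pointer to the embedded link via `offsetof`, which is what the `head()`/`tail()` helpers in ready_queue.cfa do below. A small self-contained C illustration of that idea (the types and names here are stand-ins, not the runtime's):

```c
#include <stddef.h>
#include <stdio.h>

struct link { struct node *next, *prev; unsigned long long ts; };
struct node { int payload; struct link link; };   // link embedded in its owner

// Recover the owner from a pointer to its embedded link field.
static struct node *owner_of(struct link *l) {
	return (struct node *)((char *)l - offsetof(struct node, link));
}

int main(void) {
	struct node n = { .payload = 42 };
	struct link *l = &n.link;
	printf("%d\n", owner_of(l)->payload);   // prints 42
	return 0;
}
```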
libcfa/src/concurrency/kernel.cfa
```diff
 	self_mon.recursion = 1;
 	self_mon_p = &self_mon;
-	next = NULL;
+	link.next = 0p;
+	link.prev = 0p;
 
 	node.next = NULL;
…
 
 	// register the processor unless it's the main thread which is handled in the boot sequence
-	if(this != mainProcessor)
+	if(this != mainProcessor) {
 		this->id = doregister(this->cltr, this);
+		ready_queue_grow( this->cltr );
+	}
+
 
 	{
…
 	V( this->terminated );
 
+
 	// unregister the processor unless it's the main thread which is handled in the boot sequence
-	if(this != mainProcessor)
+	if(this != mainProcessor) {
+		ready_queue_shrink( this->cltr );
 		unregister(this->cltr, this);
+	}
 
 	__cfaabi_dbg_print_safe("Kernel : core %p terminated\n", this);
+
+	stats_tls_tally(this->cltr);
 }
 
…
 	verify( ! kernelTLS.preemption_state.enabled );
 
-	verifyf( thrd->next == NULL, "Expected null got %p", thrd->next );
+	verifyf( thrd->link.next == NULL, "Expected null got %p", thrd->link.next );
+
+
+	ready_schedule_lock(thrd->curr_cluster, kernelTLS.this_processor);
+	bool was_empty = push( thrd->curr_cluster, thrd );
+	ready_schedule_unlock(thrd->curr_cluster, kernelTLS.this_processor);
 
 	with( *thrd->curr_cluster ) {
-		ready_schedule_lock(*thrd->curr_cluster, kernelTLS.this_processor);
-		__atomic_acquire(&ready_queue.lock);
-		thrd->ts = rdtscl();
-		bool was_empty = push( ready_queue, thrd );
-		__atomic_unlock(&ready_queue.lock);
-		ready_schedule_unlock(*thrd->curr_cluster, kernelTLS.this_processor);
-
 		if(was_empty) {
 			lock (proc_list_lock __cfaabi_dbg_ctx2);
…
 			wake_fast(idle);
 		}
-
 	}
 
…
 	verify( ! kernelTLS.preemption_state.enabled );
 
-	ready_schedule_lock(*this, kernelTLS.this_processor);
-	__atomic_acquire(&ready_queue.lock);
-	thread_desc * head;
-	__attribute__((unused)) bool _;
-	[head, _] = pop( ready_queue );
-	__atomic_unlock(&ready_queue.lock);
-	ready_schedule_unlock(*this, kernelTLS.this_processor);
+	ready_schedule_lock(this, kernelTLS.this_processor);
+	thread_desc * head = pop( this );
+	ready_schedule_unlock(this, kernelTLS.this_processor);
 
 	verify( ! kernelTLS.preemption_state.enabled );
…
 	// Destroy the main processor and its context in reverse order of construction
 	// These were manually constructed so we need manually destroy them
-	^(mainProcessor->runner){};
+	void ^?{}(processor & this) with( this ) {
+		//don't join the main thread here, that wouldn't make any sense
+		__cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner);
+	}
+
 	^(*mainProcessor){};
 
```
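The scheduling path now pushes onto the cluster-wide ready queue and keeps the "was the queue empty?" result so that an idle processor is only woken on an empty-to-non-empty transition. A toy C sketch of that design choice, with a simple atomic counter standing in for the real queue (all names here are illustrative):

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_size_t ready_count;   // stand-in for the cluster ready queue

// Returns true only when this push made the queue non-empty,
// so only the first producer pays the cost of waking an idle processor.
static bool push_ready(void) {
	return atomic_fetch_add(&ready_count, 1) == 0;
}

static void wake_one_idle(void) { puts("waking an idle processor"); }

static void schedule(void) {
	if (push_ready()) {
		wake_one_idle();
	}
}

int main(void) {
	schedule();   // wakes an idle processor
	schedule();   // queue already non-empty, no wake
	return 0;
}
```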
libcfa/src/concurrency/kernel.hfa
```diff
 	// spin lock protecting the queue
 	volatile bool lock;
+	unsigned int last_id;
 
 	// anchor for the head and the tail of the queue
 	struct __sentinel_t {
-		struct thread_desc * next;
-		struct thread_desc * prev;
-		unsigned long long ts;
+		// Link lists fields
+		// instrusive link field for threads
+		// must be exactly as in thread_desc
+		__thread_desc_link link;
 	} before, after;
 
 	// Optional statistic counters
-	#ifndef __CFA_NO_SCHED_STATS__
+	#if !defined(__CFA_NO_SCHED_STATS__)
 		struct __attribute__((aligned(64))) {
 			// difference between number of push and pops
…
 void ^?{}(__intrusive_ready_queue_t & this);
 
+typedef unsigned long long __cfa_readyQ_mask_t;
+
+// enum {
+// 	__cfa_ready_queue_mask_size = (64 - sizeof(size_t)) / sizeof(size_t),
+// 	__cfa_max_ready_queues = __cfa_ready_queue_mask_size * 8 * sizeof(size_t)
+// };
+
+#define __cfa_readyQ_mask_size ((64 - sizeof(size_t)) / sizeof(__cfa_readyQ_mask_t))
+#define __cfa_max_readyQs (__cfa_readyQ_mask_size * 8 * sizeof(__cfa_readyQ_mask_t))
+
+//TODO adjust cache size to ARCHITECTURE
+struct __attribute__((aligned(128))) __ready_queue_t {
+	struct {
+		volatile size_t count;
+		volatile __cfa_readyQ_mask_t mask[ __cfa_readyQ_mask_size ];
+	} empty;
+
+	struct __attribute__((aligned(64))) {
+		__intrusive_ready_queue_t * volatile data;
+		volatile size_t count;
+	} list;
+
+	#if !defined(__CFA_NO_STATISTICS__)
+		__attribute__((aligned(64))) struct {
+			struct {
+				struct {
+					volatile size_t attempt;
+					volatile size_t success;
+				} push;
+				struct {
+					volatile size_t maskrds;
+					volatile size_t attempt;
+					volatile size_t success;
+				} pop;
+			} pick;
+			struct {
+				volatile size_t value;
+				volatile size_t count;
+			} full;
+		} global_stats;
+
+	#endif
+};
+
+void  ?{}(__ready_queue_t & this);
+void ^?{}(__ready_queue_t & this);
+
 //-----------------------------------------------------------------------------
 // Cluster
…
 
 	// Ready queue for threads
-	__intrusive_ready_queue_t ready_queue;
+	__ready_queue_t ready_queue;
 
 	// Name of the cluster
```
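With `__cfa_readyQ_mask_t` a 64-bit word and the whole `empty` block budgeted against a 64-byte unit alongside the `count` field, the macros above work out to 7 mask words and therefore 7 × 64 = 448 addressable ready lists on a typical LP64 target. A quick C check of that arithmetic (the names and the `static_assert` only restate the macro definitions under the LP64 assumption):

```c
#include <assert.h>
#include <stdio.h>

typedef unsigned long long mask_t;   // stands in for __cfa_readyQ_mask_t

#define MASK_WORDS ((64 - sizeof(size_t)) / sizeof(mask_t))  // mask words fitting beside the count
#define MAX_QUEUES (MASK_WORDS * 8 * sizeof(mask_t))         // one bit per ready list

int main(void) {
	// On an LP64 target: (64 - 8) / 8 = 7 words, 7 * 64 = 448 queues.
	static_assert(sizeof(size_t) == 8 && sizeof(mask_t) == 8, "LP64 assumption");
	printf("words = %zu, max queues = %zu\n", MASK_WORDS, MAX_QUEUES);
	return 0;
}
```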
libcfa/src/concurrency/kernel_private.hfa
```diff
 
 static inline bool __atomic_try_acquire(volatile bool * ll) {
-	return __atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST);
+	return !__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST);
 }
 
…
 // Reader side : acquire when using the ready queue to schedule but not
 //  creating/destroying queues
-static inline void ready_schedule_lock( struct cluster & cltr, struct processor * proc) with(cltr.ready_lock) {
+static inline void ready_schedule_lock( struct cluster * cltr, struct processor * proc) with(cltr->ready_lock) {
 	unsigned iproc = proc->id;
 	/*paranoid*/ verify(data[iproc].handle == proc);
…
 }
 
-static inline void ready_schedule_unlock( struct cluster & cltr, struct processor * proc) with(cltr.ready_lock) {
+static inline void ready_schedule_unlock( struct cluster * cltr, struct processor * proc) with(cltr->ready_lock) {
 	unsigned iproc = proc->id;
 	/*paranoid*/ verify(data[iproc].handle == proc);
…
 void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t );
 
-bool push(__intrusive_ready_queue_t & this, thread_desc * node);
-[thread_desc *, bool] pop(__intrusive_ready_queue_t & this);
+//=======================================================================
+// Ready-Queue API
+
+__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd);
+__attribute__((hot)) thread_desc * pop(struct cluster * cltr);
+void ready_queue_grow  (struct cluster * cltr);
+void ready_queue_shrink(struct cluster * cltr);
+
+#if !defined(__CFA_NO_STATISTICS__)
+	void stats_tls_tally(struct cluster * cltr);
+#else
+	static inline void stats_tls_tally(struct cluster * cltr) {}
+#endif
 
 // Local Variables: //
```
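The one-character fix to `__atomic_try_acquire` matters: `__atomic_exchange_n` returns the *previous* value of the flag, so the lock was actually acquired only when that previous value was false. A self-contained C sketch of the same try-lock shape (the names are illustrative):

```c
#include <stdbool.h>
#include <stdio.h>

static volatile bool lock_flag = false;

// Returns true only if we took the lock, i.e. the old value was false.
static bool try_acquire(volatile bool *ll) {
	return !__atomic_exchange_n(ll, true, __ATOMIC_SEQ_CST);
}

static void release(volatile bool *ll) {
	__atomic_store_n(ll, false, __ATOMIC_RELEASE);
}

int main(void) {
	printf("first try:  %d\n", try_acquire(&lock_flag));   // 1: lock was free
	printf("second try: %d\n", try_acquire(&lock_flag));   // 0: already held
	release(&lock_flag);
	return 0;
}
```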
libcfa/src/concurrency/monitor.cfa
```diff
 	for( thread_desc ** thrd_it = &entry_queue.head;
 		*thrd_it;
-		thrd_it = &(*thrd_it)->next
+		thrd_it = &(*thrd_it)->link.next
 	) {
 		// For each acceptable check if it matches
```
libcfa/src/concurrency/ready_queue.cfa
```diff
 }
 
+static inline unsigned rand_bit(unsigned rnum, size_t mask) {
+	verify(sizeof(mask) == 8);
+	unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
+	#if !defined(__BMI2__)
+		uint64_t v = mask;       // Input value to find position with rank r.
+		unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
+		unsigned int s;          // Output: Resulting position of bit with rank r [1-64]
+		uint64_t a, b, c, d;     // Intermediate temporaries for bit count.
+		unsigned int t;          // Bit count temporary.
+
+		// Do a normal parallel bit count for a 64-bit integer,
+		// but store all intermediate steps.
+		a = v - ((v >> 1) & ~0UL/3);
+		b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
+		c = (b + (b >> 4)) & ~0UL/0x11;
+		d = (c + (c >> 8)) & ~0UL/0x101;
+
+
+		t = (d >> 32) + (d >> 48);
+		// Now do branchless select!
+		s = 64;
+		s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
+		t = (d >> (s - 16)) & 0xff;
+		s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
+		t = (c >> (s - 8)) & 0xf;
+		s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
+		t = (b >> (s - 4)) & 0x7;
+		s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
+		t = (a >> (s - 2)) & 0x3;
+		s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
+		t = (v >> (s - 1)) & 0x1;
+		s -= ((t - r) & 256) >> 8;
+		return s - 1;
+	#else
+		uint64_t picked = _pdep_u64(1ul << bit, mask);
+		return picked ? __builtin_ctzl(picked) : 0;
+	#endif
+}
+
+static inline __cfa_readyQ_mask_t readyQ_mask_full() { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
+static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(readyQ_mask_full()); }
+
+static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
+	__cfa_readyQ_mask_t word = idx >> readyQ_mask_shit_length();
+	__cfa_readyQ_mask_t bit  = idx & readyQ_mask_full();
+	return [bit, word];
+}
+
 //=======================================================================
 // Cluster wide reader-writer lock
…
 // Intrusive Queue used by ready queue
 //=======================================================================
-static const size_t fields_offset = offsetof( thread_desc, next );
-
 // Get the head pointer (one before the first element) from the anchor
 static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
 	thread_desc * rhead = (thread_desc *)(
-		(uintptr_t)( &this.before ) - fields_offset
+		(uintptr_t)( &this.before ) - offsetof( thread_desc, link )
 	);
 	/* paranoid */ verify(rhead);
…
 static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
 	thread_desc * rtail = (thread_desc *)(
-		(uintptr_t)( &this.after ) - fields_offset
+		(uintptr_t)( &this.after ) - offsetof( thread_desc, link )
 	);
 	/* paranoid */ verify(rtail);
…
 // Ctor
 void ?{}( __intrusive_ready_queue_t & this ) {
-	this.before.prev = 0p;
-	this.before.next = tail(this);
-
-	this.after .prev = head(this);
-	this.after .next = 0p;
+	this.lock = false;
+	this.last_id = -1u;
+
+	this.before.link.prev = 0p;
+	this.before.link.next = tail(this);
+	this.before.link.ts   = 0;
+
+	this.after .link.prev = head(this);
+	this.after .link.next = 0p;
+	this.after .link.ts   = 0;
+
+	#if !defined(__CFA_NO_SCHED_STATS__)
+		this.stat.diff = 0;
+		this.stat.push = 0;
+		this.stat.pop  = 0;
+	#endif
 
 	// We add a boat-load of assertions here because the anchor code is very fragile
-	/* paranoid */ verify(((uintptr_t)( head(this) ) + fields_offset) == (uintptr_t)(&this.before));
-	/* paranoid */ verify(((uintptr_t)( tail(this) ) + fields_offset) == (uintptr_t)(&this.after ));
-	/* paranoid */ verify(head(this)->prev == 0p );
-	/* paranoid */ verify(head(this)->next == tail(this) );
-	/* paranoid */ verify(tail(this)->next == 0p );
-	/* paranoid */ verify(tail(this)->prev == head(this) );
-	/* paranoid */ verify(&head(this)->prev == &this.before.prev );
-	/* paranoid */ verify(&head(this)->next == &this.before.next );
-	/* paranoid */ verify(&tail(this)->prev == &this.after.prev );
-	/* paranoid */ verify(&tail(this)->next == &this.after.next );
+	/* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
+	/* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
+	/* paranoid */ verify(head(this)->link.prev == 0p );
+	/* paranoid */ verify(head(this)->link.next == tail(this) );
+	/* paranoid */ verify(tail(this)->link.next == 0p );
+	/* paranoid */ verify(tail(this)->link.prev == head(this) );
+	/* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
+	/* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
+	/* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
+	/* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
 	/* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
 	/* paranoid */ verify(sizeof(this) == 128);
…
 	/* paranoid */ verify(__alignof__(this) == 128);
 	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
+
+	/* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
+	/* paranoid */ verifyf(readyQ_mask_full() == 63, "%zu", readyQ_mask_full());
 }
 
…
 void ^?{}( __intrusive_ready_queue_t & this ) {
 	// Make sure the list is empty
-	/* paranoid */ verify(head(this)->prev == 0p );
-	/* paranoid */ verify(head(this)->next == tail(this) );
-	/* paranoid */ verify(tail(this)->next == 0p );
-	/* paranoid */ verify(tail(this)->prev == head(this) );
+	/* paranoid */ verify(head(this)->link.prev == 0p );
+	/* paranoid */ verify(head(this)->link.next == tail(this) );
+	/* paranoid */ verify(tail(this)->link.next == 0p );
+	/* paranoid */ verify(tail(this)->link.prev == head(this) );
 }
 
…
 bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
 	verify(this.lock);
-	verify(node->ts != 0);
-	verify(node->next == 0p);
-	verify(node->prev == 0p);
-
+	verify(node->link.ts != 0);
+	verify(node->link.next == 0p);
+	verify(node->link.prev == 0p);
+
+	if(this.before.link.ts == 0l) {
+		verify(tail(this)->link.next == 0p);
+		verify(tail(this)->link.prev == head(this));
+		verify(head(this)->link.next == tail(this));
+		verify(head(this)->link.prev == 0p);
+	}
 
 	// Get the relevant nodes locally
 	thread_desc * tail = tail(this);
-	thread_desc * prev = tail->prev;
+	thread_desc * prev = tail->link.prev;
 
 	// Do the push
-	node->next = tail;
-	node->prev = prev;
-	prev->next = node;
-	tail->prev = node;
+	node->link.next = tail;
+	node->link.prev = prev;
+	prev->link.next = node;
+	tail->link.prev = node;
 
 	// Update stats
…
 	#endif
 
+	verify(node->link.next == tail(this));
+
 	// Check if the queue used to be empty
-	if(this.before.ts == 0l) {
-		this.before.ts = node->ts;
-		verify(node->prev == head(this));
+	if(this.before.link.ts == 0l) {
+		this.before.link.ts = node->link.ts;
+		verify(node->link.prev == head(this));
 		return true;
 	}
…
 [thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
 	verify(this.lock);
+	verify(this.before.link.ts != 0ul);
 	thread_desc * head = head(this);
 	thread_desc * tail = tail(this);
 
-	thread_desc * node = head->next;
-	thread_desc * next = node->next;
-	if(node == tail) return [0p, false];
+	thread_desc * node = head->link.next;
+	thread_desc * next = node->link.next;
+	if(node == tail) {
+		verify(false);
+		verify(this.before.link.ts == 0ul);
+		verify(tail(this)->link.next == 0p);
+		verify(tail(this)->link.prev == head(this));
+		verify(head(this)->link.next == tail(this));
+		verify(head(this)->link.prev == 0p);
+		return [0p, false];
+	}
 
 	/* paranoid */ verify(node);
 
-	head->next = next;
-	next->prev = head;
+	head->link.next = next;
+	next->link.prev = head;
 
 	#ifndef __CFA_NO_SCHED_STATS__
…
 
 	if(next == tail) {
-		this.before.ts = 0ul;
-		node->[next, prev] = 0p;
+		this.before.link.ts = 0ul;
+		verify(tail(this)->link.next == 0p);
+		verify(tail(this)->link.prev == head(this));
+		verify(head(this)->link.next == tail(this));
+		verify(head(this)->link.prev == 0p);
+		node->link.[next, prev] = 0p;
 		return [node, true];
 	}
 	else {
-		verify(next->ts != 0);
-		this.before.ts = next->ts;
-		verify(this.before.ts != 0);
-		node->[next, prev] = 0p;
+		verify(next->link.ts != 0);
+		this.before.link.ts = next->link.ts;
+		verify(this.before.link.ts != 0);
+		node->link.[next, prev] = 0p;
 		return [node, false];
 	}
…
 
 static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
-	return this.before.ts;
-}
+	return this.before.link.ts;
+}
+
+//=======================================================================
+// Cforall Reqdy Queue used by ready queue
+//=======================================================================
+
+static __attribute__((aligned(128))) thread_local struct {
+	struct {
+		struct {
+			size_t attempt;
+			size_t success;
+		} push;
+		struct {
+			size_t maskrds;
+			size_t attempt;
+			size_t success;
+		} pop;
+	} pick;
+	struct {
+		size_t value;
+		size_t count;
+	} full;
+} tls = {
+	/* pick */{
+		/* push */{ 0, 0 },
+		/* pop  */{ 0, 0, 0 },
+	},
+	/* full */{ 0, 0 }
+};
+
+//-----------------------------------------------------------------------
+
+void ?{}(__ready_queue_t & this) with (this) {
+	empty.count = 0;
+	for( i ; __cfa_readyQ_mask_size ) {
+		empty.mask[i] = 0;
+	}
+
+	list.data = alloc(4);
+	for( i; 4 ) {
+		(list.data[i]){};
+	}
+	list.count = 4;
+
+	#if !defined(__CFA_NO_STATISTICS__)
+		global_stats.pick.push.attempt = 0;
+		global_stats.pick.push.success = 0;
+		global_stats.pick.pop .maskrds = 0;
+		global_stats.pick.pop .attempt = 0;
+		global_stats.pick.pop .success = 0;
+
+		global_stats.full.value = 0;
+		global_stats.full.count = 0;
+	#endif
+}
+
+void ^?{}(__ready_queue_t & this) with (this) {
+	verify( 4 == list .count );
+	verify( 0 == empty.count );
+
+	for( i; 4 ) {
+		^(list.data[i]){};
+	}
+	free(list.data);
+
+
+	#if defined(__CFA_WITH_VERIFY__)
+		for( i ; __cfa_readyQ_mask_size ) {
+			assert( 0 == empty.mask[i] );
+		}
+	#endif
+}
+
+//-----------------------------------------------------------------------
+
+__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
+	thrd->link.ts = rdtscl();
+
+	while(true) {
+		// Pick a random list
+		unsigned i = tls_rand() % list.count;
+
+		#if !defined(__CFA_NO_STATISTICS__)
+			tls.pick.push.attempt++;
+		#endif
+
+		// If we can't lock it retry
+		if( !__atomic_try_acquire( &list.data[i].lock ) ) continue;
+		verify(list.data[i].last_id == -1u);
+		list.data[i].last_id = kernelTLS.this_processor->id;
+
+		__attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
+		bool first = false;
+
+		verify( list.data[i].last_id == kernelTLS.this_processor->id );
+		verify( list.data[i].lock );
+		// Actually push it
+		if(push(list.data[i], thrd)) {
+			size_t ret = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
+			first = (ret == 0);
+
+			__cfa_readyQ_mask_t word;
+			__cfa_readyQ_mask_t bit;
+			[bit, word] = extract(i);
+			verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
+			__attribute__((unused)) bool ret = bts(&empty.mask[word], bit);
+			verify(!(bool)ret);
+			verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
+		}
+		verify(empty.count <= (int)list.count);
+		verify( list.data[i].last_id == kernelTLS.this_processor->id );
+		verify( list.data[i].lock );
+
+		// Unlock and return
+		list.data[i].last_id = -1u;
+		__atomic_unlock( &list.data[i].lock );
+
+		#if !defined(__CFA_NO_STATISTICS__)
+			tls.pick.push.success++;
+			tls.full.value += num;
+			tls.full.count += 1;
+		#endif
+		return first;
+	}
+}
+
+//-----------------------------------------------------------------------
+
+static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
+	#if !defined(__CFA_NO_STATISTICS__)
+		tls.pick.pop.attempt++;
+	#endif
+
+	// Pick the bet list
+	int w = i;
+	if( __builtin_expect(ts(list.data[j]) != 0, true) ) {
+		w = (ts(list.data[i]) < ts(list.data[j])) ? i : j;
+	}
+
+	__intrusive_ready_queue_t & list = list.data[w];
+	// If list looks empty retry
+	if( ts(list) == 0 ) return 0p;
+
+	// If we can't get the lock retry
+	if( !__atomic_try_acquire(&list.lock) ) return 0p;
+	verify(list.last_id == -1u);
+	list.last_id = kernelTLS.this_processor->id;
+
+	verify(list.last_id == kernelTLS.this_processor->id);
+
+	__attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
+
+
+	// If list is empty, unlock and retry
+	if( ts(list) == 0 ) {
+		list.last_id = -1u;
+		__atomic_unlock(&list.lock);
+		return 0p;
+	}
+	{
+		__cfa_readyQ_mask_t word;
+		__cfa_readyQ_mask_t bit;
+		[bit, word] = extract(w);
+		verify((empty.mask[word] & (1ull << bit)) != 0);
+	}
+
+	verify(list.last_id == kernelTLS.this_processor->id);
+	verify(list.lock);
+
+	// Actually pop the list
+	struct thread_desc * thrd;
+	bool emptied;
+	[thrd, emptied] = pop(list);
+	verify(thrd);
+
+	verify(list.last_id == kernelTLS.this_processor->id);
+	verify(list.lock);
+
+	if(emptied) {
+		__atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);
+
+		__cfa_readyQ_mask_t word;
+		__cfa_readyQ_mask_t bit;
+		[bit, word] = extract(w);
+		verify((empty.mask[word] & (1ull << bit)) != 0);
+		__attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
+		verify(ret);
+		verify((empty.mask[word] & (1ull << bit)) == 0);
+	}
+
+	verify(list.lock);
+
+	// Unlock and return
+	list.last_id = -1u;
+	__atomic_unlock(&list.lock);
+	verify(empty.count >= 0);
+
+	#if !defined(__CFA_NO_STATISTICS__)
+		tls.pick.pop.success++;
+		tls.full.value += num;
+		tls.full.count += 1;
+	#endif
+
+	return thrd;
+}
+
+__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
+	verify( list.count > 0 );
+	while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
+		#if !defined(__CFA_READQ_NO_BITMASK__)
+			tls.pick.pop.maskrds++;
+			unsigned i, j;
+			{
+				#if !defined(__CFA_NO_SCHED_STATS__)
+					tls.pick.pop.maskrds++;
+				#endif
+
+				// Pick two lists at random
+				unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
+
+				unsigned ri = tls_rand();
+				unsigned rj = tls_rand();
+
+				unsigned wdxi = (ri >> 6u) % num;
+				unsigned wdxj = (rj >> 6u) % num;
+
+				size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
+				size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );
+
+				if(maski == 0 && maskj == 0) continue;
+
+				unsigned bi = rand_bit(ri, maski);
+				unsigned bj = rand_bit(rj, maskj);
+
+				verifyf(bi < 64, "%zu %u", maski, bi);
+				verifyf(bj < 64, "%zu %u", maskj, bj);
+
+				i = bi | (wdxi << 6);
+				j = bj | (wdxj << 6);
+
+				verifyf(i < list.count, "%u", wdxi << 6);
+				verifyf(j < list.count, "%u", wdxj << 6);
+			}
+
+			struct thread_desc * thrd = try_pop(cltr, i, j);
+			if(thrd) return thrd;
+		#else
+			// Pick two lists at random
+			int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
+			int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
+
+			struct thread_desc * thrd = try_pop(cltr, i, j);
+			if(thrd) return thrd;
+		#endif
+	}
+
+	return 0p;
+}
+
+//-----------------------------------------------------------------------
+
+static void check( __ready_queue_t & q ) with (q) {
+	#if defined(__CFA_WITH_VERIFY__)
+		{
+			int idx = 0;
+			for( w ; __cfa_readyQ_mask_size ) {
+				for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
+					bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
+					bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
+					assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
+					assert(__cfa_max_readyQs > idx);
+					idx++;
+				}
+			}
+		}
+
+		{
+			for( idx ; list.count ) {
+				__intrusive_ready_queue_t & sl = list.data[idx];
+				assert(!list.data[idx].lock);
+
+				assert(head(sl)->link.prev == 0p );
+				assert(head(sl)->link.next->link.prev == head(sl) );
+				assert(tail(sl)->link.next == 0p );
+				assert(tail(sl)->link.prev->link.next == tail(sl) );
+
+				if(sl.before.link.ts == 0l) {
+					assert(tail(sl)->link.next == 0p);
+					assert(tail(sl)->link.prev == head(sl));
+					assert(head(sl)->link.next == tail(sl));
+					assert(head(sl)->link.prev == 0p);
+				}
+			}
+		}
+	#endif
+}
+
+// Call this function of the intrusive list was moved using memcpy
+// fixes the list so that the pointers back to anchors aren't left
+// dangling
+static inline void fix(__intrusive_ready_queue_t & ll) {
+	// if the list is not empty then follow he pointer
+	// and fix its reverse
+	if(ll.before.link.ts != 0l) {
+		head(ll)->link.next->link.prev = head(ll);
+		tail(ll)->link.prev->link.next = tail(ll);
+	}
+	// Otherwise just reset the list
+	else {
+		tail(ll)->link.next = 0p;
+		tail(ll)->link.prev = head(ll);
+		head(ll)->link.next = tail(ll);
+		head(ll)->link.prev = 0p;
+	}
+}
+
+void ready_queue_grow (struct cluster * cltr) {
+	uint_fast32_t last_size = ready_mutate_lock( *cltr );
+	check( cltr->ready_queue );
+
+	with( cltr->ready_queue ) {
+		size_t ncount = list.count;
+
+		// Check that we have some space left
+		if(ncount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);
+
+		ncount += 4;
+
+		// Allocate new array
+		list.data = alloc(list.data, ncount);
+
+		// Fix the moved data
+		for( idx; (size_t)list.count ) {
+			fix(list.data[idx]);
+		}
+
+		// Construct new data
+		for( idx; (size_t)list.count ~ ncount) {
+			(list.data[idx]){};
+		}
+
+		// Update original
+		list.count = ncount;
+		// fields in empty don't need to change
+	}
+
+	// Make sure that everything is consistent
+	check( cltr->ready_queue );
+	ready_mutate_unlock( *cltr, last_size );
+}
+
+void ready_queue_shrink(struct cluster * cltr) {
+	uint_fast32_t last_size = ready_mutate_lock( *cltr );
+	with( cltr->ready_queue ) {
+		size_t ocount = list.count;
+		// Check that we have some space left
+		if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
+
+		list.count -= 4;
+
+		// redistribute old data
+		verify(ocount > list.count);
+		for( idx; (size_t)list.count ~ ocount) {
+			// This is not strictly needed but makes checking invariants much easier
+			bool locked = __atomic_try_acquire(&list.data[idx].lock);
+			verify(locked);
+			while(0 != ts(list.data[idx])) {
+				struct thread_desc * thrd;
+				__attribute__((unused)) bool _;
+				[thrd, _] = pop(list.data[idx]);
+				verify(thrd);
+				push(cltr, thrd);
+			}
+
+			__atomic_unlock(&list.data[idx].lock);
+
+			// TODO print the queue statistics here
+
+			^(list.data[idx]){};
+		}
+
+		// clear the now unused masks
+		{
+			__cfa_readyQ_mask_t fword, fbit, lword, lbit;
+			[fbit, fword] = extract(ocount);
+			[lbit, lword] = extract(list.count);
+
+			// For now assume that all queues where coverd by the same bitmask
+			// This is highly probable as long as grow and shrink use groups of 4
+			// exclusively
+			verify(fword == lword);
+			__cfa_readyQ_mask_t clears = ~0;
+
+			for( b ; fbit ~ lbit ) {
+				clears ^= 1 << b;
+			}
+
+			empty.mask[fword] &= clears;
+		}
+
+		// Allocate new array
+		list.data = alloc(list.data, list.count);
+
+		// Fix the moved data
+		for( idx; (size_t)list.count ) {
+			fix(list.data[idx]);
+		}
+	}
+
+	// Make sure that everything is consistent
+	check( cltr->ready_queue );
+	ready_mutate_unlock( *cltr, last_size );
+}
+
+//-----------------------------------------------------------------------
+
+#if !defined(__CFA_NO_STATISTICS__)
+	void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
+		__atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
+		__atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
+		__atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
+		__atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
+		__atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
+
+		__atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
+		__atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
+	}
+#endif
```
libcfa/src/concurrency/thread.cfa
```diff
 	self_mon_p = &self_mon;
 	curr_cluster = &cl;
-	next = 0p;
-	prev = 0p;
+	link.next = 0p;
+	link.prev = 0p;
 
 	node.next = NULL;
```