Changeset c8a0210 for libcfa/src/concurrency
- Timestamp: Apr 16, 2021, 2:28:09 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
- Children: 665edf40
- Parents: 857a1c6 (diff), 5f6a172 (diff)
- Location: libcfa/src/concurrency

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- Files: 15 edited
  - coroutine.cfa (modified) (2 diffs)
  - coroutine.hfa (modified) (2 diffs)
  - invoke.h (modified) (1 diff)
  - io/call.cfa.in (modified) (18 diffs)
  - kernel.cfa (modified) (12 diffs)
  - kernel.hfa (modified) (6 diffs)
  - kernel/startup.cfa (modified) (8 diffs)
  - kernel_private.hfa (modified) (5 diffs)
  - preemption.cfa (modified) (2 diffs)
  - ready_queue.cfa (modified) (18 diffs)
  - ready_subqueue.hfa (modified) (1 diff)
  - stats.cfa (modified) (3 diffs)
  - stats.hfa (modified) (4 diffs)
  - thread.cfa (modified) (5 diffs)
  - thread.hfa (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
libcfa/src/concurrency/coroutine.cfa
r857a1c6 → rc8a0210

    //-----------------------------------------------------------------------------
  - FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))
  -
  - forall(T &)
  - void mark_exception(CoroutineCancelled(T) *) {}
  + EHM_VIRTUAL_TABLE(SomeCoroutineCancelled, std_coroutine_cancelled);

    forall(T &)
…
    // TODO: Remove explitate vtable set once trac#186 is fixed.
  - CoroutineCancelled(T) except;
  - except.virtual_table = &get_exception_vtable(&except);
  + SomeCoroutineCancelled except;
  + except.virtual_table = &std_coroutine_cancelled;
    except.the_coroutine = &cor;
    except.the_exception = except;
  - throwResume except;
  + // Why does this need a cast?
  + throwResume (SomeCoroutineCancelled &)except;

    except->virtual_table->free( except );
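The change above swaps the polymorphic CoroutineCancelled machinery for a single SomeCoroutineCancelled exception whose virtual table is a statically allocated object (std_coroutine_cancelled) that the thrower installs by hand. The following plain-C sketch only illustrates that general shape — an exception record pointing at a shared, statically allocated vtable — and is not CFA and not the EHM macros; every name in it is invented for the example.

    #include <stdio.h>

    /* Hypothetical vtable: handlers dispatch through these function pointers. */
    struct exc_vtable {
        const char * (*msg)(void * exc);
        void         (*free_exc)(void * exc);
    };

    /* Hypothetical exception record; virtual_table is filled in explicitly
     * before the exception is raised, as the CFA code above does. */
    struct coroutine_cancelled {
        const struct exc_vtable * virtual_table;
        void * the_coroutine;
        void * the_exception;
    };

    static const char * cc_msg(void * exc) { (void)exc; return "CoroutineCancelled"; }
    static void cc_free(void * exc) { (void)exc; /* nothing heap-allocated here */ }

    /* One shared, statically allocated table, analogous to std_coroutine_cancelled. */
    static const struct exc_vtable std_cc_vtable = { cc_msg, cc_free };

    int main(void) {
        struct coroutine_cancelled except = { &std_cc_vtable, 0, 0 };
        /* a generic handler never sees the concrete type, only the table */
        printf("caught: %s\n", except.virtual_table->msg(&except));
        except.virtual_table->free_exc(&except);
        return 0;
    }

The apparent benefit of the statically allocated table is that throwing no longer needs the polymorphic get_exception_vtable lookup; the trade-off is that the exception loses its per-coroutine type parameter, hence the explicit cast before throwResume in the hunk above.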
libcfa/src/concurrency/coroutine.hfa
r857a1c6 → rc8a0210

    //-----------------------------------------------------------------------------
    // Exception thrown from resume when a coroutine stack is cancelled.
  - FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
  + EHM_EXCEPTION(SomeCoroutineCancelled)(
  + 	void * the_coroutine;
  + 	exception_t * the_exception;
  + );
  +
  + EHM_EXTERN_VTABLE(SomeCoroutineCancelled, std_coroutine_cancelled);
  +
  + EHM_FORALL_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
    	coroutine_t * the_coroutine;
    	exception_t * the_exception;
…
    // Anything that implements this trait can be resumed.
    // Anything that is resumed is a coroutine.
  - trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
  + trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(SomeCoroutineCancelled)) {
    	void main(T & this);
    	$coroutine * get_coroutine(T & this);
libcfa/src/concurrency/invoke.h
r857a1c6 → rc8a0210

    	struct $thread * prev;
    	volatile unsigned long long ts;
  - 	int preferred;
    };
libcfa/src/concurrency/io/call.cfa.in
r857a1c6 → rc8a0210

    	sqe->opcode = IORING_OP_{op};
  - 	sqe->user_data = (__u64)(uintptr_t)&future;
  + 	sqe->user_data = (uintptr_t)&future;
    	sqe->flags = sflags;
    	sqe->ioprio = 0;
…
    	asm volatile("": : :"memory");

  - 	verify( sqe->user_data == (__u64)(uintptr_t)&future );
  + 	verify( sqe->user_data == (uintptr_t)&future );
    	cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
    #endif

The remaining 16 hunks make the same substitution in the argument maps of the Call() entries: every '(__u64)' cast becomes '(uintptr_t)' for PREADV2, PWRITEV2, EPOLL_CTL, SENDMSG, RECVMSG, SEND, RECV, ACCEPT, CONNECT, FALLOCATE, MADVISE, OPENAT, OPENAT2, STATX, READ and WRITE (for example, 'addr': '(__u64)buf' becomes 'addr': '(uintptr_t)buf').
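All of these hunks are the same mechanical change: a pointer being stashed in the 64-bit sqe->user_data field (or in the addr/off fields that carry pointers) is now converted through uintptr_t rather than cast straight to __u64. The snippet below is a self-contained C illustration of that pointer round trip; fake_sqe and fake_cqe are stand-ins invented for the example, not the real io_uring structures.

    #include <stdint.h>
    #include <stdio.h>
    #include <assert.h>

    /* Stand-ins for the 64-bit user_data fields of a submission/completion entry. */
    struct fake_sqe { uint64_t user_data; };
    struct fake_cqe { uint64_t user_data; };

    struct future { int result; };

    int main(void) {
        struct future f = { 0 };
        struct fake_sqe sqe;

        /* store: pointer -> uintptr_t -> 64-bit integer
         * (uintptr_t is the integer type guaranteed to hold a pointer) */
        sqe.user_data = (uintptr_t)&f;

        /* the completion side copies user_data back verbatim */
        struct fake_cqe cqe = { sqe.user_data };

        /* load: 64-bit integer -> uintptr_t -> pointer */
        struct future * back = (struct future *)(uintptr_t)cqe.user_data;
        assert(back == &f);
        back->result = 42;
        printf("future fulfilled with %d\n", f.result);
        return 0;
    }

Going through uintptr_t keeps the pointer-to-integer conversion on the sanctioned route and avoids the warnings a direct (__u64) cast can produce on targets where pointers and __u64 differ in width.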
libcfa/src/concurrency/kernel.cfa
r857a1c6 → rc8a0210

The idle-processor bookkeeping is renamed and reworked (the type __cluster_idles becomes __cluster_proc_list, kept in cltr->procs):

  - static void push  (__cluster_idles & idles, processor & proc);
  - static void remove(__cluster_idles & idles, processor & proc);
  - static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
  + static void mark_idle (__cluster_proc_list & idles, processor & proc);
  + static void mark_awake(__cluster_proc_list & idles, processor & proc);
  + static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list idles );

The call sites in the idle loop change accordingly: push(this->cltr->idles, *this) becomes mark_idle(this->cltr->procs, *this), the two remove(this->cltr->idles, *this) calls become mark_awake(this->cltr->procs, *this), [idle, total, p] = query(this->idles) becomes query_idles(this->procs), and the scheduler now calls pop_fast( this ) instead of pop( this ).

mark_idle now moves the processor onto the idles list (mark_awake is the mirror image: this.idle--, remove(proc), insert_last(this.actives, proc)):

  + static void mark_idle(__cluster_proc_list & this, processor & proc) {
  + 	/* paranoid */ verify( ! __preemption_enabled() );
  + 	lock( this );
  + 		this.idle++;
  + 		/* paranoid */ verify( this.idle <= this.total );
  + 		remove(proc);
  + 		insert_first(this.idles, proc);
  + 	unlock( this );
  + 	/* paranoid */ verify( ! __preemption_enabled() );
  + }

query_idles gains paranoid checks that preemption is disabled and that ready_schedule_islocked(), and reads &this.idles`first instead of &this.list`first.

Finally, every place that increments or decrements the ready.threads.threads counter now also records a time-series sample, for example:

  + 	__push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );

and, on the external path that updates the cluster-wide counter,

  + 	__push_stat( cl->stats, cl->stats->ready.threads.threads, true, "Cluster", cl );
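mark_idle and mark_awake, shown above, are essentially list bookkeeping: count the idle processor and move its intrusive node from the actives list to the idles list, or back, under the cluster's lock. The sketch below redoes that bookkeeping in plain C with a pthread mutex and hand-rolled sentinel lists; it only illustrates the data movement (struct proc, proc_list and the function names are invented), not the CFA dlist or the counting lock used in the changeset.

    #include <pthread.h>
    #include <assert.h>

    struct proc { struct proc * prev, * next; };      /* intrusive list node */

    struct proc_list {
        pthread_mutex_t lock;
        unsigned idle, total;
        struct proc actives;                          /* sentinel of the active list */
        struct proc idles;                            /* sentinel of the idle list */
    };

    static void list_remove(struct proc * p) {
        p->prev->next = p->next; p->next->prev = p->prev;
    }
    static void list_push_front(struct proc * head, struct proc * p) {
        p->next = head->next; p->prev = head;
        head->next->prev = p; head->next = p;
    }

    /* analogue of mark_idle: count it and move it from actives to idles */
    static void mark_idle(struct proc_list * this, struct proc * p) {
        pthread_mutex_lock(&this->lock);
        this->idle++;
        assert(this->idle <= this->total);
        list_remove(p);
        list_push_front(&this->idles, p);
        pthread_mutex_unlock(&this->lock);
    }

    /* analogue of mark_awake: the reverse move */
    static void mark_awake(struct proc_list * this, struct proc * p) {
        pthread_mutex_lock(&this->lock);
        this->idle--;
        list_remove(p);
        list_push_front(&this->actives, p);
        pthread_mutex_unlock(&this->lock);
    }

    int main(void) {
        struct proc_list cl;
        pthread_mutex_init(&cl.lock, 0);
        cl.idle = 0; cl.total = 1;
        cl.actives.prev = cl.actives.next = &cl.actives;
        cl.idles.prev   = cl.idles.next   = &cl.idles;

        struct proc p;
        list_push_front(&cl.actives, &p);             /* processor starts active */
        mark_idle(&cl, &p);                           /* going to sleep */
        mark_awake(&cl, &p);                          /* woken back up */
        return 0;
    }

Keeping a second, explicit actives list (rather than only an idles list, as before) is what lets the ready-queue code walk the active processors in order when it hands out lane ids after a resize (see reassign_cltr_id in ready_queue.cfa below).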
libcfa/src/concurrency/kernel.hfa
r857a1c6 → rc8a0210

    	struct cluster * cltr;

  - 	// Id within the cluster
  - 	unsigned cltr_id;
  + 	// Ready Queue state per processor
  + 	struct {
  + 		unsigned short its;
  + 		unsigned short itr;
  + 		unsigned id;
  + 		unsigned target;
  + 		unsigned long long int cutoff;
  + 	} rdq;

    	// Set to true to notify the processor should terminate
…
    // Cluster Tools

  - // Intrusives lanes which are used by the relaxed ready queue
  + // Intrusives lanes which are used by the ready queue
    struct __attribute__((aligned(128))) __intrusive_lane_t;
    void  ?{}(__intrusive_lane_t & this);
    void ^?{}(__intrusive_lane_t & this);

  - // Counter used for wether or not the lanes are all empty
  - struct __attribute__((aligned(128))) __snzi_node_t;
  - struct __snzi_t {
  - 	unsigned mask;
  - 	int root;
  - 	__snzi_node_t * nodes;
  - };
  -
  - void  ?{}( __snzi_t & this, unsigned depth );
  - void ^?{}( __snzi_t & this );
  + // Aligned timestamps which are used by the relaxed ready queue
  + struct __attribute__((aligned(128))) __timestamp_t;
  + void  ?{}(__timestamp_t & this);
  + void ^?{}(__timestamp_t & this);

    //TODO adjust cache size to ARCHITECTURE
    // Structure holding the relaxed ready queue
    struct __ready_queue_t {
  - 	// Data tracking how many/which lanes are used
  - 	// Aligned to 128 for cache locality
  - 	__snzi_t snzi;
  -
    	// Data tracking the actual lanes
    	// On a seperate cacheline from the used struct since
…
    		__intrusive_lane_t * volatile data;
  +
  + 		// Array of times
  + 		__timestamp_t * volatile tscs;

    		// Number of lanes (empty or not)
    		volatile size_t count;
…
    // Idle Sleep
  - struct __cluster_idles {
  + struct __cluster_proc_list {
    	// Spin lock protecting the queue
    	volatile uint64_t lock;
…
    	// List of idle processors
  - 	dlist(processor, processor) list;
  + 	dlist(processor, processor) idles;
  +
  + 	// List of active processors
  + 	dlist(processor, processor) actives;
    };
…
    	// List of idle processors
  - 	__cluster_idles idles;
  + 	__cluster_proc_list procs;

    	// List of threads
libcfa/src/concurrency/kernel/startup.cfa
r857a1c6 → rc8a0210

Shutdown of the main processor, of each worker processor and of a cluster now also flushes the per-handle statistics arrays, e.g.:

    		__print_stats( st, mainProcessor->print_stats, "Processor ", mainProcessor->name, (void*)mainProcessor );
    	}
  + 	#if defined(CFA_STATS_ARRAY)
  + 		__flush_stat( st, "Processor", mainProcessor );
  + 	#endif
    #endif

The processor constructor initialises the new per-processor ready-queue state:

    	this.name = name;
    	this.cltr = &_cltr;
  + 	this.rdq.its = 0;
  + 	this.rdq.itr = 0;
  + 	this.rdq.id  = -1u;
  + 	this.rdq.target = -1u;
  + 	this.rdq.cutoff = -1ull;
    	do_terminate = false;
    	preemption_alarm = 0p;

Processor registration is folded into taking the ready-queue write lock. In init(), the old sequence — lock( this.cltr->idles ), bump total into a local target, unlock, doregister, ready_mutate_lock(), this.cltr_id = ready_queue_grow( cltr, target ), ready_mutate_unlock() — becomes:

  + 	// Register and Lock the RWlock so no-one pushes/pops while we are changing the queue
  + 	uint_fast32_t last_size = ready_mutate_register((__processor_id_t*)&this);
  + 		this.cltr->procs.total += 1u;
  + 		insert_last(this.cltr->procs.actives, this);
  +
  + 		// Adjust the ready queue size
  + 		ready_queue_grow( cltr );
  +
  + 	// Unlock the RWlock
  + 	ready_mutate_unlock( last_size );

Symmetrically, deinit() now decrements procs.total and removes the processor inside the ready_mutate_lock() critical section, calls ready_queue_shrink( this.cltr ) without a target argument, and releases the lock and unregisters in one step with ready_mutate_unregister((__processor_id_t*)&this, last_size).

The cluster changes match: the ?{} constructor is now for __cluster_proc_list (and no longer constructs this.list), the cluster constructor/destructor call ready_queue_grow( &this ) / ready_queue_shrink( &this ) without the extra 0 argument, and the cluster destructor flushes its samples with __flush_stat( this.stats, "Cluster", &this ) under #if defined(CFA_STATS_ARRAY) before freeing the stats.
libcfa/src/concurrency/kernel_private.hfa
r857a1c6 → rc8a0210

The __scheduler_lock_id_t cell definition (and its static_assert) moves from the top of the cluster-lock section to just after the lock-list helpers; its content is unchanged. The registration API is renamed and no longer returns the slot:

    // Lock-Free registering/unregistering of threads
    // Register a processor to a given cluster and get its unique id in return
  - unsigned doregister( struct __processor_id_t * proc );
  + void register_proc_id( struct __processor_id_t * );

    // Unregister a processor from a given cluster using its id, getting back the original pointer
  - void unregister( struct __processor_id_t * proc );
  + void unregister_proc_id( struct __processor_id_t * proc );

Two convenience helpers combine registration with the ready-queue write lock:

  + static inline uint_fast32_t ready_mutate_register( struct __processor_id_t * proc ) {
  + 	register_proc_id( proc );
  + 	return ready_mutate_lock();
  + }
  +
  + static inline void ready_mutate_unregister( struct __processor_id_t * proc, uint_fast32_t last_s ) {
  + 	ready_mutate_unlock( last_s );
  + 	unregister_proc_id( proc );
  + }

The cluster idle lock/unlock moves below these helpers, is retyped for __cluster_proc_list, and now wraps the counting lock in the global ready_schedule_lock()/ready_schedule_unlock() so that no processor can be added or removed while the idle lists are being changed:

  + static inline void lock(__cluster_proc_list & this) {
  + 	/* paranoid */ verify( ! __preemption_enabled() );
  +
  + 	// Start by locking the global RWlock so that we know no-one is
  + 	// adding/removing processors while we mess with the idle lock
  + 	ready_schedule_lock();
  +
  + 	// Simple counting lock, acquired by incrementing the counter to an odd number
  + 	for() {
  + 		uint64_t l = this.lock;
  + 		if(
  + 			(0 == (l % 2))
  + 			&& __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
  + 		) return;
  + 		Pause();
  + 	}
  +
  + 	/* paranoid */ verify( ! __preemption_enabled() );
  + }
  +
  + static inline void unlock(__cluster_proc_list & this) {
  + 	/* paranoid */ verify( ! __preemption_enabled() );
  +
  + 	/* paranoid */ verify( 1 == (this.lock % 2) );
  + 	// Simple counting lock, released by incrementing to an even number
  + 	__atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
  +
  + 	// Release the global lock, which we acquired when locking
  + 	ready_schedule_unlock();
  +
  + 	/* paranoid */ verify( ! __preemption_enabled() );
  + }

In the ready-queue API, query(struct cluster *) and remove_head(struct cluster *, struct $thread *) are deleted, push() now returns void instead of bool, pop() is renamed pop_fast(), and ready_queue_grow()/ready_queue_shrink() lose their int target parameter.
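lock()/unlock() above combine two ideas: the global scheduler read-lock (so the set of processors cannot change underneath) and a tiny counting lock whose counter is odd while held and even while free. The fragment below is a minimal C sketch of just the even/odd counter, using the same GCC atomic builtins; the surrounding ready_schedule_lock() machinery is omitted and the names are invented.

    #include <stdint.h>
    #include <stdbool.h>
    #include <stdio.h>

    static uint64_t idle_lock = 0;   /* even = free, odd = held */

    /* acquire: wait for an even value, then CAS it to the next odd value */
    static void counting_lock(void) {
        for (;;) {
            uint64_t l = __atomic_load_n(&idle_lock, __ATOMIC_SEQ_CST);
            if ((l % 2) == 0 &&
                __atomic_compare_exchange_n(&idle_lock, &l, l + 1, false,
                                            __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
                return;
            /* otherwise someone holds it (odd) or the CAS raced; try again */
        }
    }

    /* release: incrementing the odd counter makes it even again */
    static void counting_unlock(void) {
        __atomic_fetch_add(&idle_lock, 1, __ATOMIC_SEQ_CST);
    }

    int main(void) {
        counting_lock();
        printf("held, counter = %llu (odd)\n", (unsigned long long)idle_lock);
        counting_unlock();
        printf("free, counter = %llu (even)\n", (unsigned long long)idle_lock);
        return 0;
    }

Because the counter only ever grows, an optimistic reader can load it, copy the protected fields, and load it again: if both loads return the same even value, no writer intervened — which appears to be what the retry loop in query_idles() (kernel.cfa above) relies on.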
libcfa/src/concurrency/preemption.cfa
r857a1c6 → rc8a0210

    static void * alarm_loop( __attribute__((unused)) void * args ) {
    	__processor_id_t id;
  - 	id.id = doregister(&id);
  + 	register_proc_id(&id);
    	__cfaabi_tls.this_proc_id = &id;
…
    EXIT:
    	__cfaabi_dbg_print_safe( "Kernel : Preemption thread stopping\n" );
  - 	unregister(&id);
  + 	register_proc_id(&id);

    	return 0p;
libcfa/src/concurrency/ready_queue.cfa
r857a1c6 → rc8a0210

The SNZI ("scalable non-zero indicator") used to track non-empty lanes is removed entirely, and the file now selects between two scheduling strategies at compile time:

  - // #define USE_SNZI
    // #define USE_MPSC
  +
  + #define USE_RELAXED_FIFO
  + // #define USE_WORK_STEALING
…
  - #define BIAS 4
  + #if defined(USE_RELAXED_FIFO)
  + 	#define BIAS 4
  + 	#define READYQ_SHARD_FACTOR 4
  + 	#define SEQUENTIAL_SHARD 1
  + #elif defined(USE_WORK_STEALING)
  + 	#define READYQ_SHARD_FACTOR 2
  + 	#define SEQUENTIAL_SHARD 2
  + #else
  + 	#error no scheduling strategy selected
  + #endif

The include of snzi.hfa is dropped, and forward declarations are added for idx_from_r(), the two try_pop() overloads and search().

Registration: doregister()/unregister() become register_proc_id()/unregister_proc_id(); instead of returning the slot index they store it directly into proc->id.

Queue construction: ?{}(__ready_queue_t) also zeroes lanes.tscs; ^?{} verifies lanes.count == SEQUENTIAL_SHARD (instead of 1 plus the SNZI emptiness check) and frees lanes.tscs as well as lanes.data. The old query(struct cluster *) helper is deleted.

Relaxed-FIFO path (USE_RELAXED_FIFO): push() and pop_fast() keep the former biased-random behaviour, but the preferred shard now comes from kernelTLS().this_processor->rdq.id instead of cltr_id/link.preferred, push() returns void (the "was the list empty" result is gone with the SNZI), and pop_fast() loops for(25) over pairs of random lanes instead of looping while the SNZI is non-zero. pop_slow() simply calls the new search().

Work-stealing path (USE_WORK_STEALING): push() round-robins over the processor's own READYQ_SHARD_FACTOR lanes (external pushes pick a random lane), and pop_fast() uses the per-lane timestamps to decide when to help another shard:

  + __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
  + 	processor * proc = kernelTLS().this_processor;
  +
  + 	if(proc->rdq.target == -1u) {
  + 		proc->rdq.target = __tls_rand() % lanes.count;
  + 		unsigned it1  = proc->rdq.itr;
  + 		unsigned it2  = proc->rdq.itr + 1;
  + 		unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
  + 		unsigned idx2 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
  + 		unsigned long long tsc1 = ts(lanes.data[idx1]);
  + 		unsigned long long tsc2 = ts(lanes.data[idx2]);
  + 		proc->rdq.cutoff = min(tsc1, tsc2);
  + 	}
  + 	else if(lanes.tscs[proc->rdq.target].tv < proc->rdq.cutoff) {
  + 		$thread * t = try_pop(cltr, proc->rdq.target);
  + 		proc->rdq.target = -1u;
  + 		if(t) return t;
  + 	}
  +
  + 	for(READYQ_SHARD_FACTOR) {
  + 		unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
  + 		if($thread * t = try_pop(cltr, i)) return t;
  + 	}
  + 	return 0p;
  + }

and pop_slow() tries 25 random lanes before falling back to search().

Shared utilities: try_pop(cltr, w) loses the SNZI depart() and the thrd->link.preferred update and, under USE_WORK_STEALING, records lanes.tscs[w].tv = thrd->link.ts; the two-index try_pop(cltr, i, j) (pick the lane with the older timestamp) and check() are kept but move below it; the former pop_slow() body becomes search(); remove_head() is deleted.

Resizing: new helpers assign each processor a contiguous block of READYQ_SHARD_FACTOR lanes and refresh the timestamp array after a resize:

  + static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
  + 	processor * it = &list`first;
  + 	for(unsigned i = 0; i < count; i++) {
  + 		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
  + 		it->rdq.id = value;
  + 		it->rdq.target = -1u;
  + 		value += READYQ_SHARD_FACTOR;
  + 		it = &(*it)`next;
  + 	}
  + }
  +
  + static void reassign_cltr_id(struct cluster * cltr) {
  + 	unsigned preferred = 0;
  + 	assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
  + 	assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
  + }
  +
  + static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
  + 	#if defined(USE_WORK_STEALING)
  + 		lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
  + 		for(i; lanes.count) {
  + 			lanes.tscs[i].tv = ts(lanes.data[i]);
  + 		}
  + 	#endif
  + }

ready_queue_grow() and ready_queue_shrink() lose their target parameter and instead read target = cltr->procs.total; the lane count becomes target * READYQ_SHARD_FACTOR (or SEQUENTIAL_SHARD when target < 2); both drop the SNZI teardown/rebuild and instead finish with fix_times(cltr) and reassign_cltr_id(cltr), and ready_queue_grow() no longer returns a preferred index.
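As a rough mental model of the new USE_WORK_STEALING pop policy — drain the lanes this processor owns first, only then go looking in other processors' lanes — here is a deliberately tiny C sketch. It ignores locking, timestamps and the helping heuristic entirely; NLANES, the lane contents and all names are invented for the example.

    #include <stdio.h>
    #include <stdlib.h>

    #define NLANES        8    /* total sub-queues (assumption for the sketch) */
    #define SHARD_FACTOR  2    /* lanes owned by each processor, as in the changeset */

    /* dummy lanes: non-zero means "one item of work is queued here" */
    static int lanes[NLANES] = { 0, 0, 0, 0, 0, 7, 0, 0 };

    static int try_pop(unsigned lane) {
        int v = lanes[lane];
        lanes[lane] = 0;
        return v;                       /* 0 means the lane was empty */
    }

    /* work-stealing pop: own shards first, then bounded random stealing */
    static int pop(unsigned my_first_lane) {
        for (unsigned i = 0; i < SHARD_FACTOR; i++) {
            int v = try_pop(my_first_lane + i);
            if (v) return v;
        }
        for (int attempt = 0; attempt < 25; attempt++) {
            int v = try_pop((unsigned)rand() % NLANES);
            if (v) return v;
        }
        return 0;                       /* nothing found: the caller would go idle */
    }

    int main(void) {
        /* processor 0 owns lanes 0..1; the only work is in lane 5, so it steals */
        printf("popped %d\n", pop(0));
        return 0;
    }

In the real code the split is between pop_fast() (own shards, plus occasionally helping one randomly chosen "target" lane whose timestamp looks old) and pop_slow() (random stealing, then the exhaustive search()); the sketch collapses those phases into one function.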
libcfa/src/concurrency/ready_subqueue.hfa
r857a1c6 → rc8a0210

    	#endif
    }
  +
  + // Aligned timestamps which are used by the relaxed ready queue
  + struct __attribute__((aligned(128))) __timestamp_t {
  + 	volatile unsigned long long tv;
  + };
  +
  + void  ?{}(__timestamp_t & this) { this.tv = 0; }
  + void ^?{}(__timestamp_t & this) {}
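The point of wrapping a single counter in a 128-byte-aligned struct appears to be cache-line isolation: each lane's timestamp is written by whichever processor pops from that lane, and padding the timestamps apart keeps those writes from false-sharing a cache line. The same layout trick in standard C11 (names invented):

    #include <stdalign.h>
    #include <stdio.h>

    /* one per ready-queue lane; the alignment pads the struct out to 128 bytes,
     * so neighbouring timestamps never share a cache line */
    struct lane_timestamp {
        alignas(128) volatile unsigned long long tv;
    };

    int main(void) {
        struct lane_timestamp tscs[4] = {{ 0 }};   /* what lanes.tscs points at, conceptually */
        tscs[2].tv = 123456789ULL;
        printf("sizeof = %zu, alignof = %zu\n",
               sizeof(struct lane_timestamp), alignof(struct lane_timestamp));
        return 0;
    }

Using 128 rather than 64 matches the rest of the runtime (the lanes themselves are aligned(128)), presumably to also cover adjacent-line prefetching.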
libcfa/src/concurrency/stats.cfa
r857a1c6 → rc8a0210

    #include <inttypes.h>
    #include "bits/debug.hfa"
  + #include "bits/locks.hfa"
    #include "stats.hfa"
…
    	stats->io.calls.errors.busy = 0;
    	stats->io.poller.sleeps = 0;
    #endif
  +
  + #if defined(CFA_STATS_ARRAY)
  + 	stats->array.values = alloc(CFA_STATS_ARRAY);
  + 	stats->array.cnt = 0;
  + #endif
…
The third hunk appends, under #if defined(CFA_STATS_ARRAY), an extern "C" block including <stdio.h>, <errno.h>, <sys/stat.h> and <fcntl.h>, plus two new functions. __flush_stat( this, name, handle ) creates the .cfadata directory if needed, opens ".cfadata/<name><handle>.data" in append mode, writes one "ts, value" line per buffered entry, resets this->array.cnt and closes the file, aborting if mkdir or open fails. __push_stat buffers one sample, flushing first when the buffer is full and serialising external callers with a global spinlock:

  + static __spinlock_t stats_lock;
  +
  + void __push_stat( struct __stats_t * this, int64_t value, bool external, const char * name, void * handle ) {
  + 	if(external) lock(stats_lock __cfaabi_dbg_ctx2);
  +
  + 	if( this->array.cnt >= CFA_STATS_ARRAY ) __flush_stat( this, name, handle );
  +
  + 	size_t idx = this->array.cnt;
  + 	this->array.cnt++;
  +
  + 	if(external) unlock(stats_lock);
  +
  + 	this->array.values[idx].ts = rdtscl();
  + 	this->array.values[idx].value = value;
  + }
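__flush_stat dumps the buffered samples as "timestamp, value" text lines into a per-handle file under .cfadata/. Below is a stand-alone C approximation of that flush; flush_samples and struct sample are invented names, and error handling is simplified to returning -1 where the CFA version aborts.

    #include <stdio.h>
    #include <stdint.h>
    #include <errno.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/stat.h>
    #include <sys/types.h>

    struct sample { unsigned long long ts; int64_t value; };

    /* append the buffered samples for one handle to .cfadata/<name><handle>.data */
    static int flush_samples(const char * name, void * handle,
                             const struct sample * buf, size_t cnt) {
        if (mkdir(".cfadata", 0755) < 0 && errno != EEXIST) return -1;

        char filename[100];
        snprintf(filename, sizeof filename, ".cfadata/%s%p.data", name, handle);

        int fd = open(filename, O_WRONLY | O_APPEND | O_CREAT, 0644);
        if (fd < 0) return -1;

        for (size_t i = 0; i < cnt; i++) {
            char line[100];
            int n = snprintf(line, sizeof line, "%llu, %lld\n",
                             buf[i].ts, (long long)buf[i].value);
            if (write(fd, line, (size_t)n) < 0) { close(fd); return -1; }
        }
        return close(fd);
    }

    int main(void) {
        struct sample buf[2] = { { 100, 1 }, { 200, 2 } };
        int marker;                       /* any stable address works as the handle */
        return flush_samples("Demo", &marker, buf, 2) == 0 ? 0 : 1;
    }

Appending (O_APPEND) matters because the same processor or cluster flushes repeatedly over its lifetime, each flush adding another slice of the time series to the same file.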
libcfa/src/concurrency/stats.hfa
r857a1c6 → rc8a0210

    #pragma once
  +
  + // #define CFA_STATS_ARRAY 10000

    #include <stdint.h>
…
  + #if defined(CFA_STATS_ARRAY)
  + 	struct __stats_elem_t {
  + 		long long int ts;
  + 		int64_t value;
  + 	};
  + #endif
  +
    struct __attribute__((aligned(128))) __stats_t {
    	__stats_readQ_t ready;
    #if defined(CFA_HAVE_LINUX_IO_URING_H)
    	__stats_io_t io;
    #endif
  +
  + 	#if defined(CFA_STATS_ARRAY)
  + 		struct {
  + 			__stats_elem_t * values;
  + 			volatile size_t cnt;
  + 		} array;
  + 	#endif
  +
    };
…
    void __tally_stats( struct __stats_t *, struct __stats_t * );
    void __print_stats( struct __stats_t *, int, const char *, const char *, void * );
  + #if defined(CFA_STATS_ARRAY)
  + 	void __push_stat ( struct __stats_t *, int64_t value, bool external, const char * name, void * handle);
  + 	void __flush_stat( struct __stats_t *, const char *, void * );
  + #else
  + 	static inline void __push_stat ( struct __stats_t *, int64_t, bool, const char *, void * ) {}
  + 	static inline void __flush_stat( struct __stats_t *, const char *, void * ) {}
  + #endif
    #endif
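Note the #else branch above: when CFA_STATS_ARRAY is not defined, __push_stat and __flush_stat become empty static inline functions, so the __push_stat calls added in kernel.cfa (which are only guarded by __CFA_NO_STATISTICS__) compile unchanged and cost nothing. A generic C rendition of that toggle pattern, with invented names:

    #include <stdio.h>
    #include <stdint.h>

    /* #define STATS_ARRAY 10000 */      /* uncomment to enable sample recording */

    #if defined(STATS_ARRAY)
    /* real version, defined in some .c file of the hypothetical library */
    void push_stat(int64_t value, const char * name, void * handle);
    #else
    /* disabled build: the call compiles away entirely */
    static inline void push_stat(int64_t value, const char * name, void * handle) {
        (void)value; (void)name; (void)handle;
    }
    #endif

    int main(void) {
        int owner;
        push_stat(42, "ready.threads", &owner);   /* identical call either way */
        printf("done\n");
        return 0;
    }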
libcfa/src/concurrency/thread.cfa
r857a1c6 → rc8a0210

    	link.next = 0p;
    	link.prev = 0p;
  - 	link.preferred = -1;
    	#if defined( __CFA_WITH_VERIFY__ )
    		canary = 0x0D15EA5E0D15EA5Ep;
…
  - FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))
  + EHM_VIRTUAL_TABLE(SomeThreadCancelled, std_thread_cancelled);

    forall(T &)
…
    forall(T &)
    const char * msg(ThreadCancelled(T) *) {
  - 	return "ThreadCancelled";
  + 	return "ThreadCancelled(...)";
    }

    forall(T &)
    static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
  + 	// Improve this error message, can I do formatting?
    	abort( "Unhandled thread cancellation.\n" );
    }

  - forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
  + static void default_thread_cancel_handler(SomeThreadCancelled & ) {
  + 	// Improve this error message, can I do formatting?
  + 	abort( "Unhandled thread cancellation.\n" );
  + }
  +
  + forall(T & | is_thread(T) | IS_EXCEPTION(SomeThreadCancelled))
    void ?{}( thread_dtor_guard_t & this,
  - 		T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) {
  + 		T & thrd, void(*cancelHandler)(SomeThreadCancelled &)) {
    	$monitor * m = get_monitor(thrd);
    	$thread * desc = get_thread(thrd);

    	// Setup the monitor guard
    	void (*dtor)(T& mutex this) = ^?{};
  - 	bool join = cancelHandler != (void(*)(ThreadCancelled(T)&))0;
  + 	bool join = cancelHandler != (void(*)(SomeThreadCancelled&))0;
    	(this.mg){&m, (void(*)())dtor, join};
…
    	}
    	desc->state = Cancelled;
  - 	void(*defaultResumptionHandler)(ThreadCancelled(T) &) =
  + 	void(*defaultResumptionHandler)(SomeThreadCancelled &) =
    		join ? cancelHandler : default_thread_cancel_handler;

  - 	ThreadCancelled(T) except;
    	// TODO: Remove explitate vtable set once trac#186 is fixed.
  - 	except.virtual_table = &get_exception_vtable(&except);
  + 	SomeThreadCancelled except;
  + 	except.virtual_table = &std_thread_cancelled;
    	except.the_thread = &thrd;
    	except.the_exception = __cfaehm_cancellation_exception( cancellation );
  - 	throwResume except;
  + 	// Why is this cast required?
  + 	throwResume (SomeThreadCancelled &)except;

    	except.the_exception->virtual_table->free( except.the_exception );
…
    //-----------------------------------------------------------------------------
  - forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
  + forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(SomeThreadCancelled))
    T & join( T & this ) {
    	thread_dtor_guard_t guard = { this, defaultResumptionHandler };
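The guard constructor above is where join() handles cancellation: when the joined thread's stack was cancelled, it builds a SomeThreadCancelled exception, installs the static vtable, and resumes it at the joiner's handler (falling back to default_thread_cancel_handler, which aborts, when no handler was passed). The pthread-based C sketch below is only a loose analogy of that control flow — a join that hands cancelled tasks to a caller-supplied handler — using plain callbacks instead of CFA's resumption exceptions; every name in it is invented.

    #include <pthread.h>
    #include <stdio.h>
    #include <stdbool.h>
    #include <stdlib.h>

    struct task {
        pthread_t handle;
        bool cancelled;              /* set by the task body if it bails out */
        int result;
    };

    static void * task_main(void * arg) {
        struct task * t = arg;
        t->cancelled = true;         /* pretend the body was cancelled */
        return 0;
    }

    /* join analogue: on cancellation, run the caller's handler instead of
     * returning a result; with no handler, mirror the default abort */
    static int task_join(struct task * t, void (*on_cancel)(struct task *)) {
        pthread_join(t->handle, 0);
        if (t->cancelled) {
            if (!on_cancel) {
                fprintf(stderr, "Unhandled thread cancellation.\n");
                abort();
            }
            on_cancel(t);
            return -1;
        }
        return t->result;
    }

    static void my_handler(struct task * t) {
        printf("task %p was cancelled\n", (void *)t);
    }

    int main(void) {
        struct task t = { 0 };
        pthread_create(&t.handle, 0, task_main, &t);
        task_join(&t, my_handler);
        return 0;
    }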
libcfa/src/concurrency/thread.hfa
r857a1c6 → rc8a0210

  - FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
  + EHM_EXCEPTION(SomeThreadCancelled) (
  + 	void * the_thread;
  + 	exception_t * the_exception;
  + );
  +
  + EHM_EXTERN_VTABLE(SomeThreadCancelled, std_thread_cancelled);
  +
  + EHM_FORALL_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
    	thread_t * the_thread;
    	exception_t * the_exception;
…
  - forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
  - void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
  + forall( T & | is_thread(T) | IS_EXCEPTION(SomeThreadCancelled) )
  + void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(SomeThreadCancelled &) );
    void ^?{}( thread_dtor_guard_t & this );
…
    //----------
    // join
  - forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
  + forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(SomeThreadCancelled) )
    T & join( T & this );