Changeset e67a82d for libcfa/src/concurrency
- Timestamp: Aug 20, 2020, 11:48:15 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: d685cb0
- Parents: 67ca73e, 013b028
- Location: libcfa/src/concurrency
- Files: 17 edited

Note: this is a merge changeset; the changes listed below correspond to the merge itself, not to either parent individually.
- libcfa/src/concurrency/alarm.hfa
  - The list container is now included with a quoted, project-local path:
        #include <containers/list.hfa>   becomes   #include "containers/list.hfa"
- libcfa/src/concurrency/coroutine.cfa
  - Adds a C-linkage accessor, inside the existing extern "C" block, so that plain C code (invoke.c) can reach the running coroutine:
        struct $coroutine * __cfactx_cor_active(void) {
            return active_coroutine();
        }
- libcfa/src/concurrency/invoke.c
  - Header comment updated (Last Modified On: Thu Aug 20 23:43:23 2020; Update Count: 30 -> 31).
  - Declares extern struct $coroutine * __cfactx_cor_active(void); and uses it to locate the per-stack exception state:
        struct exception_context_t * this_exception_context() {
            return &__get_stack( __cfactx_cor_active() )->exception_context;
        }
  - The __ARM_ARCH_32 #warning ("ARM needs to be upgraded to use two parameters like X86/X64") is promoted to an #error, with an explanation added: to avoid the thunk problem, the invoke routine now passes the coroutine main explicitly instead of relying on an assertion, which hoists any required thunk one level (enough to reach global scope in most cases). __cfactx_invoke_... therefore takes two parameters and the FakeStack layout must be adjusted accordingly; the author does not know how to do that for ARM, hence the #error.
- libcfa/src/concurrency/invoke.h
  - Forward-declares __cfaehm_try_resume_node and __cfaehm_base_exception_t, and defines the per-stack exception state:
        struct exception_context_t {
            struct __cfaehm_try_resume_node * top_resume;
            struct __cfaehm_base_exception_t * current_exception;
        };
  - struct __stack_t gains a struct exception_context_t exception_context; member after base, so every coroutine stack carries its own exception-handling context.
  - __get_stack is reflowed over multiple lines (no behavioural change), and struct exception_context_t * this_exception_context(); is declared for use by the EHM.
- libcfa/src/concurrency/io.cfa
  - New leader-lock primitives for the submission path:
    - try_lock( __leaderlock_t & ) tags the caller's $thread pointer with the low bit and, with interrupts disabled, loops on a CAS over the lock word: 0p means unlocked, so CAS to 1p and acquire without blocking; 1p means locked with no successor, so install the tagged pointer and park() as the next leader; any other value means a next leader already exists, so re-enable interrupts and return false. It returns true once the lock is held (immediately or via handoff), with interrupts still disabled.
    - next( __leaderlock_t & ) releases: if the word is 1p it CASes to 0p (fully unlocked, returns false); otherwise it strips the tag, leaves the word at 1p (still locked), unpark()s the next leader, and returns true.
  - All ring indexes and user data move from uint32_t/uint64_t/int32_t to the kernel's __u32/__u64 fixed-width types.
  - process() completes a request with post( data->sem ) on the request's oneshot semaphore instead of unpark( data->thrd ).
  - Completion statistics: completed_avg.fast_cnt becomes completed_avg.cnt, and a new complete_q.blocks counter is incremented when the fast poller gives up and baton-passes to the slow poller.
  - __submit_to_ready_array, when it finds no free slot, now calls __release_consumed_submission directly and yields only if nothing was released (previously it try_lock'ed the submission lock first).
  - __submit: the eager-submit path gains a LEADER_LOCK compile-time variant. A thread that finds both the lock held and a next leader already registered returns immediately, counted as helped (the leader will pick up its entry); otherwise it acquires the lock outright or parks as the next leader and is handed the lock, counted as leader. The leader collects the pending submissions, calls __io_uring_enter, releases the consumed SQEs, and hands the lock off with next() rather than unlock(). The old yield/try_lock loop, including its #warning ABA problem, is kept as the #else branch. The lock is renamed submit_q.lock -> submit_q.submit_lock throughout, an error return (ret < 0) now happens after the lock is released or handed off, and the stats update moves after the release.
  - __collect_submitions gains an optional PARTIAL_SUBMIT mode (left commented out: a cap of 32 entries per pass, resuming from the new submit_q.prev_ready cursor); it #errors if combined with LEADER_LOCK.
  - __release_consumed_submission now returns the __u32 count of released entries.
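For readers unfamiliar with the pattern, below is a minimal sketch of the tagged-pointer leader lock in plain C11 atomics. It omits the interrupt handling, and park(), unpark(), and current_thread() are hypothetical stand-ins for the CFA scheduler hooks, not the library's API:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    /* Hypothetical scheduler hooks standing in for CFA's park/unpark. */
    extern void  park(void);
    extern void  unpark(void *thread);
    extern void *current_thread(void);

    /* Lock word: 0 = unlocked; 1 = locked, no successor;
       (thread | 1) = locked, 'thread' is parked as the next leader. */
    typedef struct { _Atomic uintptr_t value; } leaderlock_t;

    /* Returns true once the caller holds the lock, either immediately or
       after being handed it; false if the successor slot was already taken. */
    static bool leader_try_lock(leaderlock_t *l) {
        uintptr_t self = (uintptr_t)current_thread() | 1;
        for (;;) {
            uintptr_t expected = atomic_load(&l->value);
            if (expected != 0 && expected != 1) return false; /* someone queued */
            uintptr_t desired = (expected == 0) ? 1 : self;   /* acquire or queue */
            if (atomic_compare_exchange_weak(&l->value, &expected, desired)) {
                if (desired == self) park();   /* sleep; leader hands the lock off */
                return true;
            }
        }
    }

    /* Release: wake the queued successor, keeping the lock held for it,
       or clear the word if no one is waiting. */
    static void leader_next(leaderlock_t *l) {
        for (;;) {
            uintptr_t expected = atomic_load(&l->value);
            uintptr_t next    = (expected == 1) ? 0 : expected & ~(uintptr_t)1;
            uintptr_t desired = (expected == 1) ? 0 : 1;  /* stay locked on handoff */
            if (atomic_compare_exchange_weak(&l->value, &expected, desired)) {
                if (next) unpark((void *)next);
                return;
            }
        }
    }

The point of the design is that at most one thread (the registered next leader) ever blocks on the lock; every other contender either acquires it outright or returns immediately, knowing that the current leader will submit its already-published entry.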
- libcfa/src/concurrency/io/setup.cfa
  - The teardown assertion now checks cltr.idles.total == 0 instead of the removed cltr.nprocessors.
  - Parameter validation added: num_entries defaults to 256 when 0, must be a power of 2 (abort with "I/O setup 'num_entries' must be a power of 2" otherwise), and poller_submits and eager_submits may not be used together (abort).
  - The mmap'ed submission/completion ring pointers (head, tail, mask, num, flags, dropped, array, overflow, cqes) are cast to __u32 * / struct io_uring_cqe * instead of uint32_t *.
  - sq.lock is renamed sq.submit_lock, and the new sq.prev_ready cursor is initialized to 0 in both the ready-array and no-ready-array branches.
  - __ioctx_register stores the context pointer as ev.data.u64 = (__u64)&ctx.
- libcfa/src/concurrency/io/types.hfa
  - Includes <linux/types.h> (inside extern "C") for the kernel's fixed-width __u32/__u64/__s32 types, used throughout the ring structures.
  - Defines LEADER_LOCK and the lock itself:
        struct __leaderlock_t {
            struct $thread * volatile value;  // ($thread) next_leader | (bool:1) is_locked
        };
        static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; }
  - __submition_data gains a __u32 prev_ready cursor, and its lock becomes submit_lock: a __leaderlock_t under LEADER_LOCK, otherwise a __spinlock_t; release_lock stays a __spinlock_t.
  - __io_user_data_t now holds a __s32 result and a oneshot sem instead of a $thread * thrd.
- libcfa/src/concurrency/iocall.cfa
  - The __submit_alloc/__submit externs and the io_uring_sqe constructors switch to __u8/__u32/__u64.
  - __submit_prelude no longer records active_thread() in the user data (the struct now carries the oneshot semaphore) and computes the sanitized flags (__u8 sflags = REGULAR_FLAGS & submit_flags;) before allocating the SQE.
  - __submit_wait blocks with wait( data.sem ) instead of park().
  - The sync_file_range, fallocate, and posix_fadvise prototypes use off_t/off64_t (with local typedefs keyed on __OFF_T_MATCHES_OFF64_T) rather than int64_t/uint64_t, and the cfa_sync_file_range, cfa_fallocate, and cfa_fadvise signatures follow.
  - cfa_preadv2, cfa_pwritev2, and cfa_fsync now fill every SQE field explicitly (opcode, ioprio, fd, off, addr, len, rw_flags, __pad2[0..2]) instead of relying on the constructor.
  - The remaining pointer/offset casts in cfa_send, cfa_recv, cfa_accept4, cfa_connect, cfa_madvise, cfa_openat, cfa_statx, and cfa_splice use (__u64).
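Put together, the new request life-cycle reads as follows; this is assembled from the prelude/wait macros here and process() in io.cfa, slightly simplified:

    // One request's life cycle with the oneshot semaphore (simplified).
    struct __io_user_data_t {
        __s32 result;
        oneshot sem;        // single-use rendezvous: one wait(), one post()
    };

    // Submitting thread (body of a cfa_* wrapper):
    __io_user_data_t data = { 0 };
    sqe->user_data = (__u64)(uintptr_t)&data;   // the CQE carries this back
    __submit( context, idx );
    wait( data.sem );                           // block until the CQE is processed
    if( data.result < 0 ) { errno = -data.result; return -1; }
    return data.result;

    // Completion poller, for each harvested CQE:
    static inline void process( struct io_uring_cqe & cqe ) {
        struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
        data->result = cqe.res;
        post( data->sem );                      // wakes the submitter exactly once
    }

The usual motivation for swapping a raw $thread */unpark() pair for a one-shot semaphore is that the semaphore tolerates post() arriving before wait(): if the completion races ahead of the submitter, the subsequent wait() simply does not block.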
- libcfa/src/concurrency/kernel.cfa
  - The processor main loop is restructured around labelled MAIN_LOOP/HALT blocks. Each iteration tries __next_thread, then __next_thread_slow. If both fail and the processor is not terminating, it pushes itself onto the cluster's idle list, re-runs __next_thread_slow (cancelling the halt, with a ready.sleep.cancels tick, if work appeared in the window), and otherwise wait()s on its idle semaphore; on wake-up it removes itself from the idle list and restarts the loop rather than assuming work was left for it. The ready.sleep.halts counter and the print_halts trace move here from the deleted __halt.
  - Processor start-up seeds the new per-processor ready_rng (fwd_seed = 25214903917 * (rdtscl() ^ &runner)) and calls __tls_rand_advance_bck().
  - __run_thread absorbs the paranoid state checks and the __builtin_prefetch of the target's stack pointer from the loop, and re-asserts on exit that preemption is disabled.
  - __schedule_thread calls __wake_one unconditionally; __wake_one is now void, asserts the ready-schedule lock is held, bumps ready.sleep.wakes itself, and posts the first idle processor's semaphore only when the idle-list snapshot reports idle != 0.
  - __has_next_thread is replaced by __next_thread_slow (a pop_slow wrapper); __next_thread becomes inline; __wake_proc returns void.
  - New helpers for __cluster_idles:
    - push/remove take the idle lock, adjust the idle count (with bounds assertions), and insert_first/remove the processor in the intrusive list; both assert preemption is disabled.
    - query( __cluster_idles & ) returns [idle, total, proc] lock-free: it retries while the lock word is odd or changes across the snapshot, with a compiler fence noted as a workaround for incorrect reordering by gcc-8 and older.
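Condensed, the loop's sleep protocol looks like the sketch below (queue_pop, idle_push, and friends are illustrative names, not the changeset's). The key is the ordering that closes the lost-wakeup window: the sleeper publishes itself on the idle list before its final exhaustive queue check, while a waker pushes work before scanning the idle list, so at least one side always observes the other:

    // Sketch of the processor main loop's halt path.
    for (;;) {
        thread_t *t = queue_pop();           // fast path, may fail spuriously
        if (!t) t = queue_pop_slow();        // exhaustive scan
        if (!t) {
            if (terminating()) break;
            idle_push(self);                 // 1. publish: "I am going to sleep"
            t = queue_pop_slow();            // 2. final check, after publishing
            if (t) {
                idle_remove(self);           //    work raced in: cancel the halt
            } else {
                sem_wait(&self->idle);       // 3. actually sleep
                idle_remove(self);
                continue;                    //    woken: redo the full search
            }
        }
        run_thread(self, t);
        if (terminating()) break;
    }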
- libcfa/src/concurrency/kernel.hfa
  - Swaps containers/stackLockFree.hfa for containers/list.hfa: processor joins an intrusive doubly-linked list via DLISTED_MGD_IMPL_IN/DLISTED_MGD_IMPL_OUT instead of carrying a Link(processor) field and its ?`next accessor.
  - New idle-sleep state, replacing the lock-free stack and atomic counter:
        struct __cluster_idles {
            volatile uint64_t lock;            // spin lock protecting the queue
            unsigned total;                    // total number of processors
            unsigned idle;                     // number of idle processors
            dlist(processor, processor) list;  // list of idle processors
        };
  - cluster replaces StackLF(processor) idles; volatile unsigned int nprocessors; with a single __cluster_idles idles;.
- libcfa/src/concurrency/kernel/fwd.hfa
  - kernelTLS gains a ready_rng with separate fwd_seed and bck_seed.
  - Adds a reversible 48-bit linear congruential generator. With M = 2^48, A = 25214903917, AI = 18446708753438544741 (the multiplicative inverse of A modulo 2^48), C = 11, and D = 16:
    - __tls_rand_fwd() steps fwd_seed = (A * fwd_seed + C) & (M - 1) and returns fwd_seed >> D;
    - __tls_rand_bck() returns bck_seed >> D first, then steps bck_seed = AI * (bck_seed - C) & (M - 1);
    - __tls_rand_advance_bck() copies fwd_seed into bck_seed, after which __tls_rand_bck replays the forward outputs in reverse order.
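As a standalone illustration, here is the same generator in portable C. The constants are copied from the changeset (A and C are the familiar drand48/java.util.Random parameters); the initial seed value and the main() self-check are this sketch's additions, mirroring verify_fwd_bck_rng:

    #include <assert.h>
    #include <stdint.h>

    #define M  (1ULL << 48)
    #define A  25214903917ULL            /* LCG multiplier */
    #define AI 18446708753438544741ULL   /* A^-1 mod 2^48: makes the backward step exact */
    #define C  11ULL
    #define D  16                        /* drop the statistically weak low bits */

    static uint64_t fwd_seed = 0x853c49e6748fea9bULL & (M - 1);  /* arbitrary seed */
    static uint64_t bck_seed;

    static unsigned rand_fwd(void) {
        fwd_seed = (A * fwd_seed + C) & (M - 1);
        return fwd_seed >> D;
    }

    /* Returns the value first, then steps back, so after copying fwd_seed
       into bck_seed the backward stream starts with the most recent
       forward output. */
    static unsigned rand_bck(void) {
        unsigned r = bck_seed >> D;
        bck_seed = AI * (bck_seed - C) & (M - 1);
        return r;
    }

    int main(void) {
        unsigned v[10];
        for (int i = 0; i < 10; i++) v[i] = rand_fwd();
        bck_seed = fwd_seed;                        /* __tls_rand_advance_bck */
        for (int i = 9; i >= 0; i--) assert(v[i] == rand_bck());
        return 0;
    }

Within this changeset the generator is only seeded and self-checked (see verify_fwd_bck_rng in kernel/startup.cfa below); nothing consumes the reversible streams yet.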
- libcfa/src/concurrency/kernel/startup.cfa
  - Under __CFA_WITH_VERIFY__, a new verify_fwd_bck_rng() seeds the generator from rdtscl(), draws ten forward values, calls __tls_rand_advance_bck(), and checks that the backward stream reproduces them in reverse; the kernel boot sequence asserts it.
  - The __wake_proc extern now returns void.
  - Processor init/deinit adjust cltr->idles.total under the idle lock instead of atomically updating the removed nprocessors counter, and the unsafe_remove from the old lock-free idle stack is gone (the main loop now removes itself from the idle list). The processor constructor and destructor run init/deinit with interrupts disabled.
  - A constructor for __cluster_idles zeroes lock, idle, and total and constructs the list; the cluster constructor drops its nprocessors initialization.
- libcfa/src/concurrency/kernel_private.hfa
  - Adds the writer side of the idle-list lock: lock( __cluster_idles & ) spins until the 64-bit word is even and a CAS bumps it to odd; unlock asserts the word is odd and fetch-adds 1 to make it even again, so the word doubles as a sequence counter for lock-free readers.
  - Documents that the cluster-level pop may now return 0p spuriously, and declares the new pop_slow, which is guaranteed to find any thread added before the call.
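A minimal sketch of the construction in C11 atomics, with a reader-side snapshot like the query() added in kernel.cfa; the field and function names here are illustrative:

    #include <stdatomic.h>
    #include <stdint.h>

    /* One word serves as both writer mutex and sequence counter:
       even = unlocked, odd = a writer is mutating the idle list. */
    static _Atomic uint64_t seq;
    static unsigned idle, total;     /* the protected fields */

    static void idles_lock(void) {
        for (;;) {
            uint64_t l = atomic_load(&seq);
            if ((l % 2) == 0 &&
                atomic_compare_exchange_weak(&seq, &l, l + 1))
                return;
            /* Pause() / cpu_relax() in the real code */
        }
    }

    static void idles_unlock(void) {
        atomic_fetch_add(&seq, 1);   /* odd -> even, publishing the update */
    }

    /* Lock-free reader: retry until the counter is even and unchanged
       around the snapshot, i.e. no writer ran in between. */
    static void idles_query(unsigned *idle_out, unsigned *total_out) {
        for (;;) {
            uint64_t l = atomic_load(&seq);
            if (l % 2) continue;                        /* writer active */
            unsigned i = idle, t = total;
            atomic_signal_fence(memory_order_seq_cst);  /* compiler fence */
            if (l == atomic_load(&seq)) { *idle_out = i; *total_out = t; return; }
        }
    }

The payoff is that the hot path never takes the lock: __wake_one only reads a consistent snapshot and posts a semaphore, while the comparatively rare sleep and wake-up transitions pay for the mutual exclusion.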
- libcfa/src/concurrency/ready_queue.cfa
  - The SNZI ("is anything ready?") summary structure becomes compile-time optional behind USE_SNZI, which is left commented out, i.e. disabled: its construction, teardown, query, arrive, and depart calls are all guarded, and query(cltr) unconditionally returns true without it.
  - ready_mutate_lock and ready_mutate_unlock assert that preemption is disabled on entry and exit.
  - pop() loads lanes.count once up front; without the SNZI it probes at most 25 times, each time picking two random lanes and stealing from the one with the older head timestamp, so it may now fail spuriously even when work exists.
  - A new pop_slow() scans every lane exactly once starting from a random offset and returns 0p only if all lanes were empty; it is the exhaustive fallback a processor runs before going idle (see the snippet below).
  - The per-lane pop now returns just the thread; the dropped "emptied" flag existed only to drive SNZI departs, which simplifies both the two-choice pop and the targeted remove.
  - grow/shrink tear down and rebuild the SNZI only when USE_SNZI is defined.
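The fallback itself is small; as landed in the changeset, with comments added:

    __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
        /* paranoid */ verify( lanes.count > 0 );
        unsigned count  = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
        unsigned offset = __tls_rand();
        for(i; count) {                       // visit every lane once, from a random start
            unsigned idx = (offset + i) % count;
            struct $thread * thrd = try_pop(cltr, idx);
            if(thrd) return thrd;
        }
        // All lanes were empty, return 0p
        return 0p;
    }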
- libcfa/src/concurrency/ready_subqueue.hfa
  - The lane pop returns a plain $thread * instead of the tuple [$thread *, bool]; the paranoid assertions around it are unchanged.
  - The tuple assignment node->link.[next, prev] = 0p; is split into two ordinary assignments (node->link.next = 0p; node->link.prev = 0p;).
- libcfa/src/concurrency/stats.cfa
  - The two completion counters completed_avg.slow_cnt and completed_avg.fast_cnt collapse into a single completed_avg.cnt, and the new complete_q.blocks counter is reset in __init_stats and accumulated in __tally_stats.
  - The block of atomic adds in __tally_stats is realigned for readability (no behavioural change).
  - The printed report now shows "total wait calls", "avg completion/wait" (val / cnt), and a new "total completion blocks" line instead of the slow/fast split.
- libcfa/src/concurrency/stats.hfa
  - Matching structure change: completed_avg keeps val plus a single volatile uint64_t cnt (replacing slow_cnt and fast_cnt), and complete_q gains volatile uint64_t blocks;.