Changeset 78da4ab
- Timestamp: Feb 19, 2021, 1:47:09 PM (3 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 4f762d3
- Parents: b44959f
- Location: libcfa/src/concurrency
- Files: 8 edited
libcfa/src/concurrency/io.cfa
rb44959f r78da4ab 17 17 18 18 #if defined(__CFA_DEBUG__) 19 //#define __CFA_DEBUG_PRINT_IO__20 //#define __CFA_DEBUG_PRINT_IO_CORE__19 #define __CFA_DEBUG_PRINT_IO__ 20 #define __CFA_DEBUG_PRINT_IO_CORE__ 21 21 #endif 22 22 … … 79 79 }; 80 80 81 // returns true of acquired as leader or second leader82 static inline bool try_lock( __leaderlock_t & this ) {83 const uintptr_t thrd = 1z | (uintptr_t)active_thread();84 bool block;85 disable_interrupts();86 for() {87 struct $thread * expected = this.value;88 if( 1p != expected && 0p != expected ) {89 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader90 enable_interrupts( __cfaabi_dbg_ctx );91 return false;92 }93 struct $thread * desired;94 if( 0p == expected ) {95 // If the lock isn't locked acquire it, no need to block96 desired = 1p;97 block = false;98 }99 else {100 // If the lock is already locked try becomming the next leader101 desired = (struct $thread *)thrd;102 block = true;103 }104 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;105 }106 if( block ) {107 enable_interrupts( __cfaabi_dbg_ctx );108 park();109 disable_interrupts();110 }111 return true;112 }113 114 static inline bool next( __leaderlock_t & this ) {115 /* paranoid */ verify( ! __preemption_enabled() );116 struct $thread * nextt;117 for() {118 struct $thread * expected = this.value;119 /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked120 121 struct $thread * desired;122 if( 1p == expected ) {123 // No next leader, just unlock124 desired = 0p;125 nextt = 0p;126 }127 else {128 // There is a next leader, remove but keep locked129 desired = 1p;130 nextt = (struct $thread *)(~1z & (uintptr_t)expected);131 }132 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;133 }134 135 if(nextt) {136 unpark( nextt );137 enable_interrupts( __cfaabi_dbg_ctx );138 return true;139 }140 enable_interrupts( __cfaabi_dbg_ctx );141 return false;142 }143 144 81 //============================================================================================= 145 82 // I/O Syscall 146 83 //============================================================================================= 147 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {84 static int __io_uring_enter( struct $io_context & ctx, unsigned to_submit, bool get ) { 148 85 bool need_sys_to_submit = false; 149 86 bool need_sys_to_complete = false; … … 152 89 TO_SUBMIT: 153 90 if( to_submit > 0 ) { 154 if( !( ring.ring_flags & IORING_SETUP_SQPOLL) ) {91 if( !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) { 155 92 need_sys_to_submit = true; 156 93 break TO_SUBMIT; 157 94 } 158 if( (* ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {95 if( (*ctx.sq.flags) & IORING_SQ_NEED_WAKEUP ) { 159 96 need_sys_to_submit = true; 160 97 flags |= IORING_ENTER_SQ_WAKEUP; … … 162 99 } 163 100 164 if( get && !( ring.ring_flags & IORING_SETUP_SQPOLL) ) {101 if( get && !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) { 165 102 flags |= IORING_ENTER_GETEVENTS; 166 if( ( ring.ring_flags & IORING_SETUP_IOPOLL) ) {103 if( (ctx.ring_flags & IORING_SETUP_IOPOLL) ) { 167 104 need_sys_to_complete = true; 168 105 } … … 171 108 int ret = 0; 172 109 if( need_sys_to_submit || need_sys_to_complete ) { 173 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 174 ret = syscall( __NR_io_uring_enter, ring.fd, 
to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 175 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret); 176 177 if( ret < 0 ) { 178 switch((int)errno) { 179 case EAGAIN: 180 case EINTR: 181 case EBUSY: 182 ret = -1; 183 break; 184 default: 185 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 186 } 187 } 110 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ctx.fd, to_submit, flags); 111 ret = syscall( __NR_io_uring_enter, ctx.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 112 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ctx.fd, ret); 188 113 } 189 114 … … 196 121 // I/O Polling 197 122 //============================================================================================= 198 static unsigned __collect_submitions( struct __io_data & ring ); 199 static __u32 __release_consumed_submission( struct __io_data & ring ); 200 static inline void __clean( volatile struct io_uring_sqe * sqe ); 201 202 // Process a single completion message from the io_uring 203 // This is NOT thread-safe 204 static inline void process( volatile struct io_uring_cqe & cqe ) { 205 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 206 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 207 208 fulfil( *future, cqe.res ); 209 } 210 211 static [int, bool] __drain_io( & struct __io_data ring ) { 212 /* paranoid */ verify( ! __preemption_enabled() ); 213 214 unsigned to_submit = 0; 215 if( ring.poller_submits ) { 216 // If the poller thread also submits, then we need to aggregate the submissions which are ready 217 to_submit = __collect_submitions( ring ); 218 } 219 220 int ret = __io_uring_enter(ring, to_submit, true); 123 static inline unsigned __flush( struct $io_context & ); 124 static inline __u32 __release_sqes( struct $io_context & ); 125 126 static [int, bool] __drain_io( & struct $io_context ctx ) { 127 unsigned to_submit = __flush( ctx ); 128 int ret = __io_uring_enter( ctx, to_submit, true ); 221 129 if( ret < 0 ) { 222 return [0, true]; 130 switch((int)errno) { 131 case EAGAIN: 132 case EINTR: 133 case EBUSY: 134 return [0, true]; 135 break; 136 default: 137 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 138 } 223 139 } 224 140 225 141 // update statistics 226 142 if (to_submit > 0) { 227 __STATS__( true,143 __STATS__( false, 228 144 if( to_submit > 0 ) { 229 145 io.submit_q.submit_avg.rdy += to_submit; … … 232 148 } 233 149 ) 234 } 235 236 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 150 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 151 152 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 153 ctx.sq.to_submit -= ret; 154 155 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 156 157 if(ret) { 158 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring\n", ret); 159 } 160 } 237 161 238 162 // Release the consumed SQEs 239 __release_ consumed_submission( ring);163 __release_sqes( ctx ); 240 164 241 165 // Drain the queue 242 unsigned head = * ring.completion_q.head;243 unsigned tail = * ring.completion_q.tail;244 const __u32 mask = * ring.completion_q.mask;166 unsigned head = *ctx.cq.head; 167 unsigned tail = *ctx.cq.tail; 168 const __u32 mask = *ctx.cq.mask; 245 169 246 170 // Nothing was new return 0 … … 253 177 for(i; count) { 254 178 unsigned idx = (head + i) & mask; 255 volatile struct io_uring_cqe & cqe = 
ring.completion_q.cqes[idx];179 volatile struct io_uring_cqe & cqe = ctx.cq.cqes[idx]; 256 180 257 181 /* paranoid */ verify(&cqe); 258 182 259 process( cqe ); 183 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 184 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 185 186 fulfil( *future, cqe.res ); 187 } 188 189 if(count) { 190 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count); 260 191 } 261 192 262 193 // Mark to the kernel that the cqe has been seen 263 194 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 264 __atomic_ fetch_add( ring.completion_q.head,count, __ATOMIC_SEQ_CST );195 __atomic_store_n( ctx.cq.head, head + count, __ATOMIC_SEQ_CST ); 265 196 266 197 return [count, count > 0 || to_submit > 0]; 267 198 } 268 199 269 void main( $io_ctx_thread & this ) { 270 __ioctx_register( this ); 271 272 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); 200 void main( $io_context & this ) { 201 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.fd, &this); 273 202 274 203 const int reset_cnt = 5; … … 276 205 // Then loop until we need to start 277 206 LOOP: 278 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 207 while() { 208 waitfor( ^?{} : this) { 209 break LOOP; 210 } 211 or else {} 212 279 213 // Drain the io 280 214 int count; 281 215 bool again; 282 disable_interrupts(); 283 [count, again] = __drain_io( *this.ring ); 284 285 if(!again) reset--; 286 287 // Update statistics 288 __STATS__( true, 289 io.complete_q.completed_avg.val += count; 290 io.complete_q.completed_avg.cnt += 1; 291 ) 292 enable_interrupts( __cfaabi_dbg_ctx ); 216 [count, again] = __drain_io( this ); 217 218 if(!again) reset--; 219 220 // Update statistics 221 __STATS__( false, 222 io.complete_q.completed_avg.val += count; 223 io.complete_q.completed_avg.cnt += 1; 224 ) 293 225 294 226 // If we got something, just yield and check again … … 308 240 } 309 241 310 311 312 313 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);314 315 316 242 __STATS__( false, 243 io.complete_q.blocks += 1; 244 ) 245 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.fd, &this); 246 247 // block this thread 248 wait( this.sem ); 317 249 318 250 // restore counter … … 320 252 } 321 253 322 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); 323 324 __ioctx_unregister( this ); 254 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.fd, &this); 325 255 } 326 256 … … 345 275 // 346 276 277 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want ); 278 static void __ioarbiter_submit ( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have ); 279 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * ); 280 static inline void __ioarbiter_notify( $io_context & ctx ); 281 282 //============================================================================================= 283 // Allocation 284 // for user's convenience fill the sqes from the indexes 285 static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx) { 286 struct io_uring_sqe * sqes = ctx->sq.sqes; 287 for(i; want) { 288 out_sqes[i] = &sqes[idxs[i]]; 289 } 290 } 291 292 // Try to 
directly allocate from the a given context 293 // Not thread-safe 294 static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) { 295 __sub_ring_t & sq = ctx->sq; 296 const __u32 mask = *sq.mask; 297 __u32 fhead = sq.free_ring.head; // get the current head of the queue 298 __u32 ftail = sq.free_ring.tail; // get the current tail of the queue 299 300 // If we don't have enough sqes, fail 301 if((ftail - fhead) < want) { return false; } 302 303 // copy all the indexes we want from the available list 304 for(i; want) { 305 idxs[i] = sq.free_ring.array[(fhead + i) & mask]; 306 } 307 308 // Advance the head to mark the indexes as consumed 309 __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE); 310 311 // return success 312 return true; 313 } 314 347 315 // Allocate an submit queue entry. 348 316 // The kernel cannot see these entries until they are submitted, but other threads must be … … 350 318 // for convenience, return both the index and the pointer to the sqe 351 319 // sqe == &sqes[idx] 352 [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 353 /* paranoid */ verify( data != 0 ); 354 355 // Prepare the data we need 356 __attribute((unused)) int len = 0; 357 __attribute((unused)) int block = 0; 358 __u32 cnt = *ring.submit_q.num; 359 __u32 mask = *ring.submit_q.mask; 360 361 __u32 off = thread_rand(); 362 363 // Loop around looking for an available spot 364 for() { 365 // Look through the list starting at some offset 366 for(i; cnt) { 367 __u64 expected = 3; 368 __u32 idx = (i + off) & mask; // Get an index from a random 369 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 370 volatile __u64 * udata = &sqe->user_data; 371 372 // Allocate the entry by CASing the user_data field from 0 to the future address 373 if( *udata == expected && 374 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 375 { 376 // update statistics 377 __STATS__( false, 378 io.submit_q.alloc_avg.val += len; 379 io.submit_q.alloc_avg.block += block; 380 io.submit_q.alloc_avg.cnt += 1; 381 ) 382 383 // debug log 384 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 385 386 // Success return the data 387 return [sqe, idx]; 388 } 389 verify(expected != data); 390 391 // This one was used 392 len ++; 393 } 394 395 block++; 396 397 yield(); 398 } 399 } 400 401 static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) { 402 /* paranoid */ verify( idx <= mask ); 403 /* paranoid */ verify( idx != -1ul32 ); 404 405 // We need to find a spot in the ready array 406 __attribute((unused)) int len = 0; 407 __attribute((unused)) int block = 0; 408 __u32 ready_mask = ring.submit_q.ready_cnt - 1; 409 410 __u32 off = thread_rand(); 411 412 __u32 picked; 413 LOOKING: for() { 414 for(i; ring.submit_q.ready_cnt) { 415 picked = (i + off) & ready_mask; 416 __u32 expected = -1ul32; 417 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 418 break LOOKING; 419 } 420 verify(expected != idx); 421 422 len ++; 423 } 424 425 block++; 426 427 __u32 released = __release_consumed_submission( ring ); 428 if( released == 0 ) { 429 yield(); 430 } 431 } 432 433 // update statistics 434 __STATS__( false, 435 io.submit_q.look_avg.val += len; 436 io.submit_q.look_avg.block += block; 437 io.submit_q.look_avg.cnt += 1; 438 ) 439 440 return 
picked; 441 } 442 443 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 444 __io_data & ring = *ctx->thrd.ring; 445 320 struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) { 321 __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want); 322 323 disable_interrupts(); 324 processor * proc = __cfaabi_tls.this_processor; 325 /* paranoid */ verify( __cfaabi_tls.this_processor ); 326 /* paranoid */ verify( proc->io.lock == false ); 327 328 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST ); 329 $io_context * ctx = proc->io.ctx; 330 $io_arbiter * ioarb = proc->cltr->io.arbiter; 331 /* paranoid */ verify( ioarb ); 332 333 // Can we proceed to the fast path 334 if( ctx // We alreay have an instance? 335 && !ctx->revoked ) // Our instance is still valid? 446 336 { 447 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 448 __cfadbg_print_safe( io, 449 "Kernel I/O : submitting %u (%p) for %p\n" 450 " data: %p\n" 451 " opcode: %s\n" 452 " fd: %d\n" 453 " flags: %d\n" 454 " prio: %d\n" 455 " off: %p\n" 456 " addr: %p\n" 457 " len: %d\n" 458 " other flags: %d\n" 459 " splice fd: %d\n" 460 " pad[0]: %llu\n" 461 " pad[1]: %llu\n" 462 " pad[2]: %llu\n", 463 idx, sqe, 464 active_thread(), 465 (void*)sqe->user_data, 466 opcodes[sqe->opcode], 467 sqe->fd, 468 sqe->flags, 469 sqe->ioprio, 470 (void*)sqe->off, 471 (void*)sqe->addr, 472 sqe->len, 473 sqe->accept_flags, 474 sqe->splice_fd_in, 475 sqe->__pad2[0], 476 sqe->__pad2[1], 477 sqe->__pad2[2] 478 ); 479 } 480 481 482 // Get now the data we definetely need 483 volatile __u32 * const tail = ring.submit_q.tail; 484 const __u32 mask = *ring.submit_q.mask; 485 486 // There are 2 submission schemes, check which one we are using 487 if( ring.poller_submits ) { 488 // If the poller thread submits, then we just need to add this to the ready array 489 __submit_to_ready_array( ring, idx, mask ); 490 491 post( ctx->thrd.sem ); 492 493 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 494 } 495 else if( ring.eager_submits ) { 496 __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask ); 497 498 #if defined(LEADER_LOCK) 499 if( !try_lock(ring.submit_q.submit_lock) ) { 500 __STATS__( false, 501 io.submit_q.helped += 1; 502 ) 503 return; 504 } 505 /* paranoid */ verify( ! __preemption_enabled() ); 506 __STATS__( true, 507 io.submit_q.leader += 1; 508 ) 509 #else 510 for() { 511 yield(); 512 513 if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) { 514 __STATS__( false, 515 io.submit_q.leader += 1; 516 ) 517 break; 518 } 519 520 // If some one else collected our index, we are done 521 #warning ABA problem 522 if( ring.submit_q.ready[picked] != idx ) { 523 __STATS__( false, 524 io.submit_q.helped += 1; 525 ) 526 return; 527 } 528 529 __STATS__( false, 530 io.submit_q.busy += 1; 531 ) 532 } 533 #endif 534 535 // We got the lock 536 // Collect the submissions 537 unsigned to_submit = __collect_submitions( ring ); 538 539 // Actually submit 540 int ret = __io_uring_enter( ring, to_submit, false ); 541 542 #if defined(LEADER_LOCK) 543 /* paranoid */ verify( ! 
__preemption_enabled() ); 544 next(ring.submit_q.submit_lock); 545 #else 546 unlock(ring.submit_q.submit_lock); 547 #endif 548 if( ret < 0 ) { 549 return; 550 } 551 552 // Release the consumed SQEs 553 __release_consumed_submission( ring ); 554 555 // update statistics 556 __STATS__( false, 557 io.submit_q.submit_avg.rdy += to_submit; 558 io.submit_q.submit_avg.csm += ret; 559 io.submit_q.submit_avg.cnt += 1; 560 ) 561 562 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 563 } 564 else 337 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n"); 338 339 // We can proceed to the fast path 340 if( __alloc(ctx, idxs, want) ) { 341 // Allocation was successful 342 // Mark the instance as no longer in-use and re-enable interrupts 343 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE ); 344 enable_interrupts( __cfaabi_dbg_ctx ); 345 346 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful\n"); 347 348 __fill( sqes, want, idxs, ctx ); 349 return ctx; 350 } 351 // The fast path failed, fallback 352 } 353 354 // Fast path failed, fallback on arbitration 355 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE ); 356 enable_interrupts( __cfaabi_dbg_ctx ); 357 358 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n"); 359 360 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want); 361 362 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed\n"); 363 364 __fill( sqes, want, idxs,ret ); 365 return ret; 366 } 367 368 369 //============================================================================================= 370 // submission 371 static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have) { 372 // We can proceed to the fast path 373 // Get the right objects 374 __sub_ring_t & sq = ctx->sq; 375 const __u32 mask = *sq.mask; 376 __u32 tail = sq.kring.ready; 377 378 // Add the sqes to the array 379 for( i; have ) { 380 sq.kring.array[ (tail + i) & mask ] = idxs[i]; 381 } 382 383 // Make the sqes visible to the submitter 384 __atomic_store_n(&sq.kring.ready, tail + have, __ATOMIC_RELEASE); 385 386 // Make sure the poller is awake 387 __cfadbg_print_safe(io, "Kernel I/O : waking the poller\n"); 388 post( ctx->sem ); 389 } 390 391 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have ) __attribute__((nonnull (1))) { 392 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u\n", have); 393 394 disable_interrupts(); 395 processor * proc = __cfaabi_tls.this_processor; 396 /* paranoid */ verify( __cfaabi_tls.this_processor ); 397 /* paranoid */ verify( proc->io.lock == false ); 398 399 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST ); 400 $io_context * ctx = proc->io.ctx; 401 402 // Can we proceed to the fast path 403 if( ctx // We alreay have an instance? 404 && !ctx->revoked // Our instance is still valid? 405 && ctx == inctx ) // We have the right instance? 
565 406 { 566 // get mutual exclusion 567 #if defined(LEADER_LOCK) 568 while(!try_lock(ring.submit_q.submit_lock)); 569 #else 570 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2); 571 #endif 572 573 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64, 574 /* paranoid */ "index %u already reclaimed\n" 575 /* paranoid */ "head %u, prev %u, tail %u\n" 576 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 577 /* paranoid */ idx, 578 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 579 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 580 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 581 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 582 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 583 /* paranoid */ ); 584 585 // Append to the list of ready entries 586 587 /* paranoid */ verify( idx <= mask ); 588 ring.submit_q.array[ (*tail) & mask ] = idx; 589 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 590 591 // Submit however, many entries need to be submitted 592 int ret = __io_uring_enter( ring, 1, false ); 593 if( ret < 0 ) { 594 switch((int)errno) { 595 default: 596 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 597 } 598 } 599 600 /* paranoid */ verify(ret == 1); 601 602 // update statistics 603 __STATS__( false, 604 io.submit_q.submit_avg.csm += 1; 605 io.submit_q.submit_avg.cnt += 1; 606 ) 607 608 { 609 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head; 610 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ]; 611 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx]; 612 613 __cfadbg_print_safe( io, 614 "Kernel I/O : last submitted is %u (%p)\n" 615 " data: %p\n" 616 " opcode: %s\n" 617 " fd: %d\n" 618 " flags: %d\n" 619 " prio: %d\n" 620 " off: %p\n" 621 " addr: %p\n" 622 " len: %d\n" 623 " other flags: %d\n" 624 " splice fd: %d\n" 625 " pad[0]: %llu\n" 626 " pad[1]: %llu\n" 627 " pad[2]: %llu\n", 628 last_idx, sqe, 629 (void*)sqe->user_data, 630 opcodes[sqe->opcode], 631 sqe->fd, 632 sqe->flags, 633 sqe->ioprio, 634 (void*)sqe->off, 635 (void*)sqe->addr, 636 sqe->len, 637 sqe->accept_flags, 638 sqe->splice_fd_in, 639 sqe->__pad2[0], 640 sqe->__pad2[1], 641 sqe->__pad2[2] 642 ); 643 } 644 645 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 646 // Release the consumed SQEs 647 648 __release_consumed_submission( ring ); 649 // ring.submit_q.sqes[idx].user_data = 3ul64; 650 651 #if defined(LEADER_LOCK) 652 next(ring.submit_q.submit_lock); 653 #else 654 unlock(ring.submit_q.submit_lock); 655 #endif 656 657 __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() ); 658 } 659 } 660 661 // #define PARTIAL_SUBMIT 32 662 663 // go through the list of submissions in the ready array and moved them into 664 // the ring's submit queue 665 static unsigned __collect_submitions( struct __io_data & ring ) { 666 /* paranoid */ verify( ring.submit_q.ready != 0p ); 667 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); 668 669 unsigned to_submit = 0; 670 __u32 tail = *ring.submit_q.tail; 671 const __u32 mask = *ring.submit_q.mask; 672 #if defined(PARTIAL_SUBMIT) 673 #if defined(LEADER_LOCK) 674 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist 675 #endif 676 const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? 
PARTIAL_SUBMIT : ring.submit_q.ready_cnt; 677 const __u32 offset = ring.submit_q.prev_ready; 678 ring.submit_q.prev_ready += cnt; 679 #else 680 const __u32 cnt = ring.submit_q.ready_cnt; 681 const __u32 offset = 0; 682 #endif 683 684 // Go through the list of ready submissions 685 for( c; cnt ) { 686 __u32 i = (offset + c) % ring.submit_q.ready_cnt; 687 688 // replace any submission with the sentinel, to consume it. 689 __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 690 691 // If it was already the sentinel, then we are done 692 if( idx == -1ul32 ) continue; 693 694 // If we got a real submission, append it to the list 695 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 696 to_submit++; 697 } 698 699 // Increment the tail based on how many we are ready to submit 700 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 701 702 return to_submit; 703 } 407 __submit(ctx, idxs, have); 408 409 // Mark the instance as no longer in-use, re-enable interrupts and return 410 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE ); 411 enable_interrupts( __cfaabi_dbg_ctx ); 412 413 __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n"); 414 return; 415 } 416 417 // Fast path failed, fallback on arbitration 418 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE ); 419 enable_interrupts( __cfaabi_dbg_ctx ); 420 421 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n"); 422 423 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have); 424 } 425 426 //============================================================================================= 427 // Flushing 428 static unsigned __flush( struct $io_context & ctx ) { 429 // First check for external 430 if( !__atomic_load_n(&ctx.ext_sq.empty, __ATOMIC_SEQ_CST) ) { 431 // We have external submissions, delegate to the arbiter 432 __ioarbiter_flush( *ctx.arbiter, &ctx ); 433 } 434 435 __u32 tail = *ctx.sq.kring.tail; 436 __u32 ready = ctx.sq.kring.ready; 437 438 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 439 ctx.sq.to_submit += (ready - tail); 440 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 441 442 if(ctx.sq.to_submit) { 443 __cfadbg_print_safe(io, "Kernel I/O : %u ready to submit\n", ctx.sq.to_submit); 444 } 445 446 __atomic_store_n(ctx.sq.kring.tail, ready, __ATOMIC_RELEASE); 447 448 return ctx.sq.to_submit; 449 } 450 704 451 705 452 // Go through the ring's submit queue and release everything that has already been consumed 706 453 // by io_uring 707 static __u32 __release_consumed_submission( struct __io_data & ring ) { 708 const __u32 smask = *ring.submit_q.mask; 709 710 // We need to get the lock to copy the old head and new head 711 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 454 // This cannot be done by multiple threads 455 static __u32 __release_sqes( struct $io_context & ctx ) { 456 const __u32 mask = *ctx.sq.mask; 457 712 458 __attribute__((unused)) 713 __u32 ctail = * ring.submit_q.tail;// get the current tail of the queue714 __u32 chead = * ring.submit_q.head;// get the current head of the queue715 __u32 phead = ring.submit_q.prev_head;// get the head the last time we were here716 ring.submit_q.prev_head = chead; // note up to were we processed 717 unlock(ring.submit_q.release_lock);459 __u32 ctail = *ctx.sq.kring.tail; // get the current tail of the queue 460 __u32 chead = *ctx.sq.kring.head; // get the current head of the queue 461 __u32 phead = ctx.sq.kring.released; // get 
the head the last time we were here 462 463 __u32 ftail = ctx.sq.free_ring.tail; // get the current tail of the queue 718 464 719 465 // the 3 fields are organized like this diagram … … 734 480 __u32 count = chead - phead; 735 481 482 if(count == 0) { 483 return 0; 484 } 485 736 486 // We acquired an previous-head/current-head range 737 487 // go through the range and release the sqes 738 488 for( i; count ) { 739 __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; 740 741 /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); 742 __clean( &ring.submit_q.sqes[ idx ] ); 743 } 489 __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ]; 490 ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx; 491 } 492 493 ctx.sq.kring.released = chead; // note up to were we processed 494 __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST); 495 496 __ioarbiter_notify(ctx); 497 744 498 return count; 745 499 } 746 500 747 void __sqe_clean( volatile struct io_uring_sqe * sqe ) { 748 __clean( sqe ); 749 } 750 751 static inline void __clean( volatile struct io_uring_sqe * sqe ) { 752 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors 753 __cfaabi_dbg_debug_do( 754 memset(sqe, 0xde, sizeof(*sqe)); 755 sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1; 756 ); 757 758 // Mark the entry as unused 759 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST); 501 //============================================================================================= 502 // I/O Arbiter 503 //============================================================================================= 504 static inline void __revoke( $io_arbiter & this, $io_context * ctx ) { 505 if(ctx->revoked) return; 506 507 remove( this.assigned, *ctx ); 508 509 // Mark as revoked 510 __atomic_store_n(&ctx->revoked, true, __ATOMIC_SEQ_CST); 511 512 // Wait for the processor to no longer use it 513 while(ctx->proc->io.lock) Pause(); 514 515 // Remove the coupling with the processor 516 ctx->proc->io.ctx = 0p; 517 ctx->proc = 0p; 518 519 // add to available contexts 520 addHead( this.available, *ctx ); 521 } 522 523 static inline void __assign( $io_arbiter & this, $io_context * ctx, processor * proc ) { 524 remove( this.available, *ctx ); 525 526 ctx->revoked = false; 527 ctx->proc = proc; 528 __atomic_store_n(&proc->io.ctx, ctx, __ATOMIC_SEQ_CST); 529 530 // add to assigned contexts 531 addTail( this.assigned, *ctx ); 532 } 533 534 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) { 535 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n"); 536 537 SeqIter($io_context) iter; 538 $io_context & ci; 539 // Do we already have something available? 
540 for( over( iter, this.available ); iter | ci;) { 541 __cfadbg_print_safe(io, "Kernel I/O : attempting available context\n"); 542 543 $io_context * c = &ci; 544 if(__alloc(c, idxs, want)) { 545 __assign( this, c, proc); 546 return c; 547 } 548 } 549 550 551 // Otherwise, we have no choice but to revoke everyone to check if other instance have available data 552 for( over( iter, this.assigned ); iter | ci; ) { 553 __cfadbg_print_safe(io, "Kernel I/O : revoking context for allocation\n"); 554 555 $io_context * c = &ci; 556 __revoke( this, c ); 557 558 if(__alloc(c, idxs, want)) { 559 __assign( this, c, proc); 560 return c; 561 } 562 } 563 564 __cfadbg_print_safe(io, "Kernel I/O : waiting for available resources\n"); 565 566 // No one has any resources left, wait for something to finish 567 // Mark as pending 568 __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST ); 569 570 // Wait for our turn to submit 571 wait( this.pending.blocked, want ); 572 573 __attribute((unused)) bool ret = 574 __alloc( this.pending.ctx, idxs, want); 575 /* paranoid */ verify( ret ); 576 577 __assign( this, this.pending.ctx, proc); 578 return this.pending.ctx; 579 } 580 581 static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) { 582 /* paranoid */ verify( !is_empty(this.pending.blocked) ); 583 this.pending.ctx = ctx; 584 585 while( !is_empty(this.pending.blocked) ) { 586 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 587 __u32 want = front( this.pending.blocked ); 588 589 if( have > want ) return; 590 591 signal_block( this.pending.blocked ); 592 } 593 594 this.pending.flag = false; 595 } 596 597 static void __ioarbiter_notify( $io_context & ctx ) { 598 if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) { 599 __ioarbiter_notify( *ctx.arbiter, &ctx ); 600 } 601 } 602 603 // Simply append to the pending 604 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have ) { 605 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); 606 607 /* paranoid */ verify( &this == ctx->arbiter ); 608 609 // Mark as pending 610 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST ); 611 612 // Wake-up the poller 613 post( ctx->sem ); 614 615 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 616 617 // Wait for our turn to submit 618 wait( ctx->ext_sq.blocked ); 619 620 // Submit our indexes 621 __submit(ctx, idxs, have); 622 623 __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have); 624 } 625 626 static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) { 627 /* paranoid */ verify( &this == ctx->arbiter ); 628 629 __revoke( this, ctx ); 630 631 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 632 633 condition & blcked = ctx->ext_sq.blocked; 634 /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) ); 635 while(!is_empty( blcked )) { 636 signal_block( blcked ); 637 } 638 639 ctx->ext_sq.empty = true; 640 } 641 642 void __ioarbiter_register( $io_arbiter & mutex this, $io_context & ctx ) { 643 __cfadbg_print_safe(io, "Kernel I/O : registering new context\n"); 644 645 ctx.arbiter = &this; 646 647 // add to available contexts 648 addHead( this.available, ctx ); 649 650 // Check if this solves pending allocations 651 if(this.pending.flag) { 652 __ioarbiter_notify( ctx ); 653 } 654 } 655 656 void __ioarbiter_unregister( $io_arbiter & mutex this, $io_context & ctx ) { 657 /* paranoid */ verify( 
&this == ctx.arbiter ); 658 659 __revoke( this, &ctx ); 660 661 remove( this.available, ctx ); 760 662 } 761 663 #endif -
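The heart of this io.cfa rewrite is the switch from the old CAS scan over sqe->user_data to a per-context free ring of sqe indexes: cfa_io_allocate first tries the $io_context pinned to the current processor and only falls back to the arbiter when that ring runs short. The following is a minimal plain-C restatement of the __alloc fast path shown in the hunk above; the sub_ring type name and the uint32_t spellings are simplifications of __sub_ring_t, so read it as a sketch rather than the exact CFA code.

    /* Sketch: single-consumer allocation of `want` sqe indexes from the free ring.
     * Not thread-safe on its own; the caller has already pinned the context
     * (proc->io.lock) and disabled interrupts, exactly as cfa_io_allocate does. */
    static bool alloc_sketch( struct sub_ring * sq, uint32_t idxs[], uint32_t want ) {
        const uint32_t mask = *sq->mask;
        uint32_t fhead = sq->free_ring.head;        // consumed by this thread only
        uint32_t ftail = sq->free_ring.tail;        // advanced by __release_sqes

        if( (ftail - fhead) < want ) return false;  // not enough recycled entries

        for( uint32_t i = 0; i < want; i++ )
            idxs[i] = sq->free_ring.array[ (fhead + i) & mask ];

        // publish the consumption so the release path can reuse these slots
        __atomic_store_n( &sq->free_ring.head, fhead + want, __ATOMIC_RELEASE );
        return true;
    }

Because the free ring has exactly one consumer (the pinned processor) and one producer (the poller releasing consumed entries), a single release-store on each side is enough; no retry loop or yield is needed, unlike the old allocation scan.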
libcfa/src/concurrency/io/call.cfa.in
rb44959f r78da4ab 54 54 | IOSQE_IO_DRAIN 55 55 #endif 56 #if defined(CFA_HAVE_IOSQE_IO_LINK) 57 | IOSQE_IO_LINK 58 #endif 59 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK) 60 | IOSQE_IO_HARDLINK 61 #endif 56 62 #if defined(CFA_HAVE_IOSQE_ASYNC) 57 63 | IOSQE_ASYNC 58 64 #endif 59 ; 60 61 static const __u32 LINK_FLAGS = 0 62 #if defined(CFA_HAVE_IOSQE_IO_LINK) 63 | IOSQE_IO_LINK 64 #endif 65 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK) 66 | IOSQE_IO_HARDLINK 65 #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED) 66 | IOSQE_BUFFER_SELECTED 67 67 #endif 68 68 ; … … 74 74 ; 75 75 76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 79 static inline io_context * __get_io_context( void ) { 80 cluster * cltr = active_cluster(); 81 82 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n"); 83 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr ); 84 85 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr); 86 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ]; 87 } 76 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2))); 88 78 #endif 89 79 … … 195 185 return ', '.join(args_a) 196 186 197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags , io_cancellation * cancellation, io_context * context) {{187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags) {{ 198 188 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op}) 199 189 ssize_t res = {name}({args}); … … 205 195 }} 206 196 #else 207 // we don't support LINK yet208 if( 0 != (submit_flags & LINK_FLAGS) ) {{209 errno = ENOTSUP; return -1;210 }}211 212 if( !context ) {{213 context = __get_io_context();214 }}215 if(cancellation) {{216 cancellation->target = (__u64)(uintptr_t)&future;217 }}218 219 197 __u8 sflags = REGULAR_FLAGS & submit_flags; 220 struct __io_data & ring = *context->thrd.ring;221 222 198 __u32 idx; 223 199 struct io_uring_sqe * sqe; 224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future);200 struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 ); 225 201 226 202 sqe->opcode = IORING_OP_{op}; 203 sqe->user_data = (__u64)(uintptr_t)&future; 227 204 sqe->flags = sflags; 228 205 sqe->ioprio = 0; … … 239 216 240 217 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 241 __submit( context, idx);218 cfa_io_submit( ctx, &idx, 1 ); 242 219 #endif 243 220 }}""" 244 221 245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{ 246 if( timeout >= 0 ) {{ 247 errno = ENOTSUP; 248 return -1; 249 }} 222 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags) {{ 250 223 io_future_t future; 251 224 252 async_{name}( future, {args}, submit_flags , cancellation, context);225 async_{name}( future, {args}, submit_flags ); 253 226 254 227 wait( future ); … … 415 388 if c.define: 416 389 print("""#if defined({define}) 417 {ret} cfa_{name}({params}, int submit_flags , Duration timeout, io_cancellation * cancellation, io_context * context);390 {ret} cfa_{name}({params}, int 
submit_flags); 418 391 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params)) 419 392 else: 420 print("{ret} cfa_{name}({params}, int submit_flags , Duration timeout, io_cancellation * cancellation, io_context * context);"393 print("{ret} cfa_{name}({params}, int submit_flags);" 421 394 .format(ret=c.ret, name=c.name, params=c.params)) 422 395 … … 426 399 if c.define: 427 400 print("""#if defined({define}) 428 void async_{name}(io_future_t & future, {params}, int submit_flags , io_cancellation * cancellation, io_context * context);401 void async_{name}(io_future_t & future, {params}, int submit_flags); 429 402 #endif""".format(define=c.define,name=c.name, params=c.params)) 430 403 else: 431 print("void async_{name}(io_future_t & future, {params}, int submit_flags , io_cancellation * cancellation, io_context * context);"404 print("void async_{name}(io_future_t & future, {params}, int submit_flags);" 432 405 .format(name=c.name, params=c.params)) 433 406 print("\n") … … 474 447 475 448 print(""" 476 //-----------------------------------------------------------------------------477 bool cancel(io_cancellation & this) {478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)479 return false;480 #else481 io_future_t future;482 483 io_context * context = __get_io_context();484 485 __u8 sflags = 0;486 struct __io_data & ring = *context->thrd.ring;487 488 __u32 idx;489 volatile struct io_uring_sqe * sqe;490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;493 sqe->opcode = IORING_OP_ASYNC_CANCEL;494 sqe->flags = sflags;495 sqe->addr = this.target;496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future );498 __submit( context, idx );499 500 wait(future);501 502 if( future.result == 0 ) return true; // Entry found503 if( future.result == -EALREADY) return true; // Entry found but in progress504 if( future.result == -ENOENT ) return false; // Entry not found505 return false;506 #endif507 }508 509 449 //----------------------------------------------------------------------------- 510 450 // Check if a function is has asynchronous -
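The generator changes above shrink every public signature: the Duration timeout, io_cancellation and io_context parameters disappear, and both templates now route through cfa_io_allocate / cfa_io_submit. A hedged usage sketch of the resulting API, taking fsync as the example (the names come from the AsyncTemplate/SyncTemplate strings above; the fd value is illustrative):

    // Synchronous form: blocks the calling user thread until the result is ready.
    int ret = cfa_fsync( fd, 0 );       // 0 = no submit flags

    // Asynchronous form generated by AsyncTemplate: fill a future, wait later.
    io_future_t f;
    async_fsync( f, fd, 0 );
    // ... overlap other work here ...
    wait( f );                          // completion delivered via fulfil() in __drain_io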
libcfa/src/concurrency/io/setup.cfa
rb44959f r78da4ab 36 36 void ?{}(io_context_params & this) {} 37 37 38 void ?{}(io_context & this, struct cluster & cl) {} 39 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {} 40 41 void ^?{}(io_context & this) {} 42 void ^?{}(io_context & this, bool cluster_context) {} 43 44 void register_fixed_files( io_context &, int *, unsigned ) {} 45 void register_fixed_files( cluster &, int *, unsigned ) {} 38 void ?{}($io_context & this, struct cluster & cl) {} 39 void ^?{}($io_context & this) {} 40 41 $io_arbiter * create(void) { return 0p; } 42 void destroy($io_arbiter *) {} 46 43 47 44 #else … … 68 65 void ?{}(io_context_params & this) { 69 66 this.num_entries = 256; 70 this.num_ready = 256;71 this.submit_aff = -1;72 this.eager_submits = false;73 this.poller_submits = false;74 this.poll_submit = false;75 this.poll_complete = false;76 67 } 77 68 … … 194 185 195 186 for(i; nfds) { 196 $io_c tx_thread * io_ctx = ($io_ctx_thread*)(uintptr_t)events[i].data.u64;187 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64; 197 188 /* paranoid */ verify( io_ctx ); 198 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx-> ring->fd, io_ctx);189 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx); 199 190 #if !defined( __CFA_NO_STATISTICS__ ) 200 191 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; … … 202 193 203 194 eventfd_t v; 204 eventfd_read(io_ctx-> ring->efd, &v);195 eventfd_read(io_ctx->efd, &v); 205 196 206 197 post( io_ctx->sem ); … … 219 210 //============================================================================================= 220 211 221 void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; } 222 void main( $io_ctx_thread & this ); 223 static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; } 224 void ^?{}( $io_ctx_thread & mutex this ) {} 225 226 static void __io_create ( __io_data & this, const io_context_params & params_in ); 227 static void __io_destroy( __io_data & this ); 228 229 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) { 230 (this.thrd){ cl }; 231 this.thrd.ring = malloc(); 232 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this); 233 __io_create( *this.thrd.ring, params ); 234 235 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this); 236 this.thrd.done = false; 237 __thrd_start( this.thrd, main ); 238 239 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this); 212 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in ); 213 static void __io_uring_teardown( $io_context & this ); 214 static void __epoll_register($io_context & ctx); 215 static void __epoll_unregister($io_context & ctx); 216 void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx ); 217 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx ); 218 219 void ?{}($io_context & this, struct cluster & cl) { 220 (this.self){ "IO Poller", cl }; 221 this.ext_sq.empty = true; 222 __io_uring_setup( this, cl.io.params ); 223 __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this); 224 225 __epoll_register(this); 226 227 __ioarbiter_register(*cl.io.arbiter, this); 228 229 __thrd_start( this, main ); 230 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", 
this.fd); 231 } 232 233 void ^?{}($io_context & mutex this) { 234 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd); 235 236 ^(this.self){}; 237 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd); 238 239 __ioarbiter_unregister(*this.arbiter, this); 240 241 __epoll_unregister(this); 242 243 __io_uring_teardown( this ); 244 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd); 240 245 } 241 246 242 247 void ?{}(io_context & this, struct cluster & cl) { 243 io_context_params params; 244 (this){ cl, params }; 245 } 246 247 void ^?{}(io_context & this, bool cluster_context) { 248 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this); 249 250 // Notify the thread of the shutdown 251 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST); 252 253 // If this is an io_context within a cluster, things get trickier 254 $thread & thrd = this.thrd.self; 255 if( cluster_context ) { 256 // We are about to do weird things with the threads 257 // we don't need interrupts to complicate everything 258 disable_interrupts(); 259 260 // Get cluster info 261 cluster & cltr = *thrd.curr_cluster; 262 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); 263 /* paranoid */ verify( !ready_mutate_islocked() ); 264 265 // We need to adjust the clean-up based on where the thread is 266 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 267 // This is the tricky case 268 // The thread was preempted or ready to run and now it is on the ready queue 269 // but the cluster is shutting down, so there aren't any processors to run the ready queue 270 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 271 272 ready_schedule_lock(); 273 // The thread should on the list 274 /* paranoid */ verify( thrd.link.next != 0p ); 275 276 // Remove the thread from the ready queue of this cluster 277 // The thread should be the last on the list 278 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 279 /* paranoid */ verify( removed ); 280 thrd.link.next = 0p; 281 thrd.link.prev = 0p; 282 283 // Fixup the thread state 284 thrd.state = Blocked; 285 thrd.ticket = TICKET_BLOCKED; 286 thrd.preempted = __NO_PREEMPTION; 287 288 ready_schedule_unlock(); 289 290 // Pretend like the thread was blocked all along 291 } 292 // !!! This is not an else if !!! 293 // Ok, now the thread is blocked (whether we cheated to get here or not) 294 if( thrd.state == Blocked ) { 295 // This is the "easy case" 296 // The thread is parked and can easily be moved to active cluster 297 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 298 thrd.curr_cluster = active_cluster(); 299 300 // unpark the fast io_poller 301 unpark( &thrd ); 302 } 303 else { 304 // The thread is in a weird state 305 // I don't know what to do here 306 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 307 } 308 309 // The weird thread kidnapping stuff is over, restore interrupts. 
310 enable_interrupts( __cfaabi_dbg_ctx ); 311 } else { 312 post( this.thrd.sem ); 313 } 314 315 ^(this.thrd){}; 316 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this); 317 318 __io_destroy( *this.thrd.ring ); 319 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this); 320 321 free(this.thrd.ring); 248 // this.ctx = new(cl); 249 this.ctx = alloc(); 250 (*this.ctx){ cl }; 251 252 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd); 322 253 } 323 254 324 255 void ^?{}(io_context & this) { 325 ^(this){ false }; 256 post( this.ctx->sem ); 257 258 delete(this.ctx); 326 259 } 327 260 … … 329 262 extern void __enable_interrupts_hard(); 330 263 331 static void __io_ create( __io_data& this, const io_context_params & params_in ) {264 static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) { 332 265 // Step 1 : call to setup 333 266 struct io_uring_params params; 334 267 memset(¶ms, 0, sizeof(params)); 335 if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL;336 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;268 // if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL; 269 // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL; 337 270 338 271 __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256; … … 340 273 abort("ERROR: I/O setup 'num_entries' must be a power of 2\n"); 341 274 } 342 if( params_in.poller_submits && params_in.eager_submits ) {343 abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");344 }345 275 346 276 int fd = syscall(__NR_io_uring_setup, nentries, ¶ms ); … … 350 280 351 281 // Step 2 : mmap result 352 memset( &this, 0, sizeof(struct __io_data) ); 353 struct __submition_data & sq = this.submit_q; 354 struct __completion_data & cq = this.completion_q; 282 struct __sub_ring_t & sq = this.sq; 283 struct __cmp_ring_t & cq = this.cq; 355 284 356 285 // calculate the right ring size … … 401 330 // Get the pointers from the kernel to fill the structure 402 331 // submit queue 403 sq.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 404 sq.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 405 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 406 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 407 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 408 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 409 sq.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 410 sq.prev_head = *sq.head; 411 412 { 413 const __u32 num = *sq.num; 414 for( i; num ) { 415 __sqe_clean( &sq.sqes[i] ); 416 } 417 } 418 419 (sq.submit_lock){}; 420 (sq.release_lock){}; 421 422 if( params_in.poller_submits || params_in.eager_submits ) { 423 /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) ); 424 sq.ready_cnt = max( params_in.num_ready, 8 ); 425 sq.ready = alloc( sq.ready_cnt, 64`align ); 426 for(i; sq.ready_cnt) { 427 sq.ready[i] = -1ul32; 428 } 429 sq.prev_ready = 0; 430 } 431 else { 432 sq.ready_cnt = 0; 433 sq.ready = 0p; 434 sq.prev_ready = 0; 435 } 332 sq.kring.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 333 sq.kring.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 334 sq.kring.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + 
params.sq_off.array); 335 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 336 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 337 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 338 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 339 340 sq.kring.ready = 0; 341 sq.kring.released = 0; 342 343 sq.free_ring.head = 0; 344 sq.free_ring.tail = *sq.num; 345 sq.free_ring.array = alloc( *sq.num, 128`align ); 346 for(i; (__u32)*sq.num) { 347 sq.free_ring.array[i] = i; 348 } 349 350 sq.to_submit = 0; 436 351 437 352 // completion queue … … 468 383 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask ); 469 384 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num ); 470 /* paranoid */ verifyf( (*sq. head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );471 /* paranoid */ verifyf( (*sq. tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );385 /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head ); 386 /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail ); 472 387 473 388 // Update the global ring info 474 this.ring_flags = params.flags;389 this.ring_flags = 0; 475 390 this.fd = fd; 476 391 this.efd = efd; 477 this.eager_submits = params_in.eager_submits; 478 this.poller_submits = params_in.poller_submits; 479 } 480 481 static void __io_destroy( __io_data & this ) { 392 } 393 394 static void __io_uring_teardown( $io_context & this ) { 482 395 // Shutdown the io rings 483 struct __sub mition_data & sq = this.submit_q;484 struct __c ompletion_data & cq = this.completion_q;396 struct __sub_ring_t & sq = this.sq; 397 struct __cmp_ring_t & cq = this.cq; 485 398 486 399 // unmap the submit queue entries … … 499 412 close(this.efd); 500 413 501 free( this.s ubmit_q.ready ); // Maybe null, doesn't matter414 free( this.sq.free_ring.array ); // Maybe null, doesn't matter 502 415 } 503 416 … … 505 418 // I/O Context Sleep 506 419 //============================================================================================= 507 static inline void __ ioctx_epoll_ctl($io_ctx_thread& ctx, int op, const char * error) {420 static inline void __epoll_ctl($io_context & ctx, int op, const char * error) { 508 421 struct epoll_event ev; 509 422 ev.events = EPOLLIN | EPOLLONESHOT; 510 423 ev.data.u64 = (__u64)&ctx; 511 int ret = epoll_ctl(iopoll.epollfd, op, ctx. 
ring->efd, &ev);424 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev); 512 425 if (ret < 0) { 513 426 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); … … 515 428 } 516 429 517 void __ioctx_register($io_ctx_thread & ctx) { 518 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 519 } 520 521 void __ioctx_prepare_block($io_ctx_thread & ctx) { 522 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 523 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 524 } 525 526 void __ioctx_unregister($io_ctx_thread & ctx) { 430 static void __epoll_register($io_context & ctx) { 431 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 432 } 433 434 static void __epoll_unregister($io_context & ctx) { 527 435 // Read the current epoch so we know when to stop 528 436 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 529 437 530 438 // Remove the fd from the iopoller 531 __ ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");439 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 532 440 533 441 // Notify the io poller thread of the shutdown … … 543 451 } 544 452 453 void __ioctx_prepare_block($io_context & ctx) { 454 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx); 455 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 456 } 457 458 545 459 //============================================================================================= 546 460 // I/O Context Misc Setup 547 461 //============================================================================================= 548 void register_fixed_files( io_context & ctx, int * files, unsigned count ) { 549 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count ); 550 if( ret < 0 ) { 551 abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) ); 552 } 553 554 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 555 } 556 557 void register_fixed_files( cluster & cltr, int * files, unsigned count ) { 558 for(i; cltr.io.cnt) { 559 register_fixed_files( cltr.io.ctxs[i], files, count ); 560 } 561 } 462 void ?{}( $io_arbiter & this ) { 463 this.pending.flag = false; 464 } 465 466 void ^?{}( $io_arbiter & mutex this ) { 467 /* paranoid */ verify( empty(this.assigned) ); 468 /* paranoid */ verify( empty(this.available) ); 469 /* paranoid */ verify( is_empty(this.pending.blocked) ); 470 } 471 472 $io_arbiter * create(void) { 473 return new(); 474 } 475 void destroy($io_arbiter * arbiter) { 476 delete(arbiter); 477 } 478 479 //============================================================================================= 480 // I/O Context Misc Setup 481 //============================================================================================= 482 562 483 #endif -
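__io_uring_setup above follows the standard io_uring bootstrap: one io_uring_setup syscall returns a file descriptor plus ring offsets, and the submission ring, the sqe array and the completion ring are then mmaped against that descriptor. A minimal sketch of that kernel-facing sequence in plain C (error handling, the eventfd registration and the IORING_FEAT_SINGLE_MMAP special case are elided; nentries stands in for params_in.num_entries):

    #include <string.h>
    #include <sys/mman.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <linux/io_uring.h>

    struct io_uring_params params;
    memset( &params, 0, sizeof(params) );
    int fd = syscall( __NR_io_uring_setup, nentries, &params );

    // submission ring: head/tail/mask/flags/dropped/array all live in this mapping
    size_t sq_len = params.sq_off.array + params.sq_entries * sizeof(__u32);
    void * sq_ring = mmap( NULL, sq_len, PROT_READ | PROT_WRITE,
                           MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING );

    // the sqe array itself is a separate mapping
    struct io_uring_sqe * sqes = mmap( NULL, params.sq_entries * sizeof(struct io_uring_sqe),
                                       PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                                       fd, IORING_OFF_SQES );

    // completion ring: head/tail/mask/overflow/cqes live in this mapping
    size_t cq_len = params.cq_off.cqes + params.cq_entries * sizeof(struct io_uring_cqe);
    void * cq_ring = mmap( NULL, cq_len, PROT_READ | PROT_WRITE,
                           MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING );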
libcfa/src/concurrency/io/types.hfa
rb44959f r78da4ab
 
 #if defined(CFA_HAVE_LINUX_IO_URING_H)
-	#define LEADER_LOCK
-	struct __leaderlock_t {
-		struct $thread * volatile value;	// ($thread) next_leader | (bool:1) is_locked
-	};
+	#include "bits/sequence.hfa"
+	#include "monitor.hfa"
 
-	static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; }
+	struct processor;
+	monitor $io_arbiter;
 
 	//-----------------------------------------------------------------------
 	// Ring Data structure
-	struct __submition_data {
-		// Head and tail of the ring (associated with array)
-		volatile __u32 * head;
-		volatile __u32 * tail;
-		volatile __u32 prev_head;
+	struct __sub_ring_t {
+		struct {
+			// Head and tail of the ring (associated with array)
+			volatile __u32 * head;		// one passed last index consumed by the kernel
+			volatile __u32 * tail;		// one passed last index visible to the kernel
+			volatile __u32 ready;		// one passed last index added to array ()
+			volatile __u32 released;	// one passed last index released back to the free list
 
-		// The actual kernel ring which uses head/tail
-		// indexes into the sqes arrays
-		__u32 * array;
+			// The actual kernel ring which uses head/tail
+			// indexes into the sqes arrays
+			__u32 * array;
+		} kring;
+
+		struct {
+			volatile __u32 head;
+			volatile __u32 tail;
+			// The ring which contains free allocations
+			// indexes into the sqes arrays
+			__u32 * array;
+		} free_ring;
+
+		// number of sqes to submit on next system call.
+		__u32 to_submit;
 
 		// number of entries and mask to go with it
… …
 		const __u32 * mask;
 
-		// Submission flags (Not sure what for)
+		// Submission flags, currently only IORING_SETUP_SQPOLL
 		__u32 * flags;
 
-		// number of sqes not submitted (whatever that means)
+		// number of sqes not submitted
+		// From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer.
 		__u32 * dropped;
 
-		// Like head/tail but not seen by the kernel
-		volatile __u32 * ready;
-		__u32 ready_cnt;
-		__u32 prev_ready;
-
-		#if defined(LEADER_LOCK)
-			__leaderlock_t submit_lock;
-		#else
-			__spinlock_t submit_lock;
-		#endif
-		__spinlock_t release_lock;
-
 		// A buffer of sqes (not the actual ring)
-		volatile struct io_uring_sqe * sqes;
+		struct io_uring_sqe * sqes;
 
 		// The location and size of the mmaped area
… …
 	};
 
-	struct __completion_data {
+	struct __cmp_ring_t {
 		// Head and tail of the ring
 		volatile __u32 * head;
… …
 		const __u32 * num;
 
-		// number of cqes not submitted (whatever that means)
+		// I don't know what this value is for
 		__u32 * overflow;
 
… …
 	};
 
-	struct __io_data {
-		struct __submition_data submit_q;
-		struct __completion_data completion_q;
+	struct __attribute__((aligned(128))) $io_context {
+		inline Seqable;
+
+		volatile bool revoked;
+		processor * proc;
+
+		$io_arbiter * arbiter;
+
+		struct {
+			volatile bool empty;
+			condition blocked;
+		} ext_sq;
+
+		struct __sub_ring_t sq;
+		struct __cmp_ring_t cq;
 		__u32 ring_flags;
 		int fd;
 		int efd;
-		bool eager_submits:1;
-		bool poller_submits:1;
+
+		single_sem sem;
+		$thread self;
+	};
+
+	void main( $io_context & this );
+	static inline $thread * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; }
+	static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; }
+	static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); }
+	static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); }
+	void ^?{}( $io_context & mutex this );
+
+	monitor __attribute__((aligned(128))) $io_arbiter {
+		struct {
+			condition blocked;
+			$io_context * ctx;
+			volatile bool flag;
+		} pending;
+
+		Sequence($io_context) assigned;
+
+		Sequence($io_context) available;
 	};
 
… …
 	#endif
 
-	struct $io_ctx_thread;
-	void __ioctx_register($io_ctx_thread & ctx);
-	void __ioctx_unregister($io_ctx_thread & ctx);
-	void __ioctx_prepare_block($io_ctx_thread & ctx);
-	void __sqe_clean( volatile struct io_uring_sqe * sqe );
+	void __ioctx_prepare_block($io_context & ctx);
 #endif
 
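The split of the submission side into kring and free_ring replaces the old ready/prev_head bookkeeping: a submission-queue entry is now obtained by popping an index off a ring of free slots and recycled by pushing that index back. A minimal sketch of such an index ring follows; the size, field names, and the absence of any synchronisation are illustrative assumptions only and do not reproduce the libcfa implementation.

// Sketch only: a free-index ring over a fixed pool of sqe slots.
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_SQES 32u    // assumed: a power of two, so free-running counters can be masked

struct free_ring {
	volatile uint32_t head;      // next free index to hand out
	volatile uint32_t tail;      // one past the last index returned to the pool
	uint32_t array[NUM_SQES];    // indexes into the sqes array
};

static void free_ring_init(struct free_ring * fr) {
	fr->head = 0;
	fr->tail = NUM_SQES;
	for (uint32_t i = 0; i < NUM_SQES; i += 1) fr->array[i] = i;   // initially every slot is free
}

static bool free_ring_alloc(struct free_ring * fr, uint32_t * idx) {
	if (fr->head == fr->tail) return false;                        // ring empty: no free sqe
	*idx = fr->array[fr->head & (NUM_SQES - 1)];
	fr->head += 1;
	return true;
}

static void free_ring_release(struct free_ring * fr, uint32_t idx) {
	fr->array[fr->tail & (NUM_SQES - 1)] = idx;                    // return the slot to the pool
	fr->tail += 1;
}

int main(void) {
	struct free_ring fr;
	free_ring_init(&fr);
	uint32_t idx;
	if (free_ring_alloc(&fr, &idx)) {                              // fill sqes[idx], submit, then...
		free_ring_release(&fr, idx);                               // ...recycle once the kernel consumed it
	}
	printf("head=%u tail=%u\n", fr.head, fr.tail);
	return 0;
}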
libcfa/src/concurrency/iofwd.hfa
rb44959f r78da4ab
 struct cluster;
 struct io_future_t;
-struct io_context;
-struct io_cancellation;
+struct $io_context;
 
 struct iovec;
… …
 struct sockaddr;
 struct statx;
+struct epoll_event;
+
+//----------
+// underlying calls
+extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2)));
+extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
 
 //----------
 // synchronous calls
 #if defined(CFA_HAVE_PREADV2)
-	extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
 #if defined(CFA_HAVE_PWRITEV2)
-	extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
-extern int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+extern int cfa_fsync(int fd, int submit_flags);
+extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags);
+extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags);
+extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags);
+extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags);
+extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags);
+extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags);
+extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags);
+extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags);
+extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags);
+extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags);
+extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags);
+extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags);
 #if defined(CFA_HAVE_OPENAT2)
-	extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags);
 #endif
-extern int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+extern int cfa_close(int fd, int submit_flags);
 #if defined(CFA_HAVE_STATX)
-	extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+	extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags);
 #endif
-extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
-extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags);
+extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags);
+extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags);
+extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags);
 
 //----------
 // asynchronous calls
 #if defined(CFA_HAVE_PREADV2)
-	extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
 #if defined(CFA_HAVE_PWRITEV2)
-	extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags);
 #endif
-extern void async_fsync(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, io_cancellation * cancellation, io_context * context);
+extern void async_fsync(io_future_t & future, int fd, int submit_flags);
+extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags);
+extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags);
+extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags);
+extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags);
+extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags);
+extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags);
+extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags);
+extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags);
+extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags);
+extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags);
+extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags);
+extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags);
 #if defined(CFA_HAVE_OPENAT2)
-	extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags);
 #endif
-extern void async_close(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
+extern void async_close(io_future_t & future, int fd, int submit_flags);
 #if defined(CFA_HAVE_STATX)
-	extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, io_cancellation * cancellation, io_context * context);
+	extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags);
 #endif
-void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
-extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
+void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags);
+extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags);
+extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags);
+extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags);
 
 
… …
 // Check if a function is blocks a only the user thread
 bool has_user_level_blocking( fptr_t func );
-
-//-----------------------------------------------------------------------------
-void register_fixed_files( io_context & ctx , int * files, unsigned count );
-void register_fixed_files( cluster & cltr, int * files, unsigned count );
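With the Duration, io_cancellation, and io_context parameters gone, every wrapper now takes only the usual POSIX arguments plus submit_flags. A hedged usage sketch of the synchronous wrappers follows; the include path and the file descriptors are assumptions, and error handling is trimmed for brevity.

// Sketch: copy one buffer's worth of data with the slimmed-down wrappers declared above.
// Assumed to run on a CFA user thread inside a running cluster; the include path is a guess.
#include <unistd.h>          // ssize_t
#include "iofwd.hfa"         // cfa_read / cfa_write

void echo_once( int src_fd, int dst_fd ) {
	char buf[4096];
	ssize_t r = cfa_read( src_fd, buf, sizeof(buf), 0 /* submit_flags */ );
	if ( r > 0 ) {
		cfa_write( dst_fd, buf, (size_t)r, 0 /* submit_flags */ );
	}
}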
libcfa/src/concurrency/kernel.hfa
rb44959f r78da4ab
 
 //-----------------------------------------------------------------------------
+// I/O
+struct cluster;
+struct $io_context;
+struct $io_arbiter;
+
+struct io_context_params {
+	int num_entries;
+};
+
+void ?{}(io_context_params & this);
+
+struct io_context {
+	$io_context * ctx;
+	cluster * cltr;
+};
+void ?{}(io_context & this, struct cluster & cl);
+void ^?{}(io_context & this);
+
+//-----------------------------------------------------------------------------
 // Processor
 extern struct cluster * mainCluster;
… …
 	pthread_t kernel_thread;
 
+	struct {
+		$io_context * volatile ctx;
+		volatile bool lock;
+	} io;
+
 	// Preemption data
 	// Node which is added in the discrete event simulaiton
… …
 
 DLISTED_MGD_IMPL_OUT(processor)
-
-//-----------------------------------------------------------------------------
-// I/O
-struct __io_data;
-
-// IO poller user-thread
-// Not using the "thread" keyword because we want to control
-// more carefully when to start/stop it
-struct $io_ctx_thread {
-	struct __io_data * ring;
-	single_sem sem;
-	volatile bool done;
-	$thread self;
-};
-
-
-struct io_context {
-	$io_ctx_thread thrd;
-};
-
-struct io_context_params {
-	int num_entries;
-	int num_ready;
-	int submit_aff;
-	bool eager_submits:1;
-	bool poller_submits:1;
-	bool poll_submit:1;
-	bool poll_complete:1;
-};
-
-void ?{}(io_context_params & this);
-
-void ?{}(io_context & this, struct cluster & cl);
-void ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
-void ^?{}(io_context & this);
-
-struct io_cancellation {
-	__u64 target;
-};
-
-static inline void ?{}(io_cancellation & this) { this.target = -1u; }
-static inline void ^?{}(io_cancellation &) {}
-bool cancel(io_cancellation & this);
-
 
 //-----------------------------------------------------------------------------
… …
 
 	struct {
-		io_context * ctxs;
-		unsigned cnt;
+		$io_arbiter * arbiter;
+		io_context_params params;
 	} io;
 
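io_context is now a thin public handle: a pointer to the runtime-internal $io_context plus the owning cluster, with a single constructor taking that cluster. A sketch of constructing one against mainCluster follows; it assumes the usual CFA mapping from declaration-initialisation to the ?{} constructor, and the comment about what the fields end up holding is an inference from the declarations above rather than something shown in this changeset.

// Sketch: binding a public io_context handle to the main cluster.
#include "kernel.hfa"        // io_context, io_context_params, mainCluster

void setup_io(void) {
	// Invokes void ?{}(io_context & this, struct cluster & cl); the matching
	// ^?{} destructor runs automatically when io leaves scope.
	io_context io = { *mainCluster };
	// Presumably io.cltr records the cluster and io.ctx the internal $io_context.
}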
libcfa/src/concurrency/kernel/startup.cfa
rb44959f r78da4ab
 KERNEL_STORAGE($thread, mainThread);
 KERNEL_STORAGE(__stack_t, mainThreadCtx);
-KERNEL_STORAGE(io_context, mainPollerThread);
+KERNEL_STORAGE(io_context, mainIoContext);
 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
 #if !defined(__CFA_NO_STATISTICS__)
… …
 	__kernel_io_startup();
 
+	io_context * mainio = (io_context *)&storage_mainIoContext;
+	(*mainio){ *mainCluster };
+
 	// Add the main thread to the ready queue
 	// once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
… …
 	// THE SYSTEM IS NOW COMPLETELY RUNNING
 
-
-	// SKULLDUGGERY: The constructor for the mainCluster will call alloc with a dimension of 0
-	// malloc *can* return a non-null value, we should free it if that is the case
-	free( mainCluster->io.ctxs );
-
-	// Now that the system is up, finish creating systems that need threading
-	mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;
-	mainCluster->io.cnt = 1;
-	(*mainCluster->io.ctxs){ *mainCluster };
-
 	__cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
 
… …
 static void __kernel_shutdown(void) {
 	//Before we start shutting things down, wait for systems that need threading to shutdown
-	^(*mainCluster->io.ctxs){};
-	mainCluster->io.cnt = 0;
-	mainCluster->io.ctxs = 0p;
+	io_context * mainio = (io_context *)&storage_mainIoContext;
+	^(*mainio){};
 
 	/* paranoid */ verify( __preemption_enabled() );
… …
 	pending_preemption = false;
 
+	this.io.ctx = 0p;
+	this.io.lock = false;
+
 	#if !defined(__CFA_NO_STATISTICS__)
 		print_stats = 0;
… …
 	threads{ __get };
 
+	io.arbiter = create();
+	io.params = io_params;
+
 	doregister(this);
 
… …
 	ready_mutate_unlock( last_size );
 	enable_interrupts_noPoll(); // Don't poll, could be in main cluster
-
-
-	this.io.cnt = num_io;
-	this.io.ctxs = aalloc(num_io);
-	for(i; this.io.cnt) {
-		(this.io.ctxs[i]){ this, io_params };
-	}
 }
 
 void ^?{}(cluster & this) {
-	for(i; this.io.cnt) {
-		^(this.io.ctxs[i]){ true };
-	}
-	free(this.io.ctxs);
+	destroy(this.io.arbiter);
 
 	// Lock the RWlock so no-one pushes/pops while we are changing the queue
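Startup now follows the same reserve-then-construct idiom as the other kernel singletons: KERNEL_STORAGE reserves static space for the main io_context, the object is constructed in place only once __kernel_io_startup() has run, and it is torn down first at shutdown. The condensed ordering below just lifts those lines out of their surrounding functions; it is a reading aid, not new code.

// Condensed ordering, taken from __kernel_startup / __kernel_shutdown above:
io_context * mainio = (io_context *)&storage_mainIoContext;   // storage from KERNEL_STORAGE(io_context, mainIoContext)
(*mainio){ *mainCluster };   // construct once the I/O subsystem is up
// (the runtime runs)
^(*mainio){};                // destroy before the rest of the kernel shuts down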
libcfa/src/concurrency/kernel_private.hfa
rb44959f r78da4ab
 //-----------------------------------------------------------------------------
 // I/O
-void ^?{}(io_context & this, bool );
+$io_arbiter * create(void);
+void destroy($io_arbiter *);
 
 //=======================================================================