Changeset dddb3dd0
- Timestamp: Mar 2, 2021, 1:58:12 PM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 2cd784a
- Parents: 6047b00
- Location: libcfa/src
- Files: 11 edited
libcfa/src/bits/defs.hfa
r6047b00 rdddb3dd0
  74  74	#error unsupported architecture
  75  75	#endif
      76
      77	#define CFA_IO_LAZY (1_l64u << 32_l64u)
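The new CFA_IO_LAZY constant lives in the upper half of the 64-bit submit_flags word that the cfa_* wrappers now take, leaving the low 32 bits free for other uses. A minimal sketch in plain C of how a caller-side check on such a flag might look; only the constant mirrors the changeset, the helper name is hypothetical:

#include <stdint.h>
#include <stdbool.h>

/* Same value as the changeset's CFA_IO_LAZY, written with standard C literals. */
#define CFA_IO_LAZY (1ULL << 32)

/* Hypothetical helper: may this submission sit in the ring until the next flush? */
static inline bool submit_is_lazy(uint64_t submit_flags) {
	return 0 != (submit_flags & CFA_IO_LAZY);
}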
libcfa/src/concurrency/io.cfa
r6047b00 rdddb3dd0 32 32 extern "C" { 33 33 #include <sys/syscall.h> 34 #include <sys/eventfd.h> 34 35 35 36 #include <linux/io_uring.h> … … 79 80 }; 80 81 81 //============================================================================================= 82 // I/O Syscall 83 //============================================================================================= 84 static int __io_uring_enter( struct $io_context & ctx, unsigned to_submit, bool get ) { 85 __STATS__( false, io.calls.count++; ) 86 bool need_sys_to_submit = false; 87 bool need_sys_to_complete = false; 88 unsigned flags = 0; 89 90 TO_SUBMIT: 91 if( to_submit > 0 ) { 92 if( !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) { 93 need_sys_to_submit = true; 94 break TO_SUBMIT; 95 } 96 if( (*ctx.sq.flags) & IORING_SQ_NEED_WAKEUP ) { 97 need_sys_to_submit = true; 98 flags |= IORING_ENTER_SQ_WAKEUP; 99 } 100 } 101 102 if( get && !(ctx.ring_flags & IORING_SETUP_SQPOLL) ) { 103 flags |= IORING_ENTER_GETEVENTS; 104 if( (ctx.ring_flags & IORING_SETUP_IOPOLL) ) { 105 need_sys_to_complete = true; 106 } 107 } 108 109 int ret = 0; 110 if( need_sys_to_submit || need_sys_to_complete ) { 111 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ctx.fd, to_submit, flags); 112 __STATS__( false, io.calls.blocks++; ) 113 ret = syscall( __NR_io_uring_enter, ctx.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 114 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ctx.fd, ret); 115 } 116 117 // Memory barrier 118 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 119 return ret; 120 } 121 82 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want ); 83 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy ); 84 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * ); 85 static inline void __ioarbiter_notify( $io_context & ctx ); 122 86 //============================================================================================= 123 87 // I/O Polling … … 126 90 static inline __u32 __release_sqes( struct $io_context & ); 127 91 128 static bool __drain_io( struct $io_context & ctx ) { 129 unsigned to_submit = __flush( ctx ); 130 int ret = __io_uring_enter( ctx, to_submit, true ); 92 void __cfa_io_drain( processor * proc ) { 93 /* paranoid */ verify( ! __preemption_enabled() ); 94 /* paranoid */ verify( proc ); 95 /* paranoid */ verify( proc->io.ctx ); 96 97 // Drain the queue 98 $io_context * ctx = proc->io.ctx; 99 unsigned head = *ctx->cq.head; 100 unsigned tail = *ctx->cq.tail; 101 const __u32 mask = *ctx->cq.mask; 102 103 __u32 count = tail - head; 104 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 105 106 for(i; count) { 107 unsigned idx = (head + i) & mask; 108 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 109 110 /* paranoid */ verify(&cqe); 111 112 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 113 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 114 115 fulfil( *future, cqe.res ); 116 } 117 118 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count); 119 120 // Mark to the kernel that the cqe has been seen 121 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 122 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 123 124 /* paranoid */ verify( ! 
__preemption_enabled() ); 125 126 return; 127 } 128 129 void __cfa_io_flush( processor * proc ) { 130 /* paranoid */ verify( ! __preemption_enabled() ); 131 /* paranoid */ verify( proc ); 132 /* paranoid */ verify( proc->io.ctx ); 133 134 $io_context & ctx = *proc->io.ctx; 135 136 if(!ctx.ext_sq.empty) { 137 __ioarbiter_flush( *ctx.arbiter, &ctx ); 138 } 139 140 __STATS__( true, io.calls.flush++; ) 141 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8); 131 142 if( ret < 0 ) { 132 143 switch((int)errno) { … … 136 147 // Update statistics 137 148 __STATS__( false, io.calls.errors.busy ++; ) 138 return true; 139 break; 149 return; 140 150 default: 141 151 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); … … 143 153 } 144 154 145 // update statistics 146 if (to_submit > 0) { 147 __STATS__( false, io.calls.submitted += ret; ) 148 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 149 150 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 151 ctx.sq.to_submit -= ret; 152 153 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 154 155 if(ret) { 156 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring\n", ret); 157 } 158 } 155 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd); 156 __STATS__( true, io.calls.submitted += ret; ) 157 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 158 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 159 160 ctx.sq.to_submit -= ret; 161 162 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 159 163 160 164 // Release the consumed SQEs 161 165 __release_sqes( ctx ); 162 166 163 // Drain the queue 164 unsigned head = *ctx.cq.head; 165 unsigned tail = *ctx.cq.tail; 166 const __u32 mask = *ctx.cq.mask; 167 168 // Nothing was new return 0 169 if (head == tail) { 170 return ctx.sq.to_submit > 0; 171 } 172 173 __u32 count = tail - head; 174 /* paranoid */ verify( count != 0 ); 175 __STATS__( false, io.calls.completed += count; ) 176 177 for(i; count) { 178 unsigned idx = (head + i) & mask; 179 volatile struct io_uring_cqe & cqe = ctx.cq.cqes[idx]; 180 181 /* paranoid */ verify(&cqe); 182 183 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 184 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 185 186 fulfil( *future, cqe.res ); 187 } 188 189 if(count) { 190 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count); 191 } 192 193 // Mark to the kernel that the cqe has been seen 194 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 195 __atomic_store_n( ctx.cq.head, head + count, __ATOMIC_SEQ_CST ); 196 197 return count > 0 || to_submit > 0; 198 } 199 200 void main( $io_context & this ) { 201 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.fd, &this); 202 203 const int reset_cnt = 5; 204 int reset = reset_cnt; 205 // Then loop until we need to start 206 LOOP: 207 while() { 208 waitfor( ^?{} : this) { 209 break LOOP; 210 } 211 or else {} 212 213 // Drain the io 214 bool again = __drain_io( this ); 215 216 if(!again) reset--; 217 218 // If we got something, just yield and check again 219 if(reset > 1) { 220 yield(); 221 continue LOOP; 222 } 223 224 // We alread failed to find completed entries a few time. 
225 if(reset == 1) { 226 // Rearm the context so it can block 227 // but don't block right away 228 // we need to retry one last time in case 229 // something completed *just now* 230 __ioctx_prepare_block( this ); 231 continue LOOP; 232 } 233 234 __STATS__( false, 235 io.poller.sleeps += 1; 236 ) 237 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.fd, &this); 238 239 // block this thread 240 wait( this.sem ); 241 242 // restore counter 243 reset = reset_cnt; 244 } 245 246 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.fd, &this); 167 /* paranoid */ verify( ! __preemption_enabled() ); 168 169 ctx.proc->io.pending = false; 247 170 } 248 171 … … 266 189 // head and tail must be fully filled and shouldn't ever be touched again. 267 190 // 268 269 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );270 static void __ioarbiter_submit ( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have );271 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );272 static inline void __ioarbiter_notify( $io_context & ctx );273 274 191 //============================================================================================= 275 192 // Allocation … … 278 195 struct io_uring_sqe * sqes = ctx->sq.sqes; 279 196 for(i; want) { 197 __cfadbg_print_safe(io, "Kernel I/O : filling loop\n"); 280 198 out_sqes[i] = &sqes[idxs[i]]; 281 199 } … … 295 213 // copy all the indexes we want from the available list 296 214 for(i; want) { 215 __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n"); 297 216 idxs[i] = sq.free_ring.array[(fhead + i) & mask]; 298 217 } … … 315 234 disable_interrupts(); 316 235 processor * proc = __cfaabi_tls.this_processor; 236 $io_context * ctx = proc->io.ctx; 317 237 /* paranoid */ verify( __cfaabi_tls.this_processor ); 318 /* paranoid */ verify( proc->io.lock == false ); 319 320 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST ); 321 $io_context * ctx = proc->io.ctx; 238 /* paranoid */ verify( ctx ); 239 240 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n"); 241 242 // We can proceed to the fast path 243 if( __alloc(ctx, idxs, want) ) { 244 // Allocation was successful 245 __STATS__( true, io.alloc.fast += 1; ) 246 enable_interrupts( __cfaabi_dbg_ctx ); 247 248 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd); 249 250 __fill( sqes, want, idxs, ctx ); 251 return ctx; 252 } 253 // The fast path failed, fallback 254 __STATS__( true, io.alloc.fail += 1; ) 255 256 // Fast path failed, fallback on arbitration 257 __STATS__( true, io.alloc.slow += 1; ) 258 enable_interrupts( __cfaabi_dbg_ctx ); 259 322 260 $io_arbiter * ioarb = proc->cltr->io.arbiter; 323 261 /* paranoid */ verify( ioarb ); 324 262 325 // Can we proceed to the fast path326 if( ctx // We alreay have an instance?327 && !ctx->revoked ) // Our instance is still valid?328 {329 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");330 331 // We can proceed to the fast path332 if( __alloc(ctx, idxs, want) ) {333 // Allocation was successful334 // Mark the instance as no longer in-use and re-enable interrupts335 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );336 __STATS__( true, io.alloc.fast += 1; )337 enable_interrupts( __cfaabi_dbg_ctx );338 339 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful\n");340 341 __fill( sqes, want, idxs, ctx );342 return ctx;343 }344 // The 
fast path failed, fallback345 __STATS__( true, io.alloc.fail += 1; )346 }347 348 // Fast path failed, fallback on arbitration349 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );350 __STATS__( true, io.alloc.slow += 1; )351 enable_interrupts( __cfaabi_dbg_ctx );352 353 263 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n"); 354 264 355 265 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want); 356 266 357 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed \n");267 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd); 358 268 359 269 __fill( sqes, want, idxs,ret ); … … 364 274 //============================================================================================= 365 275 // submission 366 static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have ) {276 static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) { 367 277 // We can proceed to the fast path 368 278 // Get the right objects 369 279 __sub_ring_t & sq = ctx->sq; 370 280 const __u32 mask = *sq.mask; 371 __u32 tail = sq.kring.ready;281 __u32 tail = *sq.kring.tail; 372 282 373 283 // Add the sqes to the array 374 284 for( i; have ) { 285 __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n"); 375 286 sq.kring.array[ (tail + i) & mask ] = idxs[i]; 376 287 } 377 288 378 289 // Make the sqes visible to the submitter 379 __atomic_store_n(&sq.kring.ready, tail + have, __ATOMIC_RELEASE); 380 381 // Make sure the poller is awake 382 __cfadbg_print_safe(io, "Kernel I/O : waking the poller\n"); 383 post( ctx->sem ); 384 } 385 386 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have ) __attribute__((nonnull (1))) { 387 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u\n", have); 290 __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE); 291 sq.to_submit++; 292 293 ctx->proc->io.pending = true; 294 ctx->proc->io.dirty = true; 295 if(sq.to_submit > 30 || !lazy) { 296 __cfa_io_flush( ctx->proc ); 297 } 298 } 299 300 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) { 301 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager"); 388 302 389 303 disable_interrupts(); 390 304 processor * proc = __cfaabi_tls.this_processor; 305 $io_context * ctx = proc->io.ctx; 391 306 /* paranoid */ verify( __cfaabi_tls.this_processor ); 392 /* paranoid */ verify( proc->io.lock == false ); 393 394 __atomic_store_n( &proc->io.lock, true, __ATOMIC_SEQ_CST ); 395 $io_context * ctx = proc->io.ctx; 307 /* paranoid */ verify( ctx ); 396 308 397 309 // Can we proceed to the fast path 398 if( ctx // We alreay have an instance? 399 && !ctx->revoked // Our instance is still valid? 400 && ctx == inctx ) // We have the right instance? 310 if( ctx == inctx ) // We have the right instance? 
401 311 { 402 __submit(ctx, idxs, have );312 __submit(ctx, idxs, have, lazy); 403 313 404 314 // Mark the instance as no longer in-use, re-enable interrupts and return 405 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );406 315 __STATS__( true, io.submit.fast += 1; ) 407 316 enable_interrupts( __cfaabi_dbg_ctx ); … … 412 321 413 322 // Fast path failed, fallback on arbitration 414 __atomic_store_n( &proc->io.lock, false, __ATOMIC_RELEASE );415 323 __STATS__( true, io.submit.slow += 1; ) 416 324 enable_interrupts( __cfaabi_dbg_ctx ); … … 418 326 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n"); 419 327 420 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have );328 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy); 421 329 } 422 330 423 331 //============================================================================================= 424 332 // Flushing 425 static unsigned __flush( struct $io_context & ctx ) {426 // First check for external427 if( !__atomic_load_n(&ctx.ext_sq.empty, __ATOMIC_SEQ_CST) ) {428 // We have external submissions, delegate to the arbiter429 __ioarbiter_flush( *ctx.arbiter, &ctx );430 }431 432 __u32 tail = *ctx.sq.kring.tail;433 __u32 ready = ctx.sq.kring.ready;434 435 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );436 ctx.sq.to_submit += (ready - tail);437 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );438 439 if(ctx.sq.to_submit) {440 __cfadbg_print_safe(io, "Kernel I/O : %u ready to submit\n", ctx.sq.to_submit);441 }442 443 __atomic_store_n(ctx.sq.kring.tail, ready, __ATOMIC_RELEASE);444 445 return ctx.sq.to_submit;446 }447 448 449 333 // Go through the ring's submit queue and release everything that has already been consumed 450 334 // by io_uring … … 484 368 // go through the range and release the sqes 485 369 for( i; count ) { 370 __cfadbg_print_safe(io, "Kernel I/O : release loop\n"); 486 371 __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ]; 487 372 ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx; … … 499 384 // I/O Arbiter 500 385 //============================================================================================= 501 static inline void __revoke( $io_arbiter & this, $io_context * ctx ) {502 if(ctx->revoked) return;503 504 /* paranoid */ verify( ctx->proc );505 remove( this.assigned, *ctx );506 507 // Mark as revoked508 __atomic_store_n(&ctx->revoked, true, __ATOMIC_SEQ_CST);509 510 // Wait for the processor to no longer use it511 while(ctx->proc->io.lock) Pause();512 513 // Remove the coupling with the processor514 ctx->proc->io.ctx = 0p;515 ctx->proc = 0p;516 517 // add to available contexts518 addHead( this.available, *ctx );519 }520 521 static inline void __assign( $io_arbiter & this, $io_context * ctx, processor * proc ) {522 remove( this.available, *ctx );523 524 ctx->revoked = false;525 ctx->proc = proc;526 __atomic_store_n(&proc->io.ctx, ctx, __ATOMIC_SEQ_CST);527 528 // add to assigned contexts529 addTail( this.assigned, *ctx );530 }531 532 386 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) { 533 387 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n"); 534 535 SeqIter($io_context) iter;536 $io_context & ci;537 // Do we already have something available?538 for( over( iter, this.available ); iter | ci;) {539 __cfadbg_print_safe(io, "Kernel I/O : attempting available context\n");540 541 $io_context * c = &ci;542 if(__alloc(c, idxs, want)) {543 __assign( this, c, proc);544 return c;545 
}546 }547 548 549 // Otherwise, we have no choice but to revoke everyone to check if other instance have available data550 for( over( iter, this.assigned ); iter | ci; ) {551 __cfadbg_print_safe(io, "Kernel I/O : revoking context for allocation\n");552 553 $io_context * c = &ci;554 __revoke( this, c );555 556 __STATS__( false, io.alloc.revoke += 1; )557 558 if(__alloc(c, idxs, want)) {559 __assign( this, c, proc);560 return c;561 }562 }563 564 __cfadbg_print_safe(io, "Kernel I/O : waiting for available resources\n");565 388 566 389 __STATS__( false, io.alloc.block += 1; ) … … 577 400 /* paranoid */ verify( ret ); 578 401 579 __assign( this, this.pending.ctx, proc);580 402 return this.pending.ctx; 403 581 404 } 582 405 … … 586 409 587 410 while( !is_empty(this.pending.blocked) ) { 411 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 588 412 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 589 413 __u32 want = front( this.pending.blocked ); … … 604 428 605 429 // Simply append to the pending 606 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have ) {430 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) { 607 431 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); 608 432 … … 612 436 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST ); 613 437 614 // Wake-up the poller615 post( ctx->sem );616 617 438 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 618 439 … … 621 442 622 443 // Submit our indexes 623 __submit(ctx, idxs, have );444 __submit(ctx, idxs, have, lazy); 624 445 625 446 __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have); … … 630 451 631 452 __STATS__( false, io.flush.external += 1; ) 632 633 __revoke( this, ctx );634 453 635 454 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); … … 643 462 ctx->ext_sq.empty = true; 644 463 } 645 646 void __ioarbiter_register( $io_arbiter & mutex this, $io_context & ctx ) {647 __cfadbg_print_safe(io, "Kernel I/O : registering new context\n");648 649 ctx.arbiter = &this;650 651 // add to available contexts652 addHead( this.available, ctx );653 654 // Check if this solves pending allocations655 if(this.pending.flag) {656 __ioarbiter_notify( ctx );657 }658 }659 660 void __ioarbiter_unregister( $io_arbiter & mutex this, $io_context & ctx ) {661 /* paranoid */ verify( &this == ctx.arbiter );662 663 __revoke( this, &ctx );664 665 remove( this.available, ctx );666 }667 464 #endif -
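The replacement for the old poller loop is __cfa_io_drain above: it reads the completion queue's head and tail, walks the CQEs under the ring mask, fulfils the matching futures, and only then publishes the new head so the kernel never recycles an unread entry. Below is a hedged sketch of that same drain pattern in plain C against a raw mmap'ed io_uring completion ring; the complete() callback is a stand-in for fulfil(*future, cqe.res) and is not part of the changeset:

#include <stdint.h>
#include <linux/io_uring.h>

/* Hypothetical completion hook, standing in for fulfil(*future, cqe.res). */
extern void complete(uint64_t user_data, int32_t res);

/* khead/ktail/cqes point into the mmap'ed CQ ring; mask is the ring mask. */
static unsigned drain_cq(uint32_t *khead, const uint32_t *ktail,
                         uint32_t mask, struct io_uring_cqe *cqes) {
	uint32_t head  = __atomic_load_n(khead, __ATOMIC_RELAXED);
	uint32_t tail  = __atomic_load_n(ktail, __ATOMIC_ACQUIRE);
	uint32_t count = tail - head;

	for (uint32_t i = 0; i < count; i++) {
		struct io_uring_cqe *cqe = &cqes[(head + i) & mask];
		complete(cqe->user_data, cqe->res);
	}

	/* Publish the new head only after every CQE has been read, so the
	   kernel cannot overwrite an entry that is still in use. */
	__atomic_store_n(khead, head + count, __ATOMIC_SEQ_CST);
	return count;
}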
libcfa/src/concurrency/io/call.cfa.in
r6047b00 rdddb3dd0 75 75 76 76 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2))); 78 78 #endif 79 79 … … 185 185 return ', '.join(args_a) 186 186 187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, intsubmit_flags) {{187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{ 188 188 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op}) 189 189 ssize_t res = {name}({args}); … … 216 216 217 217 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 218 cfa_io_submit( ctx, &idx, 1 );218 cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) ); 219 219 #endif 220 220 }}""" 221 221 222 SyncTemplate = """{ret} cfa_{name}({params}, intsubmit_flags) {{222 SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{ 223 223 io_future_t future; 224 224 … … 388 388 if c.define: 389 389 print("""#if defined({define}) 390 {ret} cfa_{name}({params}, intsubmit_flags);390 {ret} cfa_{name}({params}, __u64 submit_flags); 391 391 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params)) 392 392 else: 393 print("{ret} cfa_{name}({params}, intsubmit_flags);"393 print("{ret} cfa_{name}({params}, __u64 submit_flags);" 394 394 .format(ret=c.ret, name=c.name, params=c.params)) 395 395 … … 399 399 if c.define: 400 400 print("""#if defined({define}) 401 void async_{name}(io_future_t & future, {params}, intsubmit_flags);401 void async_{name}(io_future_t & future, {params}, __u64 submit_flags); 402 402 #endif""".format(define=c.define,name=c.name, params=c.params)) 403 403 else: 404 print("void async_{name}(io_future_t & future, {params}, intsubmit_flags);"404 print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);" 405 405 .format(name=c.name, params=c.params)) 406 406 print("\n") -
libcfa/src/concurrency/io/setup.cfa
r6047b00 rdddb3dd0 26 26 27 27 #if !defined(CFA_HAVE_LINUX_IO_URING_H) 28 void __kernel_io_startup() {29 // Nothing to do without io_uring30 }31 32 void __kernel_io_shutdown() {33 // Nothing to do without io_uring34 }35 36 28 void ?{}(io_context_params & this) {} 37 29 … … 97 89 98 90 //============================================================================================= 99 // I/O Startup / Shutdown logic + Master Poller100 //=============================================================================================101 102 // IO Master poller loop forward103 static void * iopoll_loop( __attribute__((unused)) void * args );104 105 static struct {106 pthread_t thrd; // pthread handle to io poller thread107 void * stack; // pthread stack for io poller thread108 int epollfd; // file descriptor to the epoll instance109 volatile bool run; // Whether or not to continue110 volatile bool stopped; // Whether the poller has finished running111 volatile uint64_t epoch; // Epoch used for memory reclamation112 } iopoll;113 114 void __kernel_io_startup(void) {115 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );116 117 iopoll.epollfd = epoll_create1(0);118 if (iopoll.epollfd == -1) {119 abort( "internal error, epoll_create1\n");120 }121 122 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );123 124 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );125 iopoll.run = true;126 iopoll.stopped = false;127 iopoll.epoch = 0;128 }129 130 void __kernel_io_shutdown(void) {131 // Notify the io poller thread of the shutdown132 iopoll.run = false;133 sigval val = { 1 };134 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );135 136 // Wait for the io poller thread to finish137 138 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );139 140 int ret = close(iopoll.epollfd);141 if (ret == -1) {142 abort( "internal error, close epoll\n");143 }144 145 // Io polling is now fully stopped146 147 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );148 }149 150 static void * iopoll_loop( __attribute__((unused)) void * args ) {151 __processor_id_t id;152 id.full_proc = false;153 id.id = doregister(&id);154 __cfaabi_tls.this_proc_id = &id;155 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );156 157 // Block signals to control when they arrive158 sigset_t mask;159 sigfillset(&mask);160 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {161 abort( "internal error, pthread_sigmask" );162 }163 164 sigdelset( &mask, SIGUSR1 );165 166 // Create sufficient events167 struct epoll_event events[10];168 // Main loop169 while( iopoll.run ) {170 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");171 172 // increment the epoch to notify any deleters we are starting a new cycle173 __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);174 175 // Wait for events176 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );177 178 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);179 180 // Check if an error occured181 if (nfds == -1) {182 if( errno == EINTR ) continue;183 abort( "internal error, pthread_sigmask" );184 }185 186 for(i; nfds) {187 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;188 /* paranoid */ verify( io_ctx );189 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);190 #if !defined( __CFA_NO_STATISTICS__ )191 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;192 #endif193 194 
eventfd_t v;195 eventfd_read(io_ctx->efd, &v);196 197 post( io_ctx->sem );198 }199 }200 201 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);202 203 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );204 unregister(&id);205 return 0p;206 }207 208 //=============================================================================================209 91 // I/O Context Constrution/Destruction 210 92 //============================================================================================= 211 93 212 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in ); 94 95 96 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd ); 213 97 static void __io_uring_teardown( $io_context & this ); 214 98 static void __epoll_register($io_context & ctx); … … 217 101 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx ); 218 102 219 void ?{}($io_context & this, struct cluster & cl) { 220 (this.self){ "IO Poller", cl }; 103 void ?{}($io_context & this, processor * proc, struct cluster & cl) { 104 /* paranoid */ verify( cl.io.arbiter ); 105 this.proc = proc; 106 this.arbiter = cl.io.arbiter; 221 107 this.ext_sq.empty = true; 222 this.revoked = true;223 __io_uring_setup( this, cl.io.params );108 (this.ext_sq.blocked){}; 109 __io_uring_setup( this, cl.io.params, proc->idle ); 224 110 __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this); 225 226 __epoll_register(this); 227 228 __ioarbiter_register(*cl.io.arbiter, this); 229 230 __thrd_start( this, main ); 231 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd); 232 } 233 234 void ^?{}($io_context & mutex this) { 111 } 112 113 void ^?{}($io_context & this) { 235 114 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd); 236 237 ^(this.self){};238 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);239 240 __ioarbiter_unregister(*this.arbiter, this);241 242 __epoll_unregister(this);243 115 244 116 __io_uring_teardown( this ); … … 246 118 } 247 119 248 void ?{}(io_context & this, struct cluster & cl) {249 // this.ctx = new(cl);250 this.ctx = alloc();251 (*this.ctx){ cl };252 253 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);254 }255 256 void ^?{}(io_context & this) {257 post( this.ctx->sem );258 259 delete(this.ctx);260 }261 262 120 extern void __disable_interrupts_hard(); 263 121 extern void __enable_interrupts_hard(); 264 122 265 static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {123 static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) { 266 124 // Step 1 : call to setup 267 125 struct io_uring_params params; … … 339 197 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 340 198 341 sq.kring.ready = 0;342 199 sq.kring.released = 0; 343 200 … … 362 219 // io_uring_register is so f*cking slow on some machine that it 363 220 // will never succeed if preemption isn't hard blocked 221 __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd); 222 364 223 __disable_interrupts_hard(); 365 224 366 int efd = eventfd(0, 0); 367 if (efd < 0) { 368 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno)); 369 } 370 371 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1); 225 int 
ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1); 372 226 if (ret < 0) { 373 227 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); … … 375 229 376 230 __enable_interrupts_hard(); 231 232 __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd); 377 233 378 234 // some paranoid checks … … 390 246 this.ring_flags = 0; 391 247 this.fd = fd; 392 this.efd = efd;393 248 } 394 249 … … 411 266 // close the file descriptor 412 267 close(this.fd); 413 close(this.efd);414 268 415 269 free( this.sq.free_ring.array ); // Maybe null, doesn't matter 416 270 } 417 271 272 void __cfa_io_start( processor * proc ) { 273 proc->io.ctx = alloc(); 274 (*proc->io.ctx){proc, *proc->cltr}; 275 } 276 void __cfa_io_stop ( processor * proc ) { 277 ^(*proc->io.ctx){}; 278 free(proc->io.ctx); 279 } 280 418 281 //============================================================================================= 419 282 // I/O Context Sleep 420 283 //============================================================================================= 421 static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {422 struct epoll_event ev;423 ev.events = EPOLLIN | EPOLLONESHOT;424 ev.data.u64 = (__u64)&ctx;425 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);426 if (ret < 0) {427 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );428 }429 }430 431 static void __epoll_register($io_context & ctx) {432 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");433 }434 435 static void __epoll_unregister($io_context & ctx) {436 // Read the current epoch so we know when to stop437 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);438 439 // Remove the fd from the iopoller440 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");441 442 // Notify the io poller thread of the shutdown443 iopoll.run = false;444 sigval val = { 1 };445 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );446 447 // Make sure all this is done448 __atomic_thread_fence(__ATOMIC_SEQ_CST);449 450 // Wait for the next epoch451 while(curr == iopoll.epoch && !iopoll.stopped) Pause();452 }453 454 void __ioctx_prepare_block($io_context & ctx) {455 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);456 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");457 }284 // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) { 285 // struct epoll_event ev; 286 // ev.events = EPOLLIN | EPOLLONESHOT; 287 // ev.data.u64 = (__u64)&ctx; 288 // int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev); 289 // if (ret < 0) { 290 // abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 291 // } 292 // } 293 294 // static void __epoll_register($io_context & ctx) { 295 // __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 296 // } 297 298 // static void __epoll_unregister($io_context & ctx) { 299 // // Read the current epoch so we know when to stop 300 // size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 301 302 // // Remove the fd from the iopoller 303 // __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 304 305 // // Notify the io poller thread of the shutdown 306 // iopoll.run = false; 307 // sigval val = { 1 }; 308 // pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 309 310 // // Make sure all this is done 311 // __atomic_thread_fence(__ATOMIC_SEQ_CST); 312 313 // // Wait for the next epoch 314 // while(curr == iopoll.epoch && !iopoll.stopped) Pause(); 315 // } 316 317 // void 
__ioctx_prepare_block($io_context & ctx) { 318 // __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx); 319 // __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 320 // } 458 321 459 322 … … 466 329 467 330 void ^?{}( $io_arbiter & mutex this ) { 468 / * paranoid */ verify( empty(this.assigned) );469 / * paranoid */ verify( empty(this.available) );331 // /* paranoid */ verify( empty(this.assigned) ); 332 // /* paranoid */ verify( empty(this.available) ); 470 333 /* paranoid */ verify( is_empty(this.pending.blocked) ); 471 334 } -
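With the dedicated epoll-based poller thread removed, setup now ties each ring to its owning processor by registering the processor's idle eventfd with the ring (IORING_REGISTER_EVENTFD), so a completion pokes the same descriptor the processor sleeps on. A minimal sketch of that registration step in plain C via the raw syscall, assuming the ring and eventfd descriptors already exist; error handling is reduced to an abort:

#define _GNU_SOURCE
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

/* Ask the kernel to signal `efd` whenever a CQE is posted to `ring_fd`.
   ring_fd comes from io_uring_setup(), efd from eventfd(0, 0). */
static void register_completion_eventfd(int ring_fd, int efd) {
	int ret = syscall(__NR_io_uring_register, ring_fd,
	                  IORING_REGISTER_EVENTFD, &efd, 1);
	if (ret < 0) {
		fprintf(stderr, "io_uring_register(EVENTFD): %s\n", strerror(errno));
		abort();
	}
}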
libcfa/src/concurrency/io/types.hfa
r6047b00 rdddb3dd0 38 38 volatile __u32 * head; // one passed last index consumed by the kernel 39 39 volatile __u32 * tail; // one passed last index visible to the kernel 40 volatile __u32 ready; // one passed last index added to array ()41 40 volatile __u32 released; // one passed last index released back to the free list 42 41 … … 97 96 98 97 struct __attribute__((aligned(128))) $io_context { 99 inline Seqable; 100 101 volatile bool revoked; 98 $io_arbiter * arbiter; 102 99 processor * proc; 103 104 $io_arbiter * arbiter;105 100 106 101 struct { … … 113 108 __u32 ring_flags; 114 109 int fd; 115 int efd;116 117 single_sem sem;118 $thread self;119 110 }; 120 121 void main( $io_context & this );122 static inline $thread * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; }123 static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; }124 static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); }125 static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); }126 void ^?{}( $io_context & mutex this );127 111 128 112 monitor __attribute__((aligned(128))) $io_arbiter { … … 132 116 volatile bool flag; 133 117 } pending; 134 135 Sequence($io_context) assigned;136 137 Sequence($io_context) available;138 118 }; 139 119 … … 167 147 #endif 168 148 169 void __ioctx_prepare_block($io_context & ctx);149 // void __ioctx_prepare_block($io_context & ctx); 170 150 #endif 171 151 -
libcfa/src/concurrency/iofwd.hfa
r6047b00 rdddb3dd0 59 59 // underlying calls 60 60 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 61 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));61 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2))); 62 62 63 63 //---------- 64 64 // synchronous calls 65 65 #if defined(CFA_HAVE_PREADV2) 66 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, intsubmit_flags);66 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 67 67 #endif 68 68 #if defined(CFA_HAVE_PWRITEV2) 69 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, intsubmit_flags);69 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 70 70 #endif 71 extern int cfa_fsync(int fd, intsubmit_flags);72 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, intsubmit_flags);73 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, intsubmit_flags);74 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, intsubmit_flags);75 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, intsubmit_flags);76 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, intsubmit_flags);77 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, intsubmit_flags);78 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, intsubmit_flags);79 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, intsubmit_flags);80 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, intsubmit_flags);81 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, intsubmit_flags);82 extern int cfa_madvise(void *addr, size_t length, int advice, intsubmit_flags);83 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, intsubmit_flags);71 extern int cfa_fsync(int fd, __u64 submit_flags); 72 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags); 73 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags); 74 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags); 75 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags); 76 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags); 77 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags); 78 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags); 79 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags); 80 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags); 81 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags); 82 extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags); 83 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags); 84 84 #if 
defined(CFA_HAVE_OPENAT2) 85 extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, intsubmit_flags);85 extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags); 86 86 #endif 87 extern int cfa_close(int fd, intsubmit_flags);87 extern int cfa_close(int fd, __u64 submit_flags); 88 88 #if defined(CFA_HAVE_STATX) 89 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, intsubmit_flags);89 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags); 90 90 #endif 91 extern ssize_t cfa_read(int fd, void * buf, size_t count, intsubmit_flags);92 extern ssize_t cfa_write(int fd, void * buf, size_t count, intsubmit_flags);93 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, intsubmit_flags);94 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, intsubmit_flags);91 extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags); 92 extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags); 93 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, __u64 submit_flags); 94 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags); 95 95 96 96 //---------- 97 97 // asynchronous calls 98 98 #if defined(CFA_HAVE_PREADV2) 99 extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, intsubmit_flags);99 extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 100 100 #endif 101 101 #if defined(CFA_HAVE_PWRITEV2) 102 extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, intsubmit_flags);102 extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 103 103 #endif 104 extern void async_fsync(io_future_t & future, int fd, intsubmit_flags);105 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, intsubmit_flags);106 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, intsubmit_flags);107 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, intsubmit_flags);108 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, intsubmit_flags);109 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, intsubmit_flags);110 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, intsubmit_flags);111 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, intsubmit_flags);112 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, intsubmit_flags);113 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, intsubmit_flags);114 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, intsubmit_flags);115 extern void async_madvise(io_future_t & future, void *addr, 
size_t length, int advice, intsubmit_flags);116 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, intsubmit_flags);104 extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags); 105 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags); 106 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags); 107 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags); 108 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags); 109 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags); 110 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags); 111 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags); 112 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags); 113 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags); 114 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags); 115 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags); 116 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags); 117 117 #if defined(CFA_HAVE_OPENAT2) 118 extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, intsubmit_flags);118 extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags); 119 119 #endif 120 extern void async_close(io_future_t & future, int fd, intsubmit_flags);120 extern void async_close(io_future_t & future, int fd, __u64 submit_flags); 121 121 #if defined(CFA_HAVE_STATX) 122 extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, intsubmit_flags);122 extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags); 123 123 #endif 124 void async_read(io_future_t & future, int fd, void * buf, size_t count, intsubmit_flags);125 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, intsubmit_flags);126 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, intsubmit_flags);127 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, intsubmit_flags);124 void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags); 125 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags); 126 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, __u64 submit_flags); 127 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags); 
128 128 129 129 -
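Every synchronous and asynchronous wrapper above now takes a 64-bit submit_flags argument, so callers opt into deferred submission by or-ing in CFA_IO_LAZY and keep the old eager behaviour by passing 0. A hedged usage sketch of the synchronous wrappers; the include path is an assumption and the descriptors are presumed valid:

#include <concurrency/iofwd.hfa>	// assumed include path for the declarations above

void copy_chunk(int in_fd, int out_fd, char buf[], size_t len) {
	// Eager read: the SQE is flushed to the kernel right away.
	ssize_t r = cfa_read(in_fd, buf, len, 0);
	if (r <= 0) return;

	// Lazy write: the SQE may sit in the submission ring until the
	// runtime's next flush (or until enough entries accumulate).
	cfa_write(out_fd, buf, (size_t)r, CFA_IO_LAZY);
}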
libcfa/src/concurrency/kernel.cfa
r6047b00 rdddb3dd0 22 22 #include <signal.h> 23 23 #include <unistd.h> 24 extern "C" { 25 #include <sys/eventfd.h> 26 } 24 27 25 28 //CFA Includes … … 109 112 static void __run_thread(processor * this, $thread * dst); 110 113 static void __wake_one(cluster * cltr); 111 static void wait(__bin_sem_t & this);112 114 113 115 static void push (__cluster_idles & idles, processor & proc); … … 115 117 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles ); 116 118 119 extern void __cfa_io_start( processor * ); 120 extern void __cfa_io_drain( processor * ); 121 extern void __cfa_io_flush( processor * ); 122 extern void __cfa_io_stop ( processor * ); 123 static inline void __maybe_io_drain( processor * ); 124 125 extern void __disable_interrupts_hard(); 126 extern void __enable_interrupts_hard(); 117 127 118 128 //============================================================================================= … … 130 140 verify(this); 131 141 142 __cfa_io_start( this ); 143 132 144 __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this); 133 145 #if !defined(__CFA_NO_STATISTICS__) … … 151 163 MAIN_LOOP: 152 164 for() { 165 // Check if there is pending io 166 __maybe_io_drain( this ); 167 153 168 // Try to get the next thread 154 169 readyThread = __next_thread( this->cltr ); 155 170 156 171 if( !readyThread ) { 172 __cfa_io_flush( this ); 157 173 readyThread = __next_thread_slow( this->cltr ); 158 174 } … … 190 206 #endif 191 207 192 wait( this->idle ); 208 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 209 210 __disable_interrupts_hard(); 211 eventfd_t val; 212 eventfd_read( this->idle, &val ); 213 __enable_interrupts_hard(); 193 214 194 215 #if !defined(__CFA_NO_STATISTICS__) … … 206 227 207 228 /* paranoid */ verify( readyThread ); 229 230 // Reset io dirty bit 231 this->io.dirty = false; 208 232 209 233 // We found a thread run it … … 220 244 } 221 245 #endif 246 247 if(this->io.pending && !this->io.dirty) { 248 __cfa_io_flush( this ); 249 } 222 250 } 223 251 … … 225 253 } 226 254 255 __cfa_io_stop( this ); 256 227 257 post( this->terminated ); 258 228 259 229 260 if(this == mainProcessor) { … … 248 279 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next ); 249 280 __builtin_prefetch( thrd_dst->context.SP ); 281 282 __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name); 250 283 251 284 $coroutine * proc_cor = get_coroutine(this->runner); … … 330 363 // Just before returning to the processor, set the processor coroutine to active 331 364 proc_cor->state = Active; 365 366 __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst); 332 367 333 368 /* paranoid */ verify( ! 
__preemption_enabled() ); … … 549 584 // Kernel Idle Sleep 550 585 //============================================================================================= 551 extern "C" {552 char * strerror(int);553 }554 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }555 556 static void wait(__bin_sem_t & this) with( this ) {557 verify(__cfaabi_dbg_in_kernel());558 CHECKED( pthread_mutex_lock(&lock) );559 while(val < 1) {560 pthread_cond_wait(&cond, &lock);561 }562 val -= 1;563 CHECKED( pthread_mutex_unlock(&lock) );564 }565 566 static bool post(__bin_sem_t & this) with( this ) {567 bool needs_signal = false;568 569 CHECKED( pthread_mutex_lock(&lock) );570 if(val < 1) {571 val += 1;572 pthread_cond_signal(&cond);573 needs_signal = true;574 }575 CHECKED( pthread_mutex_unlock(&lock) );576 577 return needs_signal;578 }579 580 #undef CHECKED581 582 586 // Wake a thread from the front if there are any 583 587 static void __wake_one(cluster * this) { … … 595 599 596 600 // We found a processor, wake it up 597 post( p->idle ); 601 eventfd_t val; 602 val = 1; 603 eventfd_write( p->idle, val ); 598 604 599 605 #if !defined(__CFA_NO_STATISTICS__) … … 613 619 disable_interrupts(); 614 620 /* paranoid */ verify( ! __preemption_enabled() ); 615 post( this->idle ); 621 eventfd_t val; 622 val = 1; 623 eventfd_read( this->idle, &val ); 616 624 enable_interrupts( __cfaabi_dbg_ctx ); 617 625 } … … 696 704 // Kernel Utilities 697 705 //============================================================================================= 706 #if defined(CFA_HAVE_LINUX_IO_URING_H) 707 #include "io/types.hfa" 708 #endif 709 710 static inline void __maybe_io_drain( processor * proc ) { 711 #if defined(CFA_HAVE_LINUX_IO_URING_H) 712 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd); 713 714 // Check if we should drain the queue 715 $io_context * ctx = proc->io.ctx; 716 unsigned head = *ctx->cq.head; 717 unsigned tail = *ctx->cq.tail; 718 if(head != tail) __cfa_io_drain( proc ); 719 #endif 720 } 721 698 722 //----------------------------------------------------------------------------- 699 723 // Debug -
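The kernel loop above also replaces the pthread mutex/condition idle semaphore with a bare eventfd: an idle processor parks in a blocking eventfd_read and __wake_one posts a single count with eventfd_write. A self-contained sketch of that handshake in plain C, using two pthreads purely for demonstration:

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/eventfd.h>

static int idle_fd;

/* The "idle processor": blocks until someone posts to the eventfd. */
static void * sleeper(void * arg) {
	(void)arg;
	eventfd_t val;
	eventfd_read(idle_fd, &val);     /* blocks until the counter is non-zero */
	printf("woken with count %llu\n", (unsigned long long)val);
	return NULL;
}

int main(void) {
	idle_fd = eventfd(0, 0);
	pthread_t t;
	pthread_create(&t, NULL, sleeper, NULL);

	sleep(1);                        /* let the sleeper park first (demo only) */
	eventfd_write(idle_fd, 1);       /* the "wake one" side */

	pthread_join(t, NULL);
	close(idle_fd);
	return 0;
}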
libcfa/src/concurrency/kernel.hfa
r6047b00 rdddb3dd0 28 28 } 29 29 30 //-----------------------------------------------------------------------------31 // Underlying Locks32 30 #ifdef __CFA_WITH_VERIFY__ 33 31 extern bool __cfaabi_dbg_in_kernel(); 34 32 #endif 35 36 struct __bin_sem_t {37 pthread_mutex_t lock;38 pthread_cond_t cond;39 int val;40 };41 33 42 34 //----------------------------------------------------------------------------- … … 52 44 void ?{}(io_context_params & this); 53 45 54 struct io_context {55 $io_context * ctx;56 cluster * cltr;57 };58 void ?{}(io_context & this, struct cluster & cl);59 void ^?{}(io_context & this);60 61 46 //----------------------------------------------------------------------------- 62 47 // Processor … … 98 83 99 84 struct { 100 $io_context * volatile ctx; 101 volatile bool lock; 85 $io_context * ctx; 86 bool pending; 87 bool dirty; 102 88 } io; 103 89 … … 110 96 111 97 // Idle lock (kernel semaphore) 112 __bin_sem_t idle;98 int idle; 113 99 114 100 // Termination synchronisation (user semaphore) -
libcfa/src/concurrency/kernel/startup.cfa
r6047b00 rdddb3dd0 22 22 extern "C" { 23 23 #include <limits.h> // PTHREAD_STACK_MIN 24 #include <sys/eventfd.h> // eventfd 24 25 #include <sys/mman.h> // mprotect 25 26 #include <sys/resource.h> // getrlimit … … 80 81 static void ?{}(processorCtx_t & this) {} 81 82 static void ?{}(processorCtx_t & this, processor * proc, current_stack_info_t * info); 82 static void ?{}(__bin_sem_t & this);83 static void ^?{}(__bin_sem_t & this);84 83 85 84 #if defined(__CFA_WITH_VERIFY__) … … 91 90 extern void __kernel_alarm_startup(void); 92 91 extern void __kernel_alarm_shutdown(void); 93 extern void __kernel_io_startup (void);94 extern void __kernel_io_shutdown(void);95 92 96 93 //----------------------------------------------------------------------------- … … 104 101 KERNEL_STORAGE($thread, mainThread); 105 102 KERNEL_STORAGE(__stack_t, mainThreadCtx); 106 KERNEL_STORAGE(io_context, mainIoContext);107 103 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock); 108 104 #if !defined(__CFA_NO_STATISTICS__) … … 200 196 201 197 void ?{}(processor & this) with( this ) { 202 ( this.idle ){};203 198 ( this.terminated ){}; 204 199 ( this.runner ){}; … … 228 223 __kernel_alarm_startup(); 229 224 230 // Start IO231 __kernel_io_startup();232 233 io_context * mainio = (io_context *)&storage_mainIoContext;234 (*mainio){ *mainCluster };235 236 225 // Add the main thread to the ready queue 237 226 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread … … 255 244 256 245 static void __kernel_shutdown(void) { 257 //Before we start shutting things down, wait for systems that need threading to shutdown258 io_context * mainio = (io_context *)&storage_mainIoContext;259 ^(*mainio){};260 261 246 /* paranoid */ verify( __preemption_enabled() ); 262 247 disable_interrupts(); … … 276 261 // Disable preemption 277 262 __kernel_alarm_shutdown(); 278 279 // Stop IO280 __kernel_io_shutdown();281 263 282 264 // Destroy the main processor and its context in reverse order of construction … … 479 461 480 462 this.io.ctx = 0p; 481 this.io.lock = false; 463 this.io.pending = false; 464 this.io.dirty = false; 465 466 this.idle = eventfd(0, 0); 467 if (idle < 0) { 468 abort("KERNEL ERROR: PROCESSOR EVENTFD - %s\n", strerror(errno)); 469 } 482 470 483 471 #if !defined(__CFA_NO_STATISTICS__) … … 521 509 // Finally we don't need the read_lock any more 522 510 unregister((__processor_id_t*)&this); 511 512 close(this.idle); 523 513 } 524 514 525 515 void ?{}(processor & this, const char name[], cluster & _cltr) { 526 ( this.idle ){};527 516 ( this.terminated ){}; 528 517 ( this.runner ){}; … … 726 715 } 727 716 728 extern "C" {729 char * strerror(int);730 }731 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }732 733 static void ?{}(__bin_sem_t & this) with( this ) {734 // Create the mutex with error checking735 pthread_mutexattr_t mattr;736 pthread_mutexattr_init( &mattr );737 pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);738 pthread_mutex_init(&lock, &mattr);739 740 pthread_cond_init (&cond, (const pthread_condattr_t *)0p); // workaround trac#208: cast should not be required741 val = 0;742 }743 744 static void ^?{}(__bin_sem_t & this) with( this ) {745 CHECKED( pthread_mutex_destroy(&lock) );746 CHECKED( pthread_cond_destroy (&cond) );747 }748 749 #undef CHECKED750 751 717 #if defined(__CFA_WITH_VERIFY__) 752 718 static bool verify_fwd_bck_rng(void) { -
libcfa/src/concurrency/stats.cfa
r6047b00 rdddb3dd0 33 33 stats->io.submit.slow = 0; 34 34 stats->io.flush.external = 0; 35 stats->io.calls. count= 0;35 stats->io.calls.flush = 0; 36 36 stats->io.calls.submitted = 0; 37 stats->io.calls.drain = 0; 37 38 stats->io.calls.completed = 0; 38 stats->io.calls.blocks = 0;39 39 stats->io.calls.errors.busy = 0; 40 40 stats->io.poller.sleeps = 0; … … 67 67 __atomic_fetch_add( &cltr->io.submit.slow , proc->io.submit.slow , __ATOMIC_SEQ_CST ); proc->io.submit.slow = 0; 68 68 __atomic_fetch_add( &cltr->io.flush.external , proc->io.flush.external , __ATOMIC_SEQ_CST ); proc->io.flush.external = 0; 69 __atomic_fetch_add( &cltr->io.calls. count , proc->io.calls.count , __ATOMIC_SEQ_CST ); proc->io.calls.count= 0;69 __atomic_fetch_add( &cltr->io.calls.flush , proc->io.calls.flush , __ATOMIC_SEQ_CST ); proc->io.calls.flush = 0; 70 70 __atomic_fetch_add( &cltr->io.calls.submitted , proc->io.calls.submitted , __ATOMIC_SEQ_CST ); proc->io.calls.submitted = 0; 71 __atomic_fetch_add( &cltr->io.calls.drain , proc->io.calls.drain , __ATOMIC_SEQ_CST ); proc->io.calls.drain = 0; 71 72 __atomic_fetch_add( &cltr->io.calls.completed , proc->io.calls.completed , __ATOMIC_SEQ_CST ); proc->io.calls.completed = 0; 72 __atomic_fetch_add( &cltr->io.calls.blocks , proc->io.calls.blocks , __ATOMIC_SEQ_CST ); proc->io.calls.blocks = 0;73 73 __atomic_fetch_add( &cltr->io.calls.errors.busy, proc->io.calls.errors.busy, __ATOMIC_SEQ_CST ); proc->io.calls.errors.busy = 0; 74 74 __atomic_fetch_add( &cltr->io.poller.sleeps , proc->io.poller.sleeps , __ATOMIC_SEQ_CST ); proc->io.poller.sleeps = 0; … … 110 110 double avgfasts = ((double)io.submit.fast) / total_submits; 111 111 112 double avgsubs = ((double)io.calls.submitted) / io.calls. count;113 double avgcomp = ((double)io.calls.completed) / io.calls. count;112 double avgsubs = ((double)io.calls.submitted) / io.calls.flush; 113 double avgcomp = ((double)io.calls.completed) / io.calls.drain; 114 114 115 115 __cfaabi_bits_print_safe( STDOUT_FILENO, … … 129 129 , io.submit.fast, io.submit.slow, avgfasts 130 130 , io.flush.external 131 , io.calls. count, io.calls.blocks, io.calls.errors.busy131 , io.calls.flush, io.calls.drain, io.calls.errors.busy 132 132 , io.calls.submitted, avgsubs 133 133 , io.calls.completed, avgcomp -
libcfa/src/concurrency/stats.hfa
r6047b00 rdddb3dd0
  80  80		} flush;
  81  81		struct {
  82    			volatile uint64_t count;
      82			volatile uint64_t drain;
      83			volatile uint64_t completed;
      84			volatile uint64_t flush;
  83  85			volatile uint64_t submitted;
  84    			volatile uint64_t completed;
  85    			volatile uint64_t blocks;
  86  86		struct {
  87  87			volatile uint64_t busy;
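The statistics rework splits the old count/blocks counters into separate flush and drain counters so the averages in stats.cfa use the right denominators (submissions per flush call, completions per drain call). A small sketch of the tally pattern used there, in plain C with a trimmed-down struct; names are shortened for the example:

#include <stdint.h>

struct io_call_stats { volatile uint64_t drain, completed, flush, submitted; };

/* Fold one processor's counters into the cluster-wide totals and zero them,
   so the per-processor counters stay cheap and uncontended between tallies. */
static void tally(struct io_call_stats *cltr, struct io_call_stats *proc) {
	__atomic_fetch_add(&cltr->drain,     proc->drain,     __ATOMIC_SEQ_CST); proc->drain     = 0;
	__atomic_fetch_add(&cltr->completed, proc->completed, __ATOMIC_SEQ_CST); proc->completed = 0;
	__atomic_fetch_add(&cltr->flush,     proc->flush,     __ATOMIC_SEQ_CST); proc->flush     = 0;
	__atomic_fetch_add(&cltr->submitted, proc->submitted, __ATOMIC_SEQ_CST); proc->submitted = 0;
}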