Changes in libcfa/src/concurrency/io.cfa [5751a56:3e2b9c9]
1 file edited

Legend:
  ' ' unmodified
  '+' added (present only in r3e2b9c9)
  '-' removed (present only in r5751a56)
libcfa/src/concurrency/io.cfa
--- libcfa/src/concurrency/io.cfa @ r5751a56
+++ libcfa/src/concurrency/io.cfa @ r3e2b9c9

  //

+ #define __cforall_thread__
+
  #if defined(__CFA_DEBUG__)
    // #define __CFA_DEBUG_PRINT_IO__
…
  #endif

- #include "kernel.hfa"
- #include "bitmanip.hfa"
-
- #if !defined(CFA_HAVE_LINUX_IO_URING_H)
-   void __kernel_io_startup( cluster &, unsigned, bool ) {
-     // Nothing to do without io_uring
-   }
-
-   void __kernel_io_finish_start( cluster & ) {
-     // Nothing to do without io_uring
-   }
-
-   void __kernel_io_prepare_stop( cluster & ) {
-     // Nothing to do without io_uring
-   }
-
-   void __kernel_io_shutdown( cluster &, bool ) {
-     // Nothing to do without io_uring
-   }
-
- #else
+
+ #if defined(CFA_HAVE_LINUX_IO_URING_H)
  #define _GNU_SOURCE /* See feature_test_macros(7) */
  #include <errno.h>
+ #include <signal.h>
  #include <stdint.h>
  #include <string.h>
  #include <unistd.h>
- #include <sys/mman.h>

  extern "C" {
+   #include <sys/epoll.h>
    #include <sys/syscall.h>

…
  }

- #include "bits/signal.hfa"
- #include "kernel_private.hfa"
- #include "thread.hfa"
-
- uint32_t entries_per_cluster() {
-   return 256;
- }
-
- static void * __io_poller_slow( void * arg );
-
- // Weirdly, some systems that do support io_uring don't actually define these
- #ifdef __alpha__
- /*
-  * alpha is the only exception, all other architectures
-  * have common numbers for new system calls.
-  */
- #ifndef __NR_io_uring_setup
- #define __NR_io_uring_setup 535
- #endif
- #ifndef __NR_io_uring_enter
- #define __NR_io_uring_enter 536
- #endif
- #ifndef __NR_io_uring_register
- #define __NR_io_uring_register 537
- #endif
- #else /* !__alpha__ */
- #ifndef __NR_io_uring_setup
- #define __NR_io_uring_setup 425
- #endif
- #ifndef __NR_io_uring_enter
- #define __NR_io_uring_enter 426
- #endif
- #ifndef __NR_io_uring_register
- #define __NR_io_uring_register 427
- #endif
- #endif
-
- // Fast poller user-thread
- // Not using the "thread" keyword because we want to control
- // more carefully when to start/stop it
- struct __io_poller_fast {
-   struct __io_data * ring;
-   $thread thrd;
- };
-
- void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
-   this.ring = cltr.io;
-   (this.thrd){ "Fast I/O Poller", cltr };
- }
- void ^?{}( __io_poller_fast & mutex this );
- void main( __io_poller_fast & this );
- static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
- void ^?{}( __io_poller_fast & mutex this ) {}
-
- struct __submition_data {
-   // Head and tail of the ring (associated with array)
-   volatile uint32_t * head;
-   volatile uint32_t * tail;
-   volatile uint32_t prev_head;
-
-   // The actual kernel ring which uses head/tail
-   // indexes into the sqes arrays
-   uint32_t * array;
-
-   // number of entries and mask to go with it
-   const uint32_t * num;
-   const uint32_t * mask;
-
-   // Submission flags (Not sure what for)
-   uint32_t * flags;
-
-   // number of sqes not submitted (whatever that means)
-   uint32_t * dropped;
-
-   // Like head/tail but not seen by the kernel
-   volatile uint32_t * ready;
-   uint32_t ready_cnt;
-
-   __spinlock_t lock;
-   __spinlock_t release_lock;
-
-   // A buffer of sqes (not the actual ring)
-   struct io_uring_sqe * sqes;
-
-   // The location and size of the mmaped area
-   void * ring_ptr;
-   size_t ring_sz;
- };
-
- struct __completion_data {
-   // Head and tail of the ring
-   volatile uint32_t * head;
-   volatile uint32_t * tail;
-
-   // number of entries and mask to go with it
-   const uint32_t * mask;
-   const uint32_t * num;
-
-   // number of cqes not submitted (whatever that means)
-   uint32_t * overflow;
-
-   // the kernel ring
-   struct io_uring_cqe * cqes;
-
-   // The location and size of the mmaped area
-   void * ring_ptr;
-   size_t ring_sz;
- };
-
- struct __io_data {
-   struct __submition_data submit_q;
-   struct __completion_data completion_q;
-   uint32_t ring_flags;
-   int cltr_flags;
-   int fd;
-   semaphore submit;
-   volatile bool done;
-   struct {
-     struct {
-       __processor_id_t id;
-       void * stack;
-       pthread_t kthrd;
-       volatile bool blocked;
-     } slow;
-     __io_poller_fast fast;
-     __bin_sem_t sem;
-   } poller;
- };
-
- //=============================================================================================
- // I/O Startup / Shutdown logic
- //=============================================================================================
- void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
-   if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
-     abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
-   }
-
-   this.io = malloc();
-
-   // Step 1 : call to setup
-   struct io_uring_params params;
-   memset(&params, 0, sizeof(params));
-   if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS ) params.flags |= IORING_SETUP_SQPOLL;
-   if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
-
-   uint32_t nentries = entries_per_cluster();
-
-   int fd = syscall(__NR_io_uring_setup, nentries, &params );
-   if(fd < 0) {
-     abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
-   }
-
-   // Step 2 : mmap result
-   memset( this.io, 0, sizeof(struct __io_data) );
-   struct __submition_data & sq = this.io->submit_q;
-   struct __completion_data & cq = this.io->completion_q;
-
-   // calculate the right ring size
-   sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
-   cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));
-
-   // Requires features
-   #if defined(IORING_FEAT_SINGLE_MMAP)
-     // adjust the size according to the parameters
-     if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
-       cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
-     }
-   #endif
-
-   // mmap the Submit Queue into existence
-   sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
-   if (sq.ring_ptr == (void*)MAP_FAILED) {
-     abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
-   }
-
-   // Requires features
-   #if defined(IORING_FEAT_SINGLE_MMAP)
-     // mmap the Completion Queue into existence (may or may not be needed)
-     if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
-       cq.ring_ptr = sq.ring_ptr;
-     }
-     else
-   #endif
-   {
-     // We need multiple call to MMAP
-     cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
-     if (cq.ring_ptr == (void*)MAP_FAILED) {
-       munmap(sq.ring_ptr, sq.ring_sz);
-       abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
-     }
-   }
-
-   // mmap the submit queue entries
-   size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
-   sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
-   if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
-     munmap(sq.ring_ptr, sq.ring_sz);
-     if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
-     abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
-   }
-
-   // Get the pointers from the kernel to fill the structure
-   // submit queue
-   sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
-   sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
-   sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
-   sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
-   sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
-   sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
-   sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
-   sq.prev_head = *sq.head;
-
-   {
-     const uint32_t num = *sq.num;
-     for( i; num ) {
-       sq.sqes[i].user_data = 0ul64;
-     }
-   }
-
-   (sq.lock){};
-   (sq.release_lock){};
-
-   if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
-     /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
-     sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
-     sq.ready = alloc_align( 64, sq.ready_cnt );
-     for(i; sq.ready_cnt) {
-       sq.ready[i] = -1ul32;
-     }
-   }
-   else {
-     sq.ready_cnt = 0;
-     sq.ready = 0p;
-   }
-
-   // completion queue
-   cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
-   cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
-   cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
-   cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
-   cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
-   cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
-
-   // some paranoid checks
-   /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
-   /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
-   /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
-   /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
-
-   /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
-   /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
-   /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
-   /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
-
-   // Update the global ring info
-   this.io->ring_flags = params.flags;
-   this.io->cltr_flags = io_flags;
-   this.io->fd = fd;
-   this.io->done = false;
-   (this.io->submit){ min(*sq.num, *cq.num) };
-
-   if(!main_cluster) {
-     __kernel_io_finish_start( this );
-   }
- }
-
- void __kernel_io_finish_start( cluster & this ) {
-   if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
-     __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
-     (this.io->poller.fast){ this };
-     __thrd_start( this.io->poller.fast, main );
-   }
-
-   // Create the poller thread
-   __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
-   this.io->poller.slow.blocked = false;
-   this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
- }
-
- void __kernel_io_prepare_stop( cluster & this ) {
-   __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
-   // Notify the poller thread of the shutdown
-   __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
-
-   // Stop the IO Poller
-   sigval val = { 1 };
-   pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
-   post( this.io->poller.sem );
-
-   // Wait for the poller thread to finish
-   pthread_join( this.io->poller.slow.kthrd, 0p );
-   free( this.io->poller.slow.stack );
-
-   __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
-
-   if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
-     with( this.io->poller.fast ) {
-       /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
-       /* paranoid */ verify( !ready_mutate_islocked() );
-
-       // We need to adjust the clean-up based on where the thread is
-       if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
-
-         ready_schedule_lock( (struct __processor_id_t *)active_processor() );
-
-         // This is the tricky case
-         // The thread was preempted and now it is on the ready queue
-         // The thread should be the last on the list
-         /* paranoid */ verify( thrd.link.next != 0p );
-
-         // Remove the thread from the ready queue of this cluster
-         __attribute__((unused)) bool removed = remove_head( &this, &thrd );
-         /* paranoid */ verify( removed );
-         thrd.link.next = 0p;
-         thrd.link.prev = 0p;
-         __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
-
-         // Fixup the thread state
-         thrd.state = Blocked;
-         thrd.ticket = 0;
-         thrd.preempted = __NO_PREEMPTION;
-
-         ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
-
-         // Pretend like the thread was blocked all along
-       }
-       // !!! This is not an else if !!!
-       if( thrd.state == Blocked ) {
-
-         // This is the "easy case"
-         // The thread is parked and can easily be moved to active cluster
-         verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
-         thrd.curr_cluster = active_cluster();
-
-         // unpark the fast io_poller
-         unpark( &thrd __cfaabi_dbg_ctx2 );
-       }
-       else {
-
-         // The thread is in a weird state
-         // I don't know what to do here
-         abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
-       }
-     }
-
-     ^(this.io->poller.fast){};
-
-     __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
-   }
- }
-
- void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
-   if(!main_cluster) {
-     __kernel_io_prepare_stop( this );
-   }
-
-   // Shutdown the io rings
-   struct __submition_data & sq = this.io->submit_q;
-   struct __completion_data & cq = this.io->completion_q;
-
-   // unmap the submit queue entries
-   munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
-
-   // unmap the Submit Queue ring
-   munmap(sq.ring_ptr, sq.ring_sz);
-
-   // unmap the Completion Queue ring, if it is different
-   if (cq.ring_ptr != sq.ring_ptr) {
-     munmap(cq.ring_ptr, cq.ring_sz);
-   }
-
-   // close the file descriptor
-   close(this.io->fd);
-
-   free( this.io->submit_q.ready ); // Maybe null, doesn't matter
-   free( this.io );
- }
-
- int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
+ #include "stats.hfa"
+ #include "kernel.hfa"
+ #include "kernel/fwd.hfa"
+ #include "io/types.hfa"
+
+ //=============================================================================================
+ // I/O Syscall
+ //=============================================================================================
+ static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    bool need_sys_to_submit = false;
    bool need_sys_to_complete = false;
-   unsigned min_complete = 0;
    unsigned flags = 0;
-

    TO_SUBMIT:
…
    }

-   TO_COMPLETE:
    if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
      flags |= IORING_ENTER_GETEVENTS;
-     if( mask ) {
-       need_sys_to_complete = true;
-       min_complete = 1;
-       break TO_COMPLETE;
-     }
      if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
        need_sys_to_complete = true;
…
    int ret = 0;
    if( need_sys_to_submit || need_sys_to_complete ) {
-     ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
+     ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
      if( ret < 0 ) {
        switch((int)errno) {
…
  static uint32_t __release_consumed_submission( struct __io_data & ring );

- static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id) {
+ static inline void process(struct io_uring_cqe & cqe ) {
    struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
    __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );

    data->result = cqe.res;
-   if(!id) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
-   else    { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
+   unpark( data->thrd __cfaabi_dbg_ctx2 );
  }

  // Process a single completion message from the io_uring
  // This is NOT thread-safe
- static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
+ static [int, bool] __drain_io( & struct __io_data ring ) {
    /* paranoid */ verify( !kernelTLS.preemption_state.enabled );

    unsigned to_submit = 0;
-   if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+   if( ring.poller_submits ) {
      // If the poller thread also submits, then we need to aggregate the submissions which are ready
      to_submit = __collect_submitions( ring );
    }

-   int ret = __io_uring_enter(ring, to_submit, true, mask);
+   int ret = __io_uring_enter(ring, to_submit, true);
    if( ret < 0 ) {
      return [0, true];
…
      /* paranoid */ verify(&cqe);

-     process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
-   }
-
-   // Allow new submissions to happen
-   // V(ring.submit, count);
+     process( cqe );
+   }

    // Mark to the kernel that the cqe has been seen
…
  }

- static void * __io_poller_slow( void * arg ) {
-   #if !defined( __CFA_NO_STATISTICS__ )
-     __stats_t local_stats;
-     __init_stats( &local_stats );
-     kernelTLS.this_stats = &local_stats;
-   #endif
-
-   cluster * cltr = (cluster *)arg;
-   struct __io_data & ring = *cltr->io;
-
-   ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
-
-   sigset_t mask;
-   sigfillset(&mask);
-   if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
-     abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
-   }
-
-   sigdelset( &mask, SIGUSR1 );
-
-   verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
-   verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
-
-   __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
-
-   if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
-     while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
-
-       __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
-
-       // In the user-thread approach drain and if anything was drained,
-       // batton pass to the user-thread
-       int count;
-       bool again;
-       [count, again] = __drain_io( ring, &mask );
-
-       __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
-
-       // Update statistics
-       __STATS__( true,
-         io.complete_q.completed_avg.val += count;
-         io.complete_q.completed_avg.slow_cnt += 1;
-       )
-
-       if(again) {
-         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
-         __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
-         wait( ring.poller.sem );
-       }
-     }
-   }
-   else {
-     while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
-       //In the naive approach, just poll the io completion queue directly
-       int count;
-       bool again;
-       [count, again] = __drain_io( ring, &mask );
-
-       // Update statistics
-       __STATS__( true,
-         io.complete_q.completed_avg.val += count;
-         io.complete_q.completed_avg.slow_cnt += 1;
-       )
-     }
-   }
-
-   __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
-
-   unregister( &ring.poller.slow.id );
-
-   #if !defined(__CFA_NO_STATISTICS__)
-     __tally_stats(cltr->stats, &local_stats);
-   #endif
-
-   return 0p;
- }
-
- void main( __io_poller_fast & this ) {
-   verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
-
-   // Start parked
-   park( __cfaabi_dbg_ctx );
-
-   __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
+ void main( $io_ctx_thread & this ) {
+   epoll_event ev;
+   __ioctx_register( this, ev );
+
+   __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);

    int reset = 0;
-
    // Then loop until we need to start
-   while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
-
+   while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
      // Drain the io
      int count;
      bool again;
      disable_interrupts();
-       [count, again] = __drain_io( *this.ring, 0p );
+       [count, again] = __drain_io( *this.ring );

        if(!again) reset++;
…
      // We didn't get anything baton pass to the slow poller
      else {
-       __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+       __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
        reset = 0;

-       // wake up the slow poller
-       post( this.ring->poller.sem );
-
-       // park this thread
-       park( __cfaabi_dbg_ctx );
+       // block this thread
+       __ioctx_prepare_block( this, ev );
+       wait( this.sem );
      }
    }

    __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
- }
-
- static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
- static inline void __wake_poller( struct __io_data & ring ) {
-   if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
-
-   sigval val = { 1 };
-   pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
  }

…
  }

- void __submit( struct __io_data & ring, uint32_t idx ) {
+ void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
+   __io_data & ring = *ctx->thrd.ring;
    // Get now the data we definetely need
    uint32_t * const tail = ring.submit_q.tail;
-   const uint32_t mask = *ring.submit_q.mask;
+   const uint32_t mask  = *ring.submit_q.mask;

    // There are 2 submission schemes, check which one we are using
-   if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+   if( ring.poller_submits ) {
      // If the poller thread submits, then we just need to add this to the ready array
      __submit_to_ready_array( ring, idx, mask );

-     __wake_poller( ring );
+     post( ctx->thrd.sem );

      __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    }
-   else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
+   else if( ring.eager_submits ) {
      uint32_t picked = __submit_to_ready_array( ring, idx, mask );

…
        // We got the lock
        unsigned to_submit = __collect_submitions( ring );
-       int ret = __io_uring_enter( ring, to_submit, false, 0p );
+       int ret = __io_uring_enter( ring, to_submit, false );
        if( ret < 0 ) {
          unlock(ring.submit_q.lock);
…

      // Submit however, many entries need to be submitted
-     int ret = __io_uring_enter( ring, 1, false, 0p );
+     int ret = __io_uring_enter( ring, 1, false );
      if( ret < 0 ) {
        switch((int)errno) {
…
    return count;
  }
-
- //=============================================================================================
- // I/O Submissions
- //=============================================================================================
-
- void register_fixed_files( cluster & cl, int * files, unsigned count ) {
-   int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
-   if( ret < 0 ) {
-     abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
-   }
-
-   __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
- }
  #endif
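The substance of this changeset is the removal of the signal-driven "slow poller" kernel thread (__io_poller_slow, __wake_poller, pthread_sigqueue with SIGUSR1) in favour of a single poller thread that registers the io_uring file descriptor with epoll (__ioctx_register, __ioctx_prepare_block) and waits on a semaphore until completions are pending; the sigset_t * mask parameter of __io_uring_enter disappears for the same reason. The sketch below is not CFA runtime code; it is a minimal illustration, in plain C, of the two kernel-facing mechanisms the new poller relies on: calling io_uring_enter without a signal mask, and arming an epoll instance with the ring fd so a thread can sleep until the kernel posts completions. The helper names ring_enter and wait_for_completions and the epfd/ring_fd parameters are hypothetical; ring setup via io_uring_setup is assumed to have happened elsewhere.

#define _GNU_SOURCE
#include <errno.h>
#include <signal.h>          // _NSIG
#include <sys/epoll.h>
#include <sys/syscall.h>     // __NR_io_uring_enter
#include <unistd.h>          // syscall()
#include <linux/io_uring.h>  // IORING_ENTER_GETEVENTS

// Raw syscall wrapper mirroring the shape of the new __io_uring_enter:
// min_complete is always 0 and no sigset_t is passed (the removed mask path).
static int ring_enter( int ring_fd, unsigned to_submit, unsigned flags ) {
	return syscall( __NR_io_uring_enter, ring_fd, to_submit, 0, flags, (void *)0, _NSIG / 8 );
}

// Arm an epoll instance with the ring fd and sleep until the kernel reports
// pending completions; an io_uring fd becomes readable (EPOLLIN) when cqes
// are available.  epfd is an epoll fd from epoll_create1(0); both parameters
// are assumed to be set up by the caller.
static int wait_for_completions( int epfd, int ring_fd ) {
	struct epoll_event ev = { .events = EPOLLIN | EPOLLONESHOT, .data = { .fd = ring_fd } };
	// EPOLLONESHOT disarms the fd after each wake-up, so re-arm with MOD and
	// fall back to ADD the first time through.
	if ( epoll_ctl( epfd, EPOLL_CTL_MOD, ring_fd, &ev ) < 0 ) {
		if ( errno != ENOENT ) return -1;
		if ( epoll_ctl( epfd, EPOLL_CTL_ADD, ring_fd, &ev ) < 0 ) return -1;
	}
	struct epoll_event fired;
	return epoll_wait( epfd, &fired, 1, -1 );  // blocks until the ring fd is readable
}

A poller built on these two helpers would call ring_enter(ring_fd, to_submit, IORING_ENTER_GETEVENTS) when it actually needs to drain, and otherwise sleep in wait_for_completions, which is roughly the split the new main( $io_ctx_thread & ) loop makes between __drain_io and __ioctx_prepare_block / wait( this.sem ).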