Changeset f00b26d4
- Timestamp: Jul 30, 2020, 3:00:19 PM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 920dca3
- Parents: e0f93e0
- Location: libcfa/src/concurrency
- Files: 6 edited
libcfa/src/concurrency/io.cfa
re0f93e0 rf00b26d4 16 16 #if defined(__CFA_DEBUG__) 17 17 // #define __CFA_DEBUG_PRINT_IO__ 18 //#define __CFA_DEBUG_PRINT_IO_CORE__18 #define __CFA_DEBUG_PRINT_IO_CORE__ 19 19 #endif 20 20 21 #include "kernel .hfa"21 #include "kernel_private.hfa" 22 22 #include "bitmanip.hfa" 23 23 24 24 #if !defined(CFA_HAVE_LINUX_IO_URING_H) 25 void __kernel_io_startup( cluster &, unsigned, bool) {25 void __kernel_io_startup() { 26 26 // Nothing to do without io_uring 27 27 } 28 28 29 void __kernel_io_ finish_start( cluster &) {29 void __kernel_io_shutdown() { 30 30 // Nothing to do without io_uring 31 31 } 32 32 33 void __kernel_io_prepare_stop( cluster & ) { 34 // Nothing to do without io_uring 35 } 36 37 void __kernel_io_shutdown( cluster &, bool ) { 38 // Nothing to do without io_uring 39 } 33 void ?{}(io_context & this, struct cluster & cl) {} 34 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {} 35 36 void ^?{}(io_context & this) {} 37 void ^?{}(io_context & this, bool cluster_context) {} 40 38 41 39 #else … … 45 43 #include <string.h> 46 44 #include <unistd.h> 47 #include <sys/mman.h>48 45 49 46 extern "C" { 47 #include <sys/epoll.h> 48 #include <sys/mman.h> 50 49 #include <sys/syscall.h> 51 50 … … 57 56 #include "thread.hfa" 58 57 59 uint32_t entries_per_cluster() { 60 return 256; 58 void ?{}(io_context_params & this) { 59 this.num_entries = 256; 60 this.num_ready = 256; 61 this.submit_aff = -1; 62 this.eager_submits = false; 63 this.poller_submits = false; 64 this.poll_submit = false; 65 this.poll_complete = false; 61 66 } 62 67 … … 90 95 #endif 91 96 92 // Fast poller user-thread93 // Not using the "thread" keyword because we want to control94 // more carefully when to start/stop it95 struct __io_poller_fast {96 struct __io_data * ring;97 $thread thrd;98 };99 100 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {101 this.ring = cltr.io;102 (this.thrd){ "Fast I/O Poller", cltr };103 }104 void ^?{}( __io_poller_fast & mutex this );105 void main( __io_poller_fast & this );106 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }107 void ^?{}( __io_poller_fast & mutex this ) {}108 109 97 struct __submition_data { 110 98 // Head and tail of the ring (associated with array) … … 166 154 struct __completion_data completion_q; 167 155 uint32_t ring_flags; 168 int cltr_flags;169 156 int fd; 170 semaphore submit; 171 volatile bool done; 172 struct { 173 struct { 174 __processor_id_t id; 175 void * stack; 176 pthread_t kthrd; 177 volatile bool blocked; 178 } slow; 179 __io_poller_fast fast; 180 __bin_sem_t sem; 181 } poller; 157 bool eager_submits:1; 158 bool poller_submits:1; 182 159 }; 183 160 184 161 //============================================================================================= 185 // I/O Startup / Shutdown logic 162 // I/O Startup / Shutdown logic + Master Poller 186 163 //============================================================================================= 187 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) { 188 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) { 189 abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n"); 190 } 191 192 this.io = malloc(); 193 164 165 // IO Master poller loop forward 166 static void * iopoll_loop( __attribute__((unused)) void * args ); 167 168 static struct { 169 pthread_t thrd; // pthread handle to io poller thread 170 void * stack; // pthread 
stack for io poller thread 171 int epollfd; // file descriptor to the epoll instance 172 volatile bool run; // Whether or not to continue 173 } iopoll; 174 175 void __kernel_io_startup() { 176 __cfaabi_dbg_print_safe( "Kernel : Creating EPOLL instance\n" ); 177 178 iopoll.epollfd = epoll_create1(0); 179 if (iopoll.epollfd == -1) { 180 abort( "internal error, epoll_create1\n"); 181 } 182 183 __cfaabi_dbg_print_safe( "Kernel : Starting io poller thread\n" ); 184 185 iopoll.run = true; 186 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p ); 187 } 188 189 void __kernel_io_shutdown() { 190 // Notify the io poller thread of the shutdown 191 iopoll.run = false; 192 sigval val = { 1 }; 193 pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 194 195 // Wait for the io poller thread to finish 196 197 pthread_join( iopoll.thrd, 0p ); 198 free( iopoll.stack ); 199 200 int ret = close(iopoll.epollfd); 201 if (ret == -1) { 202 abort( "internal error, close epoll\n"); 203 } 204 205 // Io polling is now fully stopped 206 207 __cfaabi_dbg_print_safe( "Kernel : IO poller stopped\n" ); 208 } 209 210 static void * iopoll_loop( __attribute__((unused)) void * args ) { 211 __processor_id_t id; 212 id.id = doregister(&id); 213 __cfaabi_dbg_print_safe( "Kernel : IO poller thread starting\n" ); 214 215 // Block signals to control when they arrive 216 sigset_t mask; 217 sigfillset(&mask); 218 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) { 219 abort( "internal error, pthread_sigmask" ); 220 } 221 222 sigdelset( &mask, SIGUSR1 ); 223 224 // Create sufficient events 225 struct epoll_event events[10]; 226 // Main loop 227 while( iopoll.run ) { 228 // Wait for events 229 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 230 231 // Check if an error occured 232 if (nfds == -1) { 233 if( errno == EINTR ) continue; 234 abort( "internal error, pthread_sigmask" ); 235 } 236 237 for(i; nfds) { 238 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 239 /* paranoid */ verify( io_ctx ); 240 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx); 241 #if !defined( __CFA_NO_STATISTICS__ ) 242 kernelTLS.this_stats = io_ctx->self.curr_cluster->stats; 243 #endif 244 __post( io_ctx->sem, &id ); 245 } 246 } 247 248 __cfaabi_dbg_print_safe( "Kernel : IO poller thread stopping\n" ); 249 unregister(&id); 250 return 0p; 251 } 252 253 //============================================================================================= 254 // I/O Context Constrution/Destruction 255 //============================================================================================= 256 257 void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; } 258 void main( $io_ctx_thread & this ); 259 static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; } 260 void ^?{}( $io_ctx_thread & mutex this ) {} 261 262 static void __io_create ( __io_data & this, const io_context_params & params_in ); 263 static void __io_destroy( __io_data & this ); 264 265 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) { 266 (this.thrd){ cl }; 267 this.thrd.ring = malloc(); 268 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this); 269 __io_create( *this.thrd.ring, params ); 270 271 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this); 272 this.thrd.done = false; 273 __thrd_start( this.thrd, main ); 274 275 __cfadbg_print_safe(io_core, "Kernel I/O 
: io_context %p ready\n", &this); 276 } 277 278 void ?{}(io_context & this, struct cluster & cl) { 279 io_context_params params; 280 (this){ cl, params }; 281 } 282 283 void ^?{}(io_context & this, bool cluster_context) { 284 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this); 285 286 // Notify the thread of the shutdown 287 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST); 288 289 // If this is an io_context within a cluster, things get trickier 290 $thread & thrd = this.thrd.self; 291 if( cluster_context ) { 292 cluster & cltr = *thrd.curr_cluster; 293 /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster ); 294 /* paranoid */ verify( !ready_mutate_islocked() ); 295 296 // We need to adjust the clean-up based on where the thread is 297 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 298 299 ready_schedule_lock( (struct __processor_id_t *)active_processor() ); 300 301 // This is the tricky case 302 // The thread was preempted and now it is on the ready queue 303 // The thread should be the last on the list 304 /* paranoid */ verify( thrd.link.next != 0p ); 305 306 // Remove the thread from the ready queue of this cluster 307 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 308 /* paranoid */ verify( removed ); 309 thrd.link.next = 0p; 310 thrd.link.prev = 0p; 311 __cfaabi_dbg_debug_do( thrd.unpark_stale = true ); 312 313 // Fixup the thread state 314 thrd.state = Blocked; 315 thrd.ticket = 0; 316 thrd.preempted = __NO_PREEMPTION; 317 318 ready_schedule_unlock( (struct __processor_id_t *)active_processor() ); 319 320 // Pretend like the thread was blocked all along 321 } 322 // !!! This is not an else if !!! 323 if( thrd.state == Blocked ) { 324 325 // This is the "easy case" 326 // The thread is parked and can easily be moved to active cluster 327 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 328 thrd.curr_cluster = active_cluster(); 329 330 // unpark the fast io_poller 331 unpark( &thrd __cfaabi_dbg_ctx2 ); 332 } 333 else { 334 335 // The thread is in a weird state 336 // I don't know what to do here 337 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 338 } 339 } else { 340 unpark( &thrd __cfaabi_dbg_ctx2 ); 341 } 342 343 ^(this.thrd){}; 344 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this); 345 346 __io_destroy( *this.thrd.ring ); 347 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this); 348 349 free(this.thrd.ring); 350 } 351 352 void ^?{}(io_context & this) { 353 ^(this){ false }; 354 } 355 356 static void __io_create( __io_data & this, const io_context_params & params_in ) { 194 357 // Step 1 : call to setup 195 358 struct io_uring_params params; 196 359 memset(¶ms, 0, sizeof(params)); 197 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS) params.flags |= IORING_SETUP_SQPOLL;198 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES) params.flags |= IORING_SETUP_IOPOLL;199 200 uint32_t nentries = entries_per_cluster();360 if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL; 361 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL; 362 363 uint32_t nentries = params_in.num_entries; 201 364 202 365 int fd = syscall(__NR_io_uring_setup, nentries, ¶ms ); … … 206 369 207 370 // Step 2 : mmap result 208 memset( this.io, 0, sizeof(struct __io_data) );209 struct __submition_data & sq = this. 
io->submit_q;210 struct __completion_data & cq = this. io->completion_q;371 memset( &this, 0, sizeof(struct __io_data) ); 372 struct __submition_data & sq = this.submit_q; 373 struct __completion_data & cq = this.completion_q; 211 374 212 375 // calculate the right ring size … … 275 438 (sq.release_lock){}; 276 439 277 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS )) {278 /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8));279 sq.ready_cnt = max( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);440 if( params_in.poller_submits || params_in.eager_submits ) { 441 /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) ); 442 sq.ready_cnt = max( params_in.num_ready, 8 ); 280 443 sq.ready = alloc_align( 64, sq.ready_cnt ); 281 444 for(i; sq.ready_cnt) { … … 308 471 309 472 // Update the global ring info 310 this.io->ring_flags = params.flags; 311 this.io->cltr_flags = io_flags; 312 this.io->fd = fd; 313 this.io->done = false; 314 (this.io->submit){ min(*sq.num, *cq.num) }; 315 316 if(!main_cluster) { 317 __kernel_io_finish_start( this ); 318 } 319 } 320 321 void __kernel_io_finish_start( cluster & this ) { 322 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 323 __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this); 324 (this.io->poller.fast){ this }; 325 __thrd_start( this.io->poller.fast, main ); 326 } 327 328 // Create the poller thread 329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this); 330 this.io->poller.slow.blocked = false; 331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this ); 332 } 333 334 void __kernel_io_prepare_stop( cluster & this ) { 335 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this); 336 // Notify the poller thread of the shutdown 337 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST); 338 339 // Stop the IO Poller 340 sigval val = { 1 }; 341 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val ); 342 post( this.io->poller.sem ); 343 344 // Wait for the poller thread to finish 345 pthread_join( this.io->poller.slow.kthrd, 0p ); 346 free( this.io->poller.slow.stack ); 347 348 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this); 349 350 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 351 with( this.io->poller.fast ) { 352 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster ); 353 /* paranoid */ verify( !ready_mutate_islocked() ); 354 355 // We need to adjust the clean-up based on where the thread is 356 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 357 358 ready_schedule_lock( (struct __processor_id_t *)active_processor() ); 359 360 // This is the tricky case 361 // The thread was preempted and now it is on the ready queue 362 // The thread should be the last on the list 363 /* paranoid */ verify( thrd.link.next != 0p ); 364 365 // Remove the thread from the ready queue of this cluster 366 __attribute__((unused)) bool removed = remove_head( &this, &thrd ); 367 /* paranoid */ verify( removed ); 368 thrd.link.next = 0p; 369 thrd.link.prev = 0p; 370 __cfaabi_dbg_debug_do( thrd.unpark_stale = true ); 371 372 // Fixup the thread state 373 thrd.state = Blocked; 374 thrd.ticket = 0; 375 thrd.preempted = __NO_PREEMPTION; 376 377 ready_schedule_unlock( (struct __processor_id_t 
*)active_processor() ); 378 379 // Pretend like the thread was blocked all along 380 } 381 // !!! This is not an else if !!! 382 if( thrd.state == Blocked ) { 383 384 // This is the "easy case" 385 // The thread is parked and can easily be moved to active cluster 386 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 387 thrd.curr_cluster = active_cluster(); 388 389 // unpark the fast io_poller 390 unpark( &thrd __cfaabi_dbg_ctx2 ); 391 } 392 else { 393 394 // The thread is in a weird state 395 // I don't know what to do here 396 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n"); 397 } 398 399 } 400 401 ^(this.io->poller.fast){}; 402 403 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this); 404 } 405 } 406 407 void __kernel_io_shutdown( cluster & this, bool main_cluster ) { 408 if(!main_cluster) { 409 __kernel_io_prepare_stop( this ); 410 } 411 473 this.ring_flags = params.flags; 474 this.fd = fd; 475 this.eager_submits = params_in.eager_submits; 476 this.poller_submits = params_in.poller_submits; 477 } 478 479 void __io_destroy( __io_data & this ) { 412 480 // Shutdown the io rings 413 struct __submition_data & sq = this. io->submit_q;414 struct __completion_data & cq = this. io->completion_q;481 struct __submition_data & sq = this.submit_q; 482 struct __completion_data & cq = this.completion_q; 415 483 416 484 // unmap the submit queue entries … … 426 494 427 495 // close the file descriptor 428 close(this.io->fd); 429 430 free( this.io->submit_q.ready ); // Maybe null, doesn't matter 431 free( this.io ); 432 } 433 434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) { 496 close(this.fd); 497 498 free( this.submit_q.ready ); // Maybe null, doesn't matter 499 } 500 501 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 435 502 bool need_sys_to_submit = false; 436 503 bool need_sys_to_complete = false; 437 unsigned min_complete = 0;438 504 unsigned flags = 0; 439 440 505 441 506 TO_SUBMIT: … … 451 516 } 452 517 453 TO_COMPLETE:454 518 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 455 519 flags |= IORING_ENTER_GETEVENTS; 456 if( mask ) {457 need_sys_to_complete = true;458 min_complete = 1;459 break TO_COMPLETE;460 }461 520 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 462 521 need_sys_to_complete = true; … … 466 525 int ret = 0; 467 526 if( need_sys_to_submit || need_sys_to_complete ) { 468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);527 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8); 469 528 if( ret < 0 ) { 470 529 switch((int)errno) { … … 490 549 static uint32_t __release_consumed_submission( struct __io_data & ring ); 491 550 492 static inline void process(struct io_uring_cqe & cqe , struct __processor_id_t * id) {551 static inline void process(struct io_uring_cqe & cqe ) { 493 552 struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data; 494 553 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 495 554 496 555 data->result = cqe.res; 497 if(!id) { unpark( data->thrd __cfaabi_dbg_ctx2 ); } 498 else { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); } 556 unpark( data->thrd __cfaabi_dbg_ctx2 ); 499 557 } 500 558 501 559 // Process a single completion message from the io_uring 502 560 // This is NOT thread-safe 503 static [int, bool] 
__drain_io( & struct __io_data ring , * sigset_t mask) {561 static [int, bool] __drain_io( & struct __io_data ring ) { 504 562 /* paranoid */ verify( !kernelTLS.preemption_state.enabled ); 505 563 506 564 unsigned to_submit = 0; 507 if( ring. cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) {565 if( ring.poller_submits ) { 508 566 // If the poller thread also submits, then we need to aggregate the submissions which are ready 509 567 to_submit = __collect_submitions( ring ); 510 568 } 511 569 512 int ret = __io_uring_enter(ring, to_submit, true , mask);570 int ret = __io_uring_enter(ring, to_submit, true); 513 571 if( ret < 0 ) { 514 572 return [0, true]; … … 547 605 /* paranoid */ verify(&cqe); 548 606 549 process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id ); 550 } 551 552 // Allow new submissions to happen 553 // V(ring.submit, count); 607 process( cqe ); 608 } 554 609 555 610 // Mark to the kernel that the cqe has been seen … … 561 616 } 562 617 563 static void * __io_poller_slow( void * arg ) { 564 #if !defined( __CFA_NO_STATISTICS__ ) 565 __stats_t local_stats; 566 __init_stats( &local_stats ); 567 kernelTLS.this_stats = &local_stats; 568 #endif 569 570 cluster * cltr = (cluster *)arg; 571 struct __io_data & ring = *cltr->io; 572 573 ring.poller.slow.id.id = doregister( &ring.poller.slow.id ); 574 575 sigset_t mask; 576 sigfillset(&mask); 577 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) { 578 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" ); 579 } 580 581 sigdelset( &mask, SIGUSR1 ); 582 583 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) ); 584 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) ); 585 586 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring); 587 588 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 589 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 590 591 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST ); 592 593 // In the user-thread approach drain and if anything was drained, 594 // batton pass to the user-thread 595 int count; 596 bool again; 597 [count, again] = __drain_io( ring, &mask ); 598 599 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST ); 600 601 // Update statistics 602 __STATS__( true, 603 io.complete_q.completed_avg.val += count; 604 io.complete_q.completed_avg.slow_cnt += 1; 605 ) 606 607 if(again) { 608 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring); 609 __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 ); 610 wait( ring.poller.sem ); 611 } 612 } 613 } 614 else { 615 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 616 //In the naive approach, just poll the io completion queue directly 617 int count; 618 bool again; 619 [count, again] = __drain_io( ring, &mask ); 620 621 // Update statistics 622 __STATS__( true, 623 io.complete_q.completed_avg.val += count; 624 io.complete_q.completed_avg.slow_cnt += 1; 625 ) 626 } 627 } 628 629 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring); 630 631 unregister( &ring.poller.slow.id ); 632 633 #if !defined(__CFA_NO_STATISTICS__) 634 __tally_stats(cltr->stats, &local_stats); 635 #endif 636 637 return 0p; 638 } 639 640 void main( __io_poller_fast & this ) { 641 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ); 642 643 // Start parked 644 park( __cfaabi_dbg_ctx ); 645 646 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", 
&this.ring); 618 void main( $io_ctx_thread & this ) { 619 epoll_event ev; 620 ev.events = EPOLLIN | EPOLLONESHOT; 621 ev.data.u64 = (uint64_t)&this; 622 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, this.ring->fd, &ev); 623 if (ret < 0) { 624 abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) ); 625 } 626 627 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); 647 628 648 629 int reset = 0; 649 650 630 // Then loop until we need to start 651 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) { 652 631 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 653 632 // Drain the io 654 633 int count; 655 634 bool again; 656 635 disable_interrupts(); 657 [count, again] = __drain_io( *this.ring , 0p);636 [count, again] = __drain_io( *this.ring ); 658 637 659 638 if(!again) reset++; … … 672 651 // We didn't get anything baton pass to the slow poller 673 652 else { 674 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);653 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self); 675 654 reset = 0; 676 655 677 656 // wake up the slow poller 678 post( this.ring->poller.sem ); 657 ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, this.ring->fd, &ev); 658 if (ret < 0) { 659 abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) ); 660 } 679 661 680 662 // park this thread 681 park( __cfaabi_dbg_ctx);663 wait( this.sem ); 682 664 } 683 665 } 684 666 685 667 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring); 686 }687 688 static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));689 static inline void __wake_poller( struct __io_data & ring ) {690 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;691 692 sigval val = { 1 };693 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );694 668 } 695 669 … … 806 780 } 807 781 808 void __submit( struct __io_data & ring, uint32_t idx ) { 782 void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) { 783 __io_data & ring = *ctx->thrd.ring; 809 784 // Get now the data we definetely need 810 785 uint32_t * const tail = ring.submit_q.tail; 811 const uint32_t mask = *ring.submit_q.mask;786 const uint32_t mask = *ring.submit_q.mask; 812 787 813 788 // There are 2 submission schemes, check which one we are using 814 if( ring. cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) {789 if( ring.poller_submits ) { 815 790 // If the poller thread submits, then we just need to add this to the ready array 816 791 __submit_to_ready_array( ring, idx, mask ); 817 792 818 __wake_poller( ring);793 post( ctx->thrd.sem ); 819 794 820 795 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 821 796 } 822 else if( ring. 
cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) {797 else if( ring.eager_submits ) { 823 798 uint32_t picked = __submit_to_ready_array( ring, idx, mask ); 824 799 … … 849 824 // We got the lock 850 825 unsigned to_submit = __collect_submitions( ring ); 851 int ret = __io_uring_enter( ring, to_submit, false , 0p);826 int ret = __io_uring_enter( ring, to_submit, false ); 852 827 if( ret < 0 ) { 853 828 unlock(ring.submit_q.lock); … … 892 867 893 868 // Submit however, many entries need to be submitted 894 int ret = __io_uring_enter( ring, 1, false , 0p);869 int ret = __io_uring_enter( ring, 1, false ); 895 870 if( ret < 0 ) { 896 871 switch((int)errno) { … … 963 938 //============================================================================================= 964 939 965 void register_fixed_files( cluster & cl, int * files, unsigned count ) {966 int ret = syscall( __NR_io_uring_register, c l.io->fd, IORING_REGISTER_FILES, files, count );940 void register_fixed_files( io_context & ctx, int * files, unsigned count ) { 941 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count ); 967 942 if( ret < 0 ) { 968 943 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); … … 971 946 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 972 947 } 948 949 void register_fixed_files( cluster & cltr, int * files, unsigned count ) { 950 for(i; cltr.io.cnt) { 951 register_fixed_files( cltr.io.ctxs[i], files, count ); 952 } 953 } 973 954 #endif -
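The heart of this io.cfa rewrite is replacing the per-cluster slow/fast poller pair (a pthread woken with SIGUSR1 plus a user-thread baton pass) with one process-wide epoll instance: __kernel_io_startup() creates it, every io_context registers its ring fd with EPOLLIN | EPOLLONESHOT, iopoll_loop() blocks in epoll_pwait() and __post()s the semaphore of whichever $io_ctx_thread the event names, and main($io_ctx_thread &) re-arms the fd with EPOLL_CTL_MOD before parking again. Below is a standalone sketch of that register / wait / re-arm cycle in plain C (also valid Cforall); an eventfd stands in for the io_uring fd, and the printf and variable names are illustrative only.

    #include <stdint.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>

    int main() {
        int epfd = epoll_create1( 0 );
        int ring = eventfd( 0, 0 );                    // stand-in for __io_data.fd

        struct epoll_event ev;
        ev.events   = EPOLLIN | EPOLLONESHOT;          // deliver once, then stay disarmed
        ev.data.u64 = (uint64_t)ring;                  // the runtime stores the $io_ctx_thread pointer here
        epoll_ctl( epfd, EPOLL_CTL_ADD, ring, &ev );

        uint64_t one = 1;
        write( ring, &one, sizeof(one) );              // pretend completions arrived on the ring

        struct epoll_event events[10];
        int nfds = epoll_wait( epfd, events, 10, -1 ); // iopoll_loop uses epoll_pwait with SIGUSR1 unblocked
        for( int i = 0; i < nfds; i++ ) {
            // here iopoll_loop __post()s the matching io_context's semaphore
            printf( "context %llu is ready\n", (unsigned long long)events[i].data.u64 );
        }

        // after the consumer drains the ring it re-arms the fd, see main($io_ctx_thread &)
        epoll_ctl( epfd, EPOLL_CTL_MOD, ring, &ev );

        close( ring );
        close( epfd );
        return 0;
    }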
libcfa/src/concurrency/iocall.cfa
re0f93e0 rf00b26d4 22 22 #if defined(CFA_HAVE_LINUX_IO_URING_H) 23 23 #include <stdint.h> 24 #include <errno.h> 24 25 #include <linux/io_uring.h> 25 26 … … 27 28 28 29 extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ); 29 extern void __submit( struct __io_data & ring, uint32_t idx);30 extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))); 30 31 31 32 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) { … … 52 53 } 53 54 55 56 57 #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC) 58 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC) 59 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC) 60 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC) 61 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) 62 #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN) 63 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC) 64 #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC) 65 #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) 66 #define REGULAR_FLAGS (IOSQE_FIXED_FILE) 67 #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) 68 #define REGULAR_FLAGS (IOSQE_IO_DRAIN) 69 #elif defined(CFA_HAVE_IOSQE_ASYNC) 70 #define REGULAR_FLAGS (IOSQE_ASYNC) 71 #else 72 #define REGULAR_FLAGS (0) 73 #endif 74 75 #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK) 76 #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK) 77 #elif defined(CFA_HAVE_IOSQE_IO_LINK) 78 #define LINK_FLAGS (IOSQE_IO_LINK) 79 #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK) 80 #define LINK_FLAGS (IOSQE_IO_HARDLINK) 81 #else 82 #define LINK_FLAGS (0) 83 #endif 84 85 #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED) 86 #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED) 87 #else 88 #define SPLICE_FLAGS (0) 89 #endif 90 91 54 92 #define __submit_prelude \ 93 if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \ 94 (void)timeout; (void)cancellation; \ 95 if( !context ) context = __get_io_context(); \ 55 96 __io_user_data_t data = { 0, active_thread() }; \ 56 struct __io_data & ring = * data.thrd->curr_cluster->io; \97 struct __io_data & ring = *context->thrd.ring; \ 57 98 struct io_uring_sqe * sqe; \ 58 99 uint32_t idx; \ 59 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); 100 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \ 101 sqe->flags = REGULAR_FLAGS & submit_flags; 60 102 61 103 #define __submit_wait \ 62 104 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \ 63 105 verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \ 64 __submit( ring, idx ); \106 __submit( context, idx ); \ 65 107 park( __cfaabi_dbg_ctx ); \ 108 if( data.result < 0 ) { \ 109 errno = -data.result; \ 110 return -1; \ 111 } \ 66 112 return data.result; 67 113 #endif … … 70 116 // I/O Forwards 71 117 //============================================================================================= 118 #include <time.hfa> 72 119 73 120 // Some forward declarations … … 121 168 // Asynchronous operations 122 169 #if defined(HAVE_PREADV2) 123 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags ) {170 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 124 171 #if 
!defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 125 172 return preadv2(fd, iov, iovcnt, offset, flags); … … 132 179 #endif 133 180 } 134 135 ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 136 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV) 137 return preadv2(fd, iov, iovcnt, offset, flags); 181 #endif 182 183 #if defined(HAVE_PWRITEV2) 184 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 185 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 186 return pwritev2(fd, iov, iovcnt, offset, flags); 138 187 #else 139 188 __submit_prelude 140 189 141 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset }; 142 sqe->flags |= IOSQE_FIXED_FILE; 190 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 143 191 144 192 __submit_wait … … 147 195 #endif 148 196 149 #if defined(HAVE_PWRITEV2) 150 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) { 151 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV) 152 return pwritev2(fd, iov, iovcnt, offset, flags); 153 #else 154 __submit_prelude 155 156 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset }; 157 158 __submit_wait 159 #endif 160 } 161 #endif 162 163 int cfa_fsync(int fd) { 197 int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 164 198 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC) 165 199 return fsync(fd); … … 173 207 } 174 208 175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags ) {209 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 176 210 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE) 177 211 return sync_file_range(fd, offset, nbytes, flags); … … 189 223 190 224 191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags ) {225 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 192 226 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG) 193 227 return sendmsg(sockfd, msg, flags); … … 202 236 } 203 237 204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags ) {238 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 205 239 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG) 206 240 return recvmsg(sockfd, msg, flags); … … 215 249 } 216 250 217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags ) {251 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 218 252 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND) 219 253 return send( sockfd, buf, len, flags ); … … 230 264 } 231 265 232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags ) {266 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 
233 267 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV) 234 268 return recv( sockfd, buf, len, flags ); … … 245 279 } 246 280 247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags ) {281 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 248 282 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT) 249 283 return accept4( sockfd, addr, addrlen, flags ); … … 260 294 } 261 295 262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen ) {296 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 263 297 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT) 264 298 return connect( sockfd, addr, addrlen ); … … 274 308 } 275 309 276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len ) {310 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 277 311 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE) 278 312 return fallocate( fd, mode, offset, len ); … … 289 323 } 290 324 291 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice ) {325 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 292 326 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE) 293 327 return posix_fadvise( fd, offset, len, advice ); … … 304 338 } 305 339 306 int cfa_madvise(void *addr, size_t length, int advice ) {340 int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 307 341 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE) 308 342 return madvise( addr, length, advice ); … … 319 353 } 320 354 321 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode ) {355 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 322 356 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT) 323 357 return openat( dirfd, pathname, flags, mode ); … … 334 368 } 335 369 336 int cfa_close(int fd ) {370 int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 337 371 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE) 338 372 return close( fd ); … … 348 382 // Forward declare in case it is not supported 349 383 struct statx; 350 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf ) {384 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 351 385 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX) 352 386 #if defined(__NR_statx) … … 360 394 361 395 (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf }; 362 sqe-> flags = flags;363 364 __submit_wait 365 #endif 366 } 367 368 ssize_t cfa_read(int fd, void *buf, size_t count 
) {396 sqe->statx_flags = flags; 397 398 __submit_wait 399 #endif 400 } 401 402 ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 369 403 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ) 370 404 return read( fd, buf, count ); … … 378 412 } 379 413 380 ssize_t cfa_write(int fd, void *buf, size_t count ) {414 ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 381 415 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE) 382 416 return read( fd, buf, count ); … … 390 424 } 391 425 392 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags ) {426 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 393 427 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 394 428 return splice( fd_in, off_in, fd_out, off_out, len, flags ); … … 399 433 sqe->splice_fd_in = fd_in; 400 434 sqe->splice_off_in = off_in; 401 sqe->splice_flags = flags; 402 403 __submit_wait 404 #endif 405 } 406 407 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) { 408 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE) 409 return splice( fd_in, off_in, fd_out, off_out, len, flags ); 410 #else 411 __submit_prelude 412 413 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out }; 414 sqe->splice_fd_in = fd_in; 415 sqe->splice_off_in = off_in; 416 sqe->splice_flags = flags | out_flags; 417 sqe->flags = in_flags; 418 419 __submit_wait 420 #endif 421 } 422 423 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) { 435 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 436 437 __submit_wait 438 #endif 439 } 440 441 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) { 424 442 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE) 425 443 return tee( fd_in, fd_out, len, flags ); … … 429 447 (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 }; 430 448 sqe->splice_fd_in = fd_in; 431 sqe->splice_flags = flags;449 sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags); 432 450 433 451 __submit_wait … … 536 554 537 555 if( /*func == (fptr_t)splice || */ 538 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice, 539 func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice ) 556 func == (fptr_t)cfa_splice ) 540 557 #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE , 541 558 return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE); -
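Every public cfa_* wrapper now takes the same four trailing parameters (submit_flags, timeout, cancellation, context). __submit_prelude masks submit_flags against whichever IOSQE_* bits the kernel headers provide, rejects link flags with ENOTSUP, currently ignores timeout and cancellation, and falls back to __get_io_context() when no context is given; __submit_wait converts a negative cqe.res into errno plus a -1 return. A small usage sketch, relying only on the defaults declared in iofwd.hfa (the include path and file name are illustrative):

    #include <stdio.h>
    #include <fcntl.h>
    #include "iofwd.hfa"      // assumed include path for the cfa_* declarations

    int main() {
        char buf[256];
        int fd = open( "data.txt", O_RDONLY );

        // old call shape still compiles: the trailing parameters are defaulted
        ssize_t r = cfa_read( fd, buf, sizeof(buf) );
        if( r < 0 ) perror( "cfa_read" );        // errno is set from -cqe.res by __submit_wait

        // optional knobs are positional; e.g. drain ordering, if the kernel defines IOSQE_IO_DRAIN
        // ssize_t r2 = cfa_read( fd, buf, sizeof(buf), CFA_IO_DRAIN );

        cfa_close( fd );
        return 0;
    }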
libcfa/src/concurrency/iofwd.hfa
re0f93e0 rf00b26d4 19 19 extern "C" { 20 20 #include <sys/types.h> 21 #if CFA_HAVE_LINUX_IO_URING_H 22 #include <linux/io_uring.h> 23 #endif 21 24 } 22 25 #include "bits/defs.hfa" 26 #include "time.hfa" 27 28 #if defined(CFA_HAVE_IOSQE_FIXED_FILE) 29 #define CFA_IO_FIXED_FD1 IOSQE_FIXED_FILE 30 #endif 31 #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED) 32 #define CFA_IO_FIXED_FD2 SPLICE_F_FD_IN_FIXED 33 #endif 34 #if defined(CFA_HAVE_IOSQE_IO_DRAIN) 35 #define CFA_IO_DRAIN IOSQE_IO_DRAIN 36 #endif 37 #if defined(CFA_HAVE_IOSQE_ASYNC) 38 #define CFA_IO_ASYNC IOSQE_ASYNC 39 #endif 40 41 struct cluster; 42 struct io_context; 43 struct io_cancellation; 23 44 24 45 struct iovec; … … 27 48 struct statx; 28 49 29 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags );30 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags );31 extern int cfa_fsync(int fd );32 extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags );33 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags );34 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags );35 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags );36 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags );37 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags );38 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen );39 extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len );40 extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice );41 extern int cfa_madvise(void *addr, size_t length, int advice );42 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode );43 extern int cfa_close(int fd );44 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf );45 extern ssize_t cfa_read(int fd, void *buf, size_t count );46 extern ssize_t cfa_write(int fd, void *buf, size_t count );47 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags );48 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags );50 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 51 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 52 extern int cfa_fsync(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 53 extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 54 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 55 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 56 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags = 0, 
Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 57 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 58 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 59 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 60 extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 61 extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 62 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 63 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 64 extern int cfa_close(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 65 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 66 extern ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 67 extern ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 68 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 69 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p); 49 70 50 71 //----------------------------------------------------------------------------- 51 72 // Check if a function is blocks a only the user thread 52 73 bool has_user_level_blocking( fptr_t func ); 74 75 //----------------------------------------------------------------------------- 76 void register_fixed_files( io_context & ctx , int * files, unsigned count ); 77 void register_fixed_files( cluster & cltr, int * files, unsigned count ); -
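iofwd.hfa also exposes the fixed-file registration added in io.cfa, plus the CFA_IO_* aliases for the optional IOSQE_* submit flags (each alias only exists when the corresponding kernel feature was detected at configure time). A hedged sketch of registering two descriptors on every ring of the active cluster and then addressing them by index, which is the standard io_uring fixed-file convention; the include path and file names are illustrative:

    #include <fcntl.h>
    #include "iofwd.hfa"      // assumed include path for cfa_* and register_fixed_files

    int main() {
        int fds[2] = { open( "in.txt", O_RDONLY ),
                       open( "out.txt", O_WRONLY | O_CREAT, 0644 ) };
        register_fixed_files( *active_cluster(), fds, 2 );   // registers on every ring of the cluster

        char buf[4096];
        // with IOSQE_FIXED_FILE set, the fd argument is an index into the registered
        // array rather than a raw descriptor (standard io_uring fixed-file behaviour)
        ssize_t r = cfa_read ( 0, buf, sizeof(buf), CFA_IO_FIXED_FD1 );   // index 0 == in.txt
        ssize_t w = cfa_write( 1, buf, r,           CFA_IO_FIXED_FD1 );   // index 1 == out.txt
        return 0;
    }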
libcfa/src/concurrency/kernel.cfa
re0f93e0 rf00b26d4 130 130 KERNEL_STORAGE($thread, mainThread); 131 131 KERNEL_STORAGE(__stack_t, mainThreadCtx); 132 KERNEL_STORAGE(io_context, mainPollerThread); 132 133 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock); 133 134 #if !defined(__CFA_NO_STATISTICS__) … … 310 311 } 311 312 312 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {313 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) { 313 314 this.name = name; 314 315 this.preemption_rate = preemption_rate; … … 335 336 ready_mutate_unlock( last_size ); 336 337 337 338 __kernel_io_startup( this, io_flags, &this == mainCluster ); 338 this.io.cnt = num_io; 339 this.io.ctxs = aalloc(num_io); 340 for(i; this.io.cnt) { 341 (this.io.ctxs[i]){ this, io_params }; 342 } 339 343 } 340 344 341 345 void ^?{}(cluster & this) { 342 __kernel_io_shutdown( this, &this == mainCluster ); 346 for(i; this.io.cnt) { 347 ^(this.io.ctxs[i]){ true }; 348 } 349 free(this.io.ctxs); 343 350 344 351 // Lock the RWlock so no-one pushes/pops while we are changing the queue … … 853 860 // Initialize the main cluster 854 861 mainCluster = (cluster *)&storage_mainCluster; 855 (*mainCluster){"Main Cluster" };862 (*mainCluster){"Main Cluster", 0}; 856 863 857 864 __cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n"); … … 901 908 #endif 902 909 910 // Start IO 911 __kernel_io_startup(); 912 903 913 // Enable preemption 904 914 kernel_start_preemption(); … … 918 928 919 929 // Now that the system is up, finish creating systems that need threading 920 __kernel_io_finish_start( *mainCluster ); 921 930 mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread; 931 mainCluster->io.cnt = 1; 932 (*mainCluster->io.ctxs){ *mainCluster }; 922 933 923 934 __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n"); … … 930 941 static void __kernel_shutdown(void) { 931 942 //Before we start shutting things down, wait for systems that need threading to shutdown 932 __kernel_io_prepare_stop( *mainCluster ); 943 ^(*mainCluster->io.ctxs){}; 944 mainCluster->io.cnt = 0; 945 mainCluster->io.ctxs = 0p; 933 946 934 947 /* paranoid */ verify( TL_GET( preemption_state.enabled ) ); … … 949 962 // Disable preemption 950 963 kernel_stop_preemption(); 964 965 // Stop IO 966 __kernel_io_shutdown(); 951 967 952 968 // Destroy the main processor and its context in reverse order of construction -
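kernel.cfa now gives each cluster an array of io_contexts sized by the new num_io argument, constructed in place over aalloc'd storage and destroyed the same way during cluster teardown (the main cluster instead receives a single statically stored context once threading is up, and releases it before threading shuts down). A minimal sketch of that construct/destroy-in-place idiom, with illustrative function names:

    // Sketch of the idiom used by ?{}(cluster, ...) and ^?{}(cluster) above.
    static void make_contexts( cluster & cl, const io_context_params & params, unsigned n ) {
        cl.io.cnt  = n;
        cl.io.ctxs = aalloc( n );                     // raw storage, no constructors run yet
        for( i; n ) (cl.io.ctxs[i]){ cl, params };    // construct each ring + poller in place
    }

    static void destroy_contexts( cluster & cl ) {
        for( i; cl.io.cnt ) ^(cl.io.ctxs[i]){ true }; // true: the cluster is tearing itself down
        free( cl.io.ctxs );
        cl.io.cnt = 0; cl.io.ctxs = 0p;
    }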
libcfa/src/concurrency/kernel.hfa
re0f93e0 rf00b26d4 129 129 struct __io_data; 130 130 131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD (1 << 0) // 0x01 132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02 133 #define CFA_CLUSTER_IO_EAGER_SUBMITS (1 << 2) // 0x04 134 #define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS (1 << 3) // 0x08 135 #define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10 136 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET 16 137 131 // IO poller user-thread 132 // Not using the "thread" keyword because we want to control 133 // more carefully when to start/stop it 134 struct $io_ctx_thread { 135 struct __io_data * ring; 136 single_sem sem; 137 volatile bool done; 138 $thread self; 139 }; 140 141 142 struct io_context { 143 $io_ctx_thread thrd; 144 }; 145 146 struct io_context_params { 147 int num_entries; 148 int num_ready; 149 int submit_aff; 150 bool eager_submits:1; 151 bool poller_submits:1; 152 bool poll_submit:1; 153 bool poll_complete:1; 154 }; 155 156 void ?{}(io_context_params & this); 157 158 void ?{}(io_context & this, struct cluster & cl); 159 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params); 160 void ^?{}(io_context & this); 161 162 struct io_cancellation { 163 uint32_t target; 164 }; 165 166 static inline void ?{}(io_cancellation & this) { this.target = -1u; } 167 static inline void ^?{}(io_cancellation & this) {} 168 bool cancel(io_cancellation & this); 138 169 139 170 //----------------------------------------------------------------------------- … … 206 237 } node; 207 238 208 struct __io_data * io; 239 struct { 240 io_context * ctxs; 241 unsigned cnt; 242 } io; 209 243 210 244 #if !defined(__CFA_NO_STATISTICS__) … … 215 249 extern Duration default_preemption(); 216 250 217 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);251 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params); 218 252 void ^?{}(cluster & this); 219 253 220 static inline void ?{} (cluster & this) { this{"Anonymous Cluster", default_preemption(), 0}; } 221 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate, 0}; } 222 static inline void ?{} (cluster & this, const char name[]) { this{name, default_preemption(), 0}; } 223 static inline void ?{} (cluster & this, unsigned flags) { this{"Anonymous Cluster", default_preemption(), flags}; } 224 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; } 225 static inline void ?{} (cluster & this, const char name[], unsigned flags) { this{name, default_preemption(), flags}; } 254 static inline void ?{} (cluster & this) { io_context_params default_params; this{"Anonymous Cluster", default_preemption(), 1, default_params}; } 255 static inline void ?{} (cluster & this, Duration preemption_rate) { io_context_params default_params; this{"Anonymous Cluster", preemption_rate, 1, default_params}; } 256 static inline void ?{} (cluster & this, const char name[]) { io_context_params default_params; this{name, default_preemption(), 1, default_params}; } 257 static inline void ?{} (cluster & this, unsigned num_io) { io_context_params default_params; this{"Anonymous Cluster", default_preemption(), num_io, default_params}; } 258 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io) { io_context_params default_params; this{"Anonymous Cluster", preemption_rate, num_io, default_params}; } 259 
static inline void ?{} (cluster & this, const char name[], unsigned num_io) { io_context_params default_params; this{name, default_preemption(), num_io, default_params}; } 260 static inline void ?{} (cluster & this, const io_context_params & io_params) { this{"Anonymous Cluster", default_preemption(), 1, io_params}; } 261 static inline void ?{} (cluster & this, Duration preemption_rate, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, 1, io_params}; } 262 static inline void ?{} (cluster & this, const char name[], const io_context_params & io_params) { this{name, default_preemption(), 1, io_params}; } 263 static inline void ?{} (cluster & this, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", default_preemption(), num_io, io_params}; } 264 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, num_io, io_params}; } 265 static inline void ?{} (cluster & this, const char name[], unsigned num_io, const io_context_params & io_params) { this{name, default_preemption(), num_io, io_params}; } 226 266 227 267 static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; } -
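kernel.hfa replaces the CFA_CLUSTER_IO_* bit flags with the io_context_params struct and a family of cluster constructors taking num_io and/or the params; the no-argument constructor still works and creates one default context. A sketch of tuning and passing the params, assuming the include path; field defaults come from ?{}(io_context_params &) in io.cfa:

    #include "kernel.hfa"     // assumed include path for cluster / io_context_params

    int main() {
        io_context_params p;       // defaults: 256 entries, 256 ready slots, no kernel-side polling
        p.num_entries    = 1024;   // ring size handed to io_uring_setup
        p.poller_submits = true;   // the per-ring poller thread batches and submits ready entries
        p.num_ready      = 64;     // ready-array length; must be a power of 2 (values < 8 round up to 8)

        cluster cl = { "io-heavy", 2, p };   // two io_contexts, each with its own ring and poller thread
        // ... create processors / threads on cl and issue cfa_* calls as usual ...
        return 0;
    }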
libcfa/src/concurrency/kernel_private.hfa
re0f93e0 rf00b26d4 84 84 void __unpark( struct __processor_id_t *, $thread * thrd __cfaabi_dbg_ctx_param2 ); 85 85 86 //----------------------------------------------------------------------------- 87 // I/O 88 void __kernel_io_startup ( cluster &, unsigned, bool ); 89 void __kernel_io_finish_start( cluster & ); 90 void __kernel_io_prepare_stop( cluster & ); 91 void __kernel_io_shutdown ( cluster &, bool ); 86 static inline bool __post(single_sem & this, struct __processor_id_t * id) { 87 for() { 88 struct $thread * expected = this.ptr; 89 if(expected == 1p) return false; 90 if(expected == 0p) { 91 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 92 return false; 93 } 94 } 95 else { 96 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 97 __unpark( id, expected __cfaabi_dbg_ctx2 ); 98 return true; 99 } 100 } 101 } 102 } 92 103 93 104 //----------------------------------------------------------------------------- … … 109 120 void doregister( struct cluster * cltr, struct $thread & thrd ); 110 121 void unregister( struct cluster * cltr, struct $thread & thrd ); 122 123 //----------------------------------------------------------------------------- 124 // I/O 125 void __kernel_io_startup (); 126 void __kernel_io_shutdown (); 127 128 static inline io_context * __get_io_context( void ) { 129 cluster * cltr = active_cluster(); 130 /* paranoid */ verifyf( cltr, "No active cluster for io operation\n"); 131 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr ); 132 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr); 133 return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ]; 134 } 135 136 void ^?{}(io_context & this, bool ); 111 137 112 138 //=======================================================================
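kernel_private.hfa supplies the glue between the defaulted context parameter and a cluster's rings: __get_io_context() asserts the active cluster owns at least one io_context and picks one at random with __tls_rand(), while __post() is the lock-free wake-up the epoll loop uses on an $io_ctx_thread's semaphore. A hypothetical sketch of steering submissions to a specific ring instead of accepting the random default; first_ring and read_on_first_ring are illustrative names, and the explicit-argument call shape comes from iofwd.hfa:

    // Hypothetical helper: always submit on the cluster's first ring.
    static inline io_context * first_ring( cluster & cl ) {
        // same precondition __get_io_context asserts: the cluster owns at least one context
        return &cl.io.ctxs[0];
    }

    static ssize_t read_on_first_ring( int fd, void * buf, size_t len ) {
        // the trailing parameters are positional, so the earlier defaults are spelled out;
        // timeout and cancellation are accepted but currently unused by __submit_prelude
        return cfa_read( fd, buf, len, 0, -1`s, 0p, first_ring( *active_cluster() ) );
    }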