Changes in / [d3ab183:f90d10f]
Files (5 edited):
- benchmark/io/readv.cfa (modified) (1 diff)
- libcfa/prelude/defines.hfa.in (modified) (1 diff)
- libcfa/src/concurrency/io.cfa (modified) (18 diffs)
- libcfa/src/concurrency/kernel.hfa (modified) (2 diffs)
- libcfa/src/concurrency/kernel_private.hfa (modified) (1 diff)
Legend (markers used in the diffs below):
- ' ' Unmodified
- '+' Added
- '-' Removed
benchmark/io/readv.cfa
  #include <thread.hfa>
  #include <time.hfa>

- #if !defined(HAVE_LINUX_IO_URING_H)
- 	#warning no io uring
- #endif
-
  extern bool traceHeapOn();
libcfa/prelude/defines.hfa.in
  #undef HAVE_LINUX_IO_URING_H

- // #define __CFA_IO_POLLING_USER__
- // #define __CFA_IO_POLLING_KERNEL__
+ #undef __CFA_NO_STATISTICS__
libcfa/src/concurrency/io.cfa
  	#endif

- #if defined(__CFA_IO_POLLING_USER__)
- 	void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
- 		this.ring = &cltr.io;
- 		(this.thrd){ "Fast I/O Poller", cltr };
- 	}
- 	void ^?{}( __io_poller_fast & mutex this );
- 	void main( __io_poller_fast & this );
- 	static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
- 	void ^?{}( __io_poller_fast & mutex this ) {}
- #endif
+ // Fast poller user-thread
+ // Not using the "thread" keyword because we want to control
+ // more carefully when to start/stop it
+ struct __io_poller_fast {
+ 	struct __io_data * ring;
+ 	bool waiting;
+ 	$thread thrd;
+ };
+
+ void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
+ 	this.ring = cltr.io;
+ 	this.waiting = true;
+ 	(this.thrd){ "Fast I/O Poller", cltr };
+ }
+ void ^?{}( __io_poller_fast & mutex this );
+ void main( __io_poller_fast & this );
+ static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
+ void ^?{}( __io_poller_fast & mutex this ) {}
+
+ struct __submition_data {
+ 	// Head and tail of the ring (associated with array)
+ 	volatile uint32_t * head;
+ 	volatile uint32_t * tail;
+
+ 	// The actual kernel ring which uses head/tail
+ 	// indexes into the sqes arrays
+ 	uint32_t * array;
+
+ 	// number of entries and mask to go with it
+ 	const uint32_t * num;
+ 	const uint32_t * mask;
+
+ 	// Submission flags (Not sure what for)
+ 	uint32_t * flags;
+
+ 	// number of sqes not submitted (whatever that means)
+ 	uint32_t * dropped;
+
+ 	// Like head/tail but not seen by the kernel
+ 	volatile uint32_t alloc;
+ 	volatile uint32_t ready;
+
+ 	__spinlock_t lock;
+
+ 	// A buffer of sqes (not the actual ring)
+ 	struct io_uring_sqe * sqes;
+
+ 	// The location and size of the mmaped area
+ 	void * ring_ptr;
+ 	size_t ring_sz;
+
+ 	// Statistics
+ 	#if !defined(__CFA_NO_STATISTICS__)
+ 		struct {
+ 			struct {
+ 				unsigned long long int val;
+ 				unsigned long long int cnt;
+ 			} submit_avg;
+ 		} stats;
+ 	#endif
+ };
+
+ struct __completion_data {
+ 	// Head and tail of the ring
+ 	volatile uint32_t * head;
+ 	volatile uint32_t * tail;
+
+ 	// number of entries and mask to go with it
+ 	const uint32_t * mask;
+ 	const uint32_t * num;
+
+ 	// number of cqes not submitted (whatever that means)
+ 	uint32_t * overflow;
+
+ 	// the kernel ring
+ 	struct io_uring_cqe * cqes;
+
+ 	// The location and size of the mmaped area
+ 	void * ring_ptr;
+ 	size_t ring_sz;
+
+ 	// Statistics
+ 	#if !defined(__CFA_NO_STATISTICS__)
+ 		struct {
+ 			struct {
+ 				unsigned long long int val;
+ 				unsigned long long int slow_cnt;
+ 				unsigned long long int fast_cnt;
+ 			} completed_avg;
+ 		} stats;
+ 	#endif
+ };
+
+ struct __io_data {
+ 	struct __submition_data submit_q;
+ 	struct __completion_data completion_q;
+ 	uint32_t flags;
+ 	int fd;
+ 	semaphore submit;
+ 	volatile bool done;
+ 	struct {
+ 		struct {
+ 			void * stack;
+ 			pthread_t kthrd;
+ 		} slow;
+ 		__io_poller_fast fast;
+ 		__bin_sem_t sem;
+ 	} poller;
+ };

  //=============================================================================================
…
  //=============================================================================================
  void __kernel_io_startup( cluster & this, bool main_cluster ) {
+ 	this.io = malloc();
+
  	// Step 1 : call to setup
  	struct io_uring_params params;
…
  	// Step 2 : mmap result
- 	memset( &this.io, 0, sizeof(struct io_ring) );
- 	struct io_uring_sq & sq = this.io.submit_q;
- 	struct io_uring_cq & cq = this.io.completion_q;
+ 	memset( this.io, 0, sizeof(struct __io_data) );
+ 	struct __submition_data & sq = this.io->submit_q;
+ 	struct __completion_data & cq = this.io->completion_q;

  	// calculate the right ring size
…
  	// Update the global ring info
- 	this.io.flags = params.flags;
- 	this.io.fd    = fd;
- 	this.io.done  = false;
- 	(this.io.submit){ min(*sq.num, *cq.num) };
+ 	this.io->flags = params.flags;
+ 	this.io->fd    = fd;
+ 	this.io->done  = false;
+ 	(this.io->submit){ min(*sq.num, *cq.num) };

  	// Initialize statistics
  	#if !defined(__CFA_NO_STATISTICS__)
- 		this.io.submit_q.stats.submit_avg.val = 0;
- 		this.io.submit_q.stats.submit_avg.cnt = 0;
- 		this.io.completion_q.stats.completed_avg.val = 0;
- 		this.io.completion_q.stats.completed_avg.cnt = 0;
+ 		this.io->submit_q.stats.submit_avg.val = 0;
+ 		this.io->submit_q.stats.submit_avg.cnt = 0;
+ 		this.io->completion_q.stats.completed_avg.val = 0;
+ 		this.io->completion_q.stats.completed_avg.slow_cnt = 0;
+ 		this.io->completion_q.stats.completed_avg.fast_cnt = 0;
  	#endif
…
  void __kernel_io_finish_start( cluster & this ) {
- 	#if defined(__CFA_IO_POLLING_USER__)
- 		__cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
- 		(this.io.poller.fast){ this };
- 		__thrd_start( this.io.poller.fast, main );
- 	#endif
+ 	__cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
+ 	(this.io->poller.fast){ this };
+ 	__thrd_start( this.io->poller.fast, main );

  	// Create the poller thread
  	__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
- 	this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
+ 	this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
  }
…
  	__cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
  	// Notify the poller thread of the shutdown
- 	__atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
+ 	__atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);

  	// Stop the IO Poller
  	sigval val = { 1 };
- 	pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
- 	#if defined(__CFA_IO_POLLING_USER__)
- 		post( this.io.poller.sem );
- 	#endif
-
- 	// Wait for the poller thread to finish
- 	pthread_join( this.io.poller.slow.kthrd, 0p );
- 	free( this.io.poller.slow.stack );
-
- 	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
-
- 	#if defined(__CFA_IO_POLLING_USER__)
- 		// unpark the fast io_poller
- 		unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
-
- 		^(this.io.poller.fast){};
+ 	pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
+ 	post( this.io->poller.sem );
+
+ 	// Wait for the poller thread to finish
+ 	pthread_join( this.io->poller.slow.kthrd, 0p );
+ 	free( this.io->poller.slow.stack );
+
+ 	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
+
+ 	verify( this.io->poller.fast.waiting );
+ 	verify( this.io->poller.fast.thrd.state == Blocked );
+
+ 	this.io->poller.fast.thrd.curr_cluster = mainCluster;
+
+ 	// unpark the fast io_poller
+ 	unpark( &this.io->poller.fast.thrd __cfaabi_dbg_ctx2 );
+
+ 	^(this.io->poller.fast){};

  	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
…
  	#if !defined(__CFA_NO_STATISTICS__)
  		if(this.print_stats) {
- 			__cfaabi_bits_print_safe( STDERR_FILENO,
- 				"----- I/O uRing Stats -----\n"
- 				"- total submit calls : %llu\n"
- 				"- avg submit : %lf\n"
- 				"- total wait calls : %llu\n"
- 				"- avg completion/wait : %lf\n",
- 				this.io.submit_q.stats.submit_avg.cnt,
- 				((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
- 				this.io.completion_q.stats.completed_avg.cnt,
- 				((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
- 			);
+ 			with(this.io->submit_q.stats, this.io->completion_q.stats) {
+ 				__cfaabi_bits_print_safe( STDERR_FILENO,
+ 					"----- I/O uRing Stats -----\n"
+ 					"- total submit calls : %llu\n"
+ 					"- avg submit : %lf\n"
+ 					"- total wait calls : %llu (%llu slow, %llu fast)\n"
+ 					"- avg completion/wait : %lf\n",
+ 					submit_avg.cnt,
+ 					((double)submit_avg.val) / submit_avg.cnt,
+ 					completed_avg.slow_cnt + completed_avg.fast_cnt,
+ 					completed_avg.slow_cnt, completed_avg.fast_cnt,
+ 					((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
+ 				);
+ 			}
  		}
  	#endif

  	// Shutdown the io rings
- 	struct io_uring_sq & sq = this.io.submit_q;
- 	struct io_uring_cq & cq = this.io.completion_q;
+ 	struct __submition_data & sq = this.io->submit_q;
+ 	struct __completion_data & cq = this.io->completion_q;

  	// unmap the submit queue entries
…
  	// close the file descriptor
- 	close(this.io.fd);
+ 	close(this.io->fd);
+
+ 	free( this.io );
  }
…
  // Process a single completion message from the io_uring
  // This is NOT thread-safe
- static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
+ static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
  	int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
  	if( ret < 0 ) {
…
  	// Nothing was new return 0
  	if (head == tail) {
- 		#if !defined(__CFA_NO_STATISTICS__)
- 			ring.completion_q.stats.completed_avg.cnt += 1;
- 		#endif
  		return 0;
  	}
…
  	__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );

- 	// Update statistics
- 	#if !defined(__CFA_NO_STATISTICS__)
- 		ring.completion_q.stats.completed_avg.val += count;
- 		ring.completion_q.stats.completed_avg.cnt += 1;
- 	#endif
-
  	return count;
  }
…
  static void * __io_poller_slow( void * arg ) {
  	cluster * cltr = (cluster *)arg;
- 	struct io_ring & ring = cltr->io;
+ 	struct __io_data & ring = *cltr->io;

  	sigset_t mask;
…
  	verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

+ 	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
+
  	while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
  		#if defined(__CFA_IO_POLLING_USER__)
…
  			// batton pass to the user-thread
  			int count = __drain_io( ring, &mask, 1, true );
+
+ 			// Update statistics
+ 			#if !defined(__CFA_NO_STATISTICS__)
+ 				ring.completion_q.stats.completed_avg.val += count;
+ 				ring.completion_q.stats.completed_avg.slow_cnt += 1;
+ 			#endif
+
  			if(count > 0) {
  				__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
…
  			//In the naive approach, just poll the io completion queue directly
- 			__drain_io( ring, &mask, 1, true );
+ 			int count = __drain_io( ring, &mask, 1, true );
+
+ 			// Update statistics
+ 			#if !defined(__CFA_NO_STATISTICS__)
+ 				ring.completion_q.stats.completed_avg.val += count;
+ 				ring.completion_q.stats.completed_avg.slow_cnt += 1;
+ 			#endif

  		#endif
  	}

+ 	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
+
  	return 0p;
  }

- #if defined(__CFA_IO_POLLING_USER__)
- 	void main( __io_poller_fast & this ) {
- 		// Start parked
- 		park( __cfaabi_dbg_ctx );
-
- 		// Then loop until we need to start
- 		while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
- 			// Drain the io
- 			if(0 > __drain_io( *this.ring, 0p, 0, false )) {
- 				// If we got something, just yield and check again
- 				yield();
- 			}
- 			else {
- 				// We didn't get anything baton pass to the slow poller
- 				__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
- 				post( this.ring->poller.sem );
- 				park( __cfaabi_dbg_ctx );
- 			}
- 		}
- 	}
- #endif
+ void main( __io_poller_fast & this ) {
+ 	// Start parked
+ 	park( __cfaabi_dbg_ctx );
+
+ 	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
+
+ 	// Then loop until we need to start
+ 	while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
+ 		// Drain the io
+ 		this.waiting = false;
+ 		int count = __drain_io( *this.ring, 0p, 0, false );
+
+ 		// Update statistics
+ 		#if !defined(__CFA_NO_STATISTICS__)
+ 			this.ring->completion_q.stats.completed_avg.val += count;
+ 			this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
+ 		#endif
+
+ 		this.waiting = true;
+ 		if(0 > count) {
+ 			// If we got something, just yield and check again
+ 			yield();
+ 		}
+ 		else {
+ 			// We didn't get anything baton pass to the slow poller
+ 			__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+ 			post( this.ring->poller.sem );
+ 			park( __cfaabi_dbg_ctx );
+ 		}
+ 	}
+
+ 	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+ }

  //=============================================================================================
…
  //

- static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
+ static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
  	// Wait for a spot to be available
  	P(ring.submit);
…
  }

- static inline void __submit( struct io_ring & ring, uint32_t idx ) {
+ static inline void __submit( struct __io_data & ring, uint32_t idx ) {
  	// get mutual exclusion
  	lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
…
  #define __submit_prelude \
- 	struct io_ring & ring = active_cluster()->io; \
+ 	struct __io_data & ring = *active_cluster()->io; \
  	struct io_uring_sqe * sqe; \
  	uint32_t idx; \
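Read together, the poller hunks above implement a baton-passing scheme: the fast poller is a user-level thread that drains the completion queue without blocking, and when it finds nothing it posts poller.sem and parks, letting the slow poller (a kernel pthread blocked in io_uring_enter) take over until completions arrive and hand the ring back. The following stand-alone sketch, in plain C rather than Cforall, illustrates only that hand-off pattern; drain, pending, baton, and both poller bodies are invented for the sketch and are not the CFA runtime's API.

	#include <pthread.h>
	#include <semaphore.h>
	#include <stdatomic.h>
	#include <stdio.h>
	#include <unistd.h>

	static atomic_bool done    = false;
	static atomic_int  pending = 5;   // stand-in for completions sitting in the ring
	static sem_t baton;               // plays the role of poller.sem

	// Stand-in for __drain_io: report one completion if any are pending.
	static int drain(void) {
		if (atomic_load(&pending) > 0) { atomic_fetch_sub(&pending, 1); return 1; }
		return 0;
	}

	// Slow poller: parks on the semaphore; the real runtime blocks
	// inside io_uring_enter instead of sleeping in a loop.
	static void * slow_poller(void * arg) {
		(void)arg;
		while (!atomic_load(&done)) {
			sem_wait(&baton);                      // wait for the baton
			while (!atomic_load(&done) && drain() == 0)
				usleep(1000);                      // "block" until work or shutdown
			// work arrived (or shutdown): the runtime would unpark the fast poller here
		}
		return NULL;
	}

	int main(void) {
		sem_init(&baton, 0, 0);
		pthread_t slow;
		pthread_create(&slow, NULL, slow_poller, NULL);

		// Fast poller: drain without blocking; baton-pass when the ring is idle.
		for (int idle = 0; idle < 3; ) {
			if (drain() > 0) continue;             // got something: poll again
			sem_post(&baton);                      // idle: hand the ring to the slow poller
			usleep(2000);                          // the real fast poller parks here
			idle++;
		}

		atomic_store(&done, true);
		sem_post(&baton);                          // release the slow poller for shutdown
		pthread_join(slow, NULL);
		sem_destroy(&baton);
		puts("pollers stopped");
		return 0;
	}

The point of the split is that the common case, completions arriving while the fast poller is active, never pays for a kernel-thread wakeup; the semaphore is only touched when the ring goes idle, which is also why the shutdown path above can verify the fast poller is waiting before unparking it.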
libcfa/src/concurrency/kernel.hfa
  //-----------------------------------------------------------------------------
  // I/O
- #if defined(HAVE_LINUX_IO_URING_H)
- 	struct io_uring_sq {
- 		// Head and tail of the ring (associated with array)
- 		volatile uint32_t * head;
- 		volatile uint32_t * tail;
-
- 		// The actual kernel ring which uses head/tail
- 		// indexes into the sqes arrays
- 		uint32_t * array;
-
- 		// number of entries and mask to go with it
- 		const uint32_t * num;
- 		const uint32_t * mask;
-
- 		// Submission flags (Not sure what for)
- 		uint32_t * flags;
-
- 		// number of sqes not submitted (whatever that means)
- 		uint32_t * dropped;
-
- 		// Like head/tail but not seen by the kernel
- 		volatile uint32_t alloc;
- 		volatile uint32_t ready;
-
- 		__spinlock_t lock;
-
- 		// A buffer of sqes (not the actual ring)
- 		struct io_uring_sqe * sqes;
-
- 		// The location and size of the mmaped area
- 		void * ring_ptr;
- 		size_t ring_sz;
-
- 		// Statistics
- 		#if !defined(__CFA_NO_STATISTICS__)
- 			struct {
- 				struct {
- 					unsigned long long int val;
- 					unsigned long long int cnt;
- 				} submit_avg;
- 			} stats;
- 		#endif
- 	};
-
- 	struct io_uring_cq {
- 		// Head and tail of the ring
- 		volatile uint32_t * head;
- 		volatile uint32_t * tail;
-
- 		// number of entries and mask to go with it
- 		const uint32_t * mask;
- 		const uint32_t * num;
-
- 		// number of cqes not submitted (whatever that means)
- 		uint32_t * overflow;
-
- 		// the kernel ring
- 		struct io_uring_cqe * cqes;
-
- 		// The location and size of the mmaped area
- 		void * ring_ptr;
- 		size_t ring_sz;
-
- 		// Statistics
- 		#if !defined(__CFA_NO_STATISTICS__)
- 			struct {
- 				struct {
- 					unsigned long long int val;
- 					unsigned long long int cnt;
- 				} completed_avg;
- 			} stats;
- 		#endif
- 	};
-
- 	#if defined(__CFA_IO_POLLING_USER__)
- 		struct __io_poller_fast {
- 			struct io_ring * ring;
- 			$thread thrd;
- 		};
- 	#endif
-
- 	struct io_ring {
- 		struct io_uring_sq submit_q;
- 		struct io_uring_cq completion_q;
- 		uint32_t flags;
- 		int fd;
- 		semaphore submit;
- 		volatile bool done;
- 		struct {
- 			struct {
- 				void * stack;
- 				pthread_t kthrd;
- 			} slow;
- 			#if defined(__CFA_IO_POLLING_USER__)
- 				__io_poller_fast fast;
- 				__bin_sem_t sem;
- 			#endif
- 		} poller;
- 	};
- #endif
+ struct __io_data;

  //-----------------------------------------------------------------------------
…
  	} node;

- 	#if defined(HAVE_LINUX_IO_URING_H)
- 		struct io_ring io;
- 	#endif
+ 	struct __io_data * io;

  	#if !defined(__CFA_NO_STATISTICS__)
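This header change is an instance of the opaque-handle ("pimpl") pattern: kernel.hfa now only forward-declares struct __io_data and the cluster stores a pointer, so the io_uring structs and the HAVE_LINUX_IO_URING_H guard no longer leak into every file that includes the header (which is also why readv.cfa can drop its #warning check above). A minimal sketch of the pattern in plain C, with hypothetical names (io_data, io_startup, io_shutdown):

	#include <stdio.h>
	#include <stdlib.h>

	/* --- what the public header exposes --- */
	struct io_data;                          /* forward declaration only */
	struct cluster { struct io_data * io; }; /* opaque handle, fixed size */

	/* --- what stays private to the implementation file --- */
	struct io_data {
		int fd;                              /* ... rings, stats, pollers ... */
	};

	static void io_startup(struct cluster * this) {
		this->io = malloc(sizeof(struct io_data)); /* mirrors this.io = malloc() */
		this->io->fd = -1;
	}

	static void io_shutdown(struct cluster * this) {
		free(this->io);                            /* mirrors free( this.io ) */
		this->io = NULL;
	}

	int main(void) {
		struct cluster c;
		io_startup(&c);
		printf("io handle at %p\n", (void *)c.io);
		io_shutdown(&c);
		return 0;
	}

The cost is one heap allocation per cluster and a pointer dereference per access, which is why the startup path in io.cfa gains this.io = malloc() and the shutdown path gains free( this.io ).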
libcfa/src/concurrency/kernel_private.hfa
  extern volatile thread_local __cfa_kernel_preemption_state_t preemption_state __attribute__ ((tls_model ( "initial-exec" )));

+ extern cluster * mainCluster;
+
  //-----------------------------------------------------------------------------
  // Threads