- Timestamp: May 5, 2020, 10:45:18 AM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: f90d10f
- Parents: 3c039b0
- Location: libcfa
- Files: 3 edited
libcfa/prelude/defines.hfa.in (r3c039b0 → r61dd73d)

 #undef HAVE_LINUX_IO_URING_H

-// #define __CFA_IO_POLLING_USER__
-// #define __CFA_IO_POLLING_KERNEL__
+#undef __CFA_NO_STATISTICS__
libcfa/src/concurrency/io.cfa (r3c039b0 → r61dd73d)

 #endif

-#if defined(__CFA_IO_POLLING_USER__)
-    void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
-        this.ring = &cltr.io;
-        this.waiting = true;
-        (this.thrd){ "Fast I/O Poller", cltr };
-    }
-    void ^?{}( __io_poller_fast & mutex this );
-    void main( __io_poller_fast & this );
-    static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
-    void ^?{}( __io_poller_fast & mutex this ) {}
-#endif
+// Fast poller user-thread
+// Not using the "thread" keyword because we want to control
+// more carefully when to start/stop it
+struct __io_poller_fast {
+    struct __io_data * ring;
+    bool waiting;
+    $thread thrd;
+};
+
+void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
+    this.ring = cltr.io;
+    this.waiting = true;
+    (this.thrd){ "Fast I/O Poller", cltr };
+}
+void ^?{}( __io_poller_fast & mutex this );
+void main( __io_poller_fast & this );
+static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
+void ^?{}( __io_poller_fast & mutex this ) {}
+
+struct __submition_data {
+    // Head and tail of the ring (associated with array)
+    volatile uint32_t * head;
+    volatile uint32_t * tail;
+
+    // The actual kernel ring which uses head/tail
+    // indexes into the sqes arrays
+    uint32_t * array;
+
+    // number of entries and mask to go with it
+    const uint32_t * num;
+    const uint32_t * mask;
+
+    // Submission flags (Not sure what for)
+    uint32_t * flags;
+
+    // number of sqes not submitted (whatever that means)
+    uint32_t * dropped;
+
+    // Like head/tail but not seen by the kernel
+    volatile uint32_t alloc;
+    volatile uint32_t ready;
+
+    __spinlock_t lock;
+
+    // A buffer of sqes (not the actual ring)
+    struct io_uring_sqe * sqes;
+
+    // The location and size of the mmaped area
+    void * ring_ptr;
+    size_t ring_sz;
+
+    // Statistics
+    #if !defined(__CFA_NO_STATISTICS__)
+        struct {
+            struct {
+                unsigned long long int val;
+                unsigned long long int cnt;
+            } submit_avg;
+        } stats;
+    #endif
+};
+
+struct __completion_data {
+    // Head and tail of the ring
+    volatile uint32_t * head;
+    volatile uint32_t * tail;
+
+    // number of entries and mask to go with it
+    const uint32_t * mask;
+    const uint32_t * num;
+
+    // number of cqes not submitted (whatever that means)
+    uint32_t * overflow;
+
+    // the kernel ring
+    struct io_uring_cqe * cqes;
+
+    // The location and size of the mmaped area
+    void * ring_ptr;
+    size_t ring_sz;
+
+    // Statistics
+    #if !defined(__CFA_NO_STATISTICS__)
+        struct {
+            struct {
+                unsigned long long int val;
+                unsigned long long int slow_cnt;
+                unsigned long long int fast_cnt;
+            } completed_avg;
+        } stats;
+    #endif
+};
+
+struct __io_data {
+    struct __submition_data submit_q;
+    struct __completion_data completion_q;
+    uint32_t flags;
+    int fd;
+    semaphore submit;
+    volatile bool done;
+    struct {
+        struct {
+            void * stack;
+            pthread_t kthrd;
+        } slow;
+        __io_poller_fast fast;
+        __bin_sem_t sem;
+    } poller;
+};

 //=============================================================================================
…
 //=============================================================================================
 void __kernel_io_startup( cluster & this, bool main_cluster ) {
+    this.io = malloc();
+
     // Step 1 : call to setup
     struct io_uring_params params;
…

     // Step 2 : mmap result
-    memset( &this.io, 0, sizeof(struct io_ring) );
-    struct io_uring_sq & sq = this.io.submit_q;
-    struct io_uring_cq & cq = this.io.completion_q;
+    memset( this.io, 0, sizeof(struct __io_data) );
+    struct __submition_data & sq = this.io->submit_q;
+    struct __completion_data & cq = this.io->completion_q;

     // calculate the right ring size
…

     // Update the global ring info
-    this.io.flags = params.flags;
-    this.io.fd    = fd;
-    this.io.done  = false;
-    (this.io.submit){ min(*sq.num, *cq.num) };
+    this.io->flags = params.flags;
+    this.io->fd    = fd;
+    this.io->done  = false;
+    (this.io->submit){ min(*sq.num, *cq.num) };

     // Initialize statistics
     #if !defined(__CFA_NO_STATISTICS__)
-        this.io.submit_q.stats.submit_avg.val = 0;
-        this.io.submit_q.stats.submit_avg.cnt = 0;
-        this.io.completion_q.stats.completed_avg.val = 0;
-        this.io.completion_q.stats.completed_avg.slow_cnt = 0;
-        this.io.completion_q.stats.completed_avg.fast_cnt = 0;
+        this.io->submit_q.stats.submit_avg.val = 0;
+        this.io->submit_q.stats.submit_avg.cnt = 0;
+        this.io->completion_q.stats.completed_avg.val = 0;
+        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
+        this.io->completion_q.stats.completed_avg.fast_cnt = 0;
     #endif
…

 void __kernel_io_finish_start( cluster & this ) {
-    #if defined(__CFA_IO_POLLING_USER__)
-        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
-        (this.io.poller.fast){ this };
-        __thrd_start( this.io.poller.fast, main );
-    #endif
+    __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
+    (this.io->poller.fast){ this };
+    __thrd_start( this.io->poller.fast, main );

     // Create the poller thread
     __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
-    this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
+    this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
 }

…
     __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
     // Notify the poller thread of the shutdown
-    __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
+    __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);

     // Stop the IO Poller
     sigval val = { 1 };
-    pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
+    pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
+    post( this.io->poller.sem );
+
+    // Wait for the poller thread to finish
+    pthread_join( this.io->poller.slow.kthrd, 0p );
+    free( this.io->poller.slow.stack );
+
+    __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
+
     #if defined(__CFA_IO_POLLING_USER__)
-        post( this.io.poller.sem );
-    #endif
-
-    // Wait for the poller thread to finish
-    pthread_join( this.io.poller.slow.kthrd, 0p );
-    free( this.io.poller.slow.stack );
-
-    __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
-
-    #if defined(__CFA_IO_POLLING_USER__)
-        verify( this.io.poller.fast.waiting );
-        verify( this.io.poller.fast.thrd.state == Blocked );
-
-        this.io.poller.fast.thrd.curr_cluster = mainCluster;
+        verify( this.io->poller.fast.waiting );
+        verify( this.io->poller.fast.thrd.state == Blocked );
+
+        this.io->poller.fast.thrd.curr_cluster = mainCluster;

         // unpark the fast io_poller
-        unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
+        unpark( &this.io->poller.fast.thrd __cfaabi_dbg_ctx2 );

-        ^(this.io.poller.fast){};
+        ^(this.io->poller.fast){};

         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
…
     #if !defined(__CFA_NO_STATISTICS__)
         if(this.print_stats) {
-            __cfaabi_bits_print_safe( STDERR_FILENO,
-                "----- I/O uRing Stats -----\n"
-                "- total submit calls : %llu\n"
-                "- avg submit : %lf\n"
-                "- total wait calls : %llu (%llu slow, %llu fast)\n"
-                "- avg completion/wait : %lf\n",
-                this.io.submit_q.stats.submit_avg.cnt,
-                ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
-                this.io.completion_q.stats.completed_avg.slow_cnt + this.io.completion_q.stats.completed_avg.fast_cnt,
-                this.io.completion_q.stats.completed_avg.slow_cnt, this.io.completion_q.stats.completed_avg.fast_cnt,
-                ((double)this.io.completion_q.stats.completed_avg.val) / (this.io.completion_q.stats.completed_avg.slow_cnt + this.io.completion_q.stats.completed_avg.fast_cnt)
-            );
+            with(this.io->submit_q.stats, this.io->completion_q.stats) {
+                __cfaabi_bits_print_safe( STDERR_FILENO,
+                    "----- I/O uRing Stats -----\n"
+                    "- total submit calls : %llu\n"
+                    "- avg submit : %lf\n"
+                    "- total wait calls : %llu (%llu slow, %llu fast)\n"
+                    "- avg completion/wait : %lf\n",
+                    submit_avg.cnt,
+                    ((double)submit_avg.val) / submit_avg.cnt,
+                    completed_avg.slow_cnt + completed_avg.fast_cnt,
+                    completed_avg.slow_cnt, completed_avg.fast_cnt,
+                    ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
+                );
+            }
         }
     #endif

     // Shutdown the io rings
-    struct io_uring_sq & sq = this.io.submit_q;
-    struct io_uring_cq & cq = this.io.completion_q;
+    struct __submition_data & sq = this.io->submit_q;
+    struct __completion_data & cq = this.io->completion_q;

     // unmap the submit queue entries
…

     // close the file descriptor
-    close(this.io.fd);
+    close(this.io->fd);
+
+    free( this.io );
 }

…
 // Process a single completion message from the io_uring
 // This is NOT thread-safe
-static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
+static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
     if( ret < 0 ) {
…
 static void * __io_poller_slow( void * arg ) {
     cluster * cltr = (cluster *)arg;
-    struct io_ring & ring = cltr->io;
+    struct __io_data & ring = *cltr->io;

     sigset_t mask;
…
 }

-#if defined(__CFA_IO_POLLING_USER__)
-    void main( __io_poller_fast & this ) {
-        // Start parked
-        park( __cfaabi_dbg_ctx );
-
-        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
-
-        // Then loop until we need to start
-        while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
-            // Drain the io
-            this.waiting = false;
-            int count = __drain_io( *this.ring, 0p, 0, false );
-
-            // Update statistics
-            #if !defined(__CFA_NO_STATISTICS__)
-                this.ring->completion_q.stats.completed_avg.val += count;
-                this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
-            #endif
-
-            this.waiting = true;
-            if(0 > count) {
-                // If we got something, just yield and check again
-                yield();
-            }
-            else {
-                // We didn't get anything baton pass to the slow poller
-                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
-                post( this.ring->poller.sem );
-                park( __cfaabi_dbg_ctx );
-            }
-        }
-
-        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
-    }
-#endif
+void main( __io_poller_fast & this ) {
+    // Start parked
+    park( __cfaabi_dbg_ctx );
+
+    __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
+
+    // Then loop until we need to start
+    while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
+        // Drain the io
+        this.waiting = false;
+        int count = __drain_io( *this.ring, 0p, 0, false );
+
+        // Update statistics
+        #if !defined(__CFA_NO_STATISTICS__)
+            this.ring->completion_q.stats.completed_avg.val += count;
+            this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
+        #endif
+
+        this.waiting = true;
+        if(0 > count) {
+            // If we got something, just yield and check again
+            yield();
+        }
+        else {
+            // We didn't get anything baton pass to the slow poller
+            __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+            post( this.ring->poller.sem );
+            park( __cfaabi_dbg_ctx );
+        }
+    }
+
+    __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+}

 //=============================================================================================
…
 //

-static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
+static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
     // Wait for a spot to be available
     P(ring.submit);
…
 }

-static inline void __submit( struct io_ring & ring, uint32_t idx ) {
+static inline void __submit( struct __io_data & ring, uint32_t idx ) {
     // get mutual exclusion
     lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
…
 #define __submit_prelude \
-    struct io_ring & ring = active_cluster()->io; \
+    struct __io_data & ring = *active_cluster()->io; \
     struct io_uring_sqe * sqe; \
     uint32_t idx; \
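The statistics kept in the new __submition_data and __completion_data structures are plain sum-and-count accumulators: each poller adds the completions it drained to val and bumps its own counter, and the "avg" figures printed at shutdown are just val divided by the total count. A minimal standalone C sketch of that bookkeeping (io_stats and record_completions are illustrative names, not part of libcfa):

#include <stdio.h>

// Sum-and-count accumulator, mirroring the completed_avg fields above
// (val, slow_cnt, fast_cnt); all names here are illustrative only.
struct io_stats {
    unsigned long long val;       // total completions observed
    unsigned long long slow_cnt;  // wait calls made by the slow (kernel-thread) poller
    unsigned long long fast_cnt;  // wait calls made by the fast (user-thread) poller
};

// Called once per drain: record how many completions were seen and which poller saw them.
static void record_completions( struct io_stats * s, int count, int fast ) {
    if( count > 0 ) s->val += count;
    if( fast ) s->fast_cnt += 1; else s->slow_cnt += 1;
}

int main() {
    struct io_stats s = { 0, 0, 0 };
    record_completions( &s, 4, 1 );   // fast poller drained 4 completions
    record_completions( &s, 0, 1 );   // fast poller found nothing, hands off
    record_completions( &s, 2, 0 );   // slow poller drained 2 completions
    unsigned long long waits = s.slow_cnt + s.fast_cnt;
    printf( "- total wait calls : %llu (%llu slow, %llu fast)\n", waits, s.slow_cnt, s.fast_cnt );
    printf( "- avg completion/wait : %lf\n", (double)s.val / waits );  // 6 / 3 = 2.0
    return 0;
}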
libcfa/src/concurrency/kernel.hfa (r3c039b0 → r61dd73d)

 //-----------------------------------------------------------------------------
 // I/O
-#if defined(HAVE_LINUX_IO_URING_H)
-    struct io_uring_sq {
-        // Head and tail of the ring (associated with array)
-        volatile uint32_t * head;
-        volatile uint32_t * tail;
-
-        // The actual kernel ring which uses head/tail
-        // indexes into the sqes arrays
-        uint32_t * array;
-
-        // number of entries and mask to go with it
-        const uint32_t * num;
-        const uint32_t * mask;
-
-        // Submission flags (Not sure what for)
-        uint32_t * flags;
-
-        // number of sqes not submitted (whatever that means)
-        uint32_t * dropped;
-
-        // Like head/tail but not seen by the kernel
-        volatile uint32_t alloc;
-        volatile uint32_t ready;
-
-        __spinlock_t lock;
-
-        // A buffer of sqes (not the actual ring)
-        struct io_uring_sqe * sqes;
-
-        // The location and size of the mmaped area
-        void * ring_ptr;
-        size_t ring_sz;
-
-        // Statistics
-        #if !defined(__CFA_NO_STATISTICS__)
-            struct {
-                struct {
-                    unsigned long long int val;
-                    unsigned long long int cnt;
-                } submit_avg;
-            } stats;
-        #endif
-    };
-
-    struct io_uring_cq {
-        // Head and tail of the ring
-        volatile uint32_t * head;
-        volatile uint32_t * tail;
-
-        // number of entries and mask to go with it
-        const uint32_t * mask;
-        const uint32_t * num;
-
-        // number of cqes not submitted (whatever that means)
-        uint32_t * overflow;
-
-        // the kernel ring
-        struct io_uring_cqe * cqes;
-
-        // The location and size of the mmaped area
-        void * ring_ptr;
-        size_t ring_sz;
-
-        // Statistics
-        #if !defined(__CFA_NO_STATISTICS__)
-            struct {
-                struct {
-                    unsigned long long int val;
-                    unsigned long long int slow_cnt;
-                    unsigned long long int fast_cnt;
-                } completed_avg;
-            } stats;
-        #endif
-    };
-
-    #if defined(__CFA_IO_POLLING_USER__)
-        struct __io_poller_fast {
-            struct io_ring * ring;
-            bool waiting;
-            $thread thrd;
-        };
-    #endif
-
-    struct io_ring {
-        struct io_uring_sq submit_q;
-        struct io_uring_cq completion_q;
-        uint32_t flags;
-        int fd;
-        semaphore submit;
-        volatile bool done;
-        struct {
-            struct {
-                void * stack;
-                pthread_t kthrd;
-            } slow;
-            #if defined(__CFA_IO_POLLING_USER__)
-                __io_poller_fast fast;
-                __bin_sem_t sem;
-            #endif
-        } poller;
-    };
-#endif
+struct __io_data;

 //-----------------------------------------------------------------------------
…
     } node;

-    #if defined(HAVE_LINUX_IO_URING_H)
-        struct io_ring io;
-    #endif
+    struct __io_data * io;

     #if !defined(__CFA_NO_STATISTICS__)
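The kernel.hfa side of the change replaces the full io_uring structures with a single forward declaration, so the cluster only carries an opaque struct __io_data * that io.cfa allocates at startup and frees at shutdown. A minimal C sketch of that opaque-handle pattern, assuming a header/implementation split (the io_data, cluster_io_startup, and cluster_io_shutdown names and file layout are hypothetical, not libcfa's):

#include <stdlib.h>
#include <stdio.h>

/* --- header side (e.g. a hypothetical cluster.h): users only see an opaque pointer --- */
struct io_data;                      // forward declaration, layout hidden from includers
struct cluster {
    const char * name;
    struct io_data * io;             // opaque handle, like `struct __io_data * io;` above
};
void cluster_io_startup( struct cluster * this );
void cluster_io_shutdown( struct cluster * this );

/* --- implementation side (e.g. a hypothetical io.c): the only place that knows the layout --- */
struct io_data {
    int fd;
    unsigned flags;
};

void cluster_io_startup( struct cluster * this ) {
    this->io = malloc( sizeof(struct io_data) );  // heap-allocate, mirroring `this.io = malloc();`
    this->io->fd = -1;
    this->io->flags = 0;
}

void cluster_io_shutdown( struct cluster * this ) {
    free( this->io );                // mirrors the added `free( this.io );`
    this->io = 0;
}

int main() {
    struct cluster c = { "main", 0 };
    cluster_io_startup( &c );
    printf( "cluster %s has io handle %p\n", c.name, (void *)c.io );
    cluster_io_shutdown( &c );
    return 0;
}

The payoff of the pattern is that translation units including the header no longer need the io_uring definitions or the statistics configuration to compile; only the implementation file does.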