Changes in / [4385e8b:08a994e]
Files: 7 edited
- benchmark/io/readv.cfa (modified) (6 diffs)
- libcfa/prelude/defines.hfa.in (modified) (1 diff)
- libcfa/src/concurrency/io.cfa (modified) (17 diffs)
- libcfa/src/concurrency/kernel.cfa (modified) (4 diffs)
- libcfa/src/concurrency/kernel.hfa (modified) (4 diffs)
- libcfa/src/concurrency/kernel_private.hfa (modified) (2 diffs)
- libcfa/src/startup.cfa (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
benchmark/io/readv.cfa
r4385e8b r08a994e 16 16 #include <thread.hfa> 17 17 #include <time.hfa> 18 19 #if !defined(HAVE_LINUX_IO_URING_H) 20 #warning no io uring 21 #endif 18 22 19 23 extern bool traceHeapOn(); … … 49 53 while(__atomic_load_n(&run, __ATOMIC_RELAXED)) { 50 54 int r = cfa_preadv2(fd, &iov, 1, 0, 0); 51 if(r < 0) abort( "%s\n",strerror(-r));55 if(r < 0) abort(strerror(-r)); 52 56 53 57 __atomic_fetch_add( &count, 1, __ATOMIC_SEQ_CST ); … … 59 63 unsigned long int nthreads = 2; 60 64 unsigned long int nprocs = 1; 61 int flags = 0; 65 66 printf("Setting local\n"); 67 setlocale(LC_NUMERIC, ""); 62 68 63 69 arg_loop: 64 70 for(;;) { 65 71 static struct option options[] = { 66 {"duration", required_argument, 0, 'd'}, 67 {"nthreads", required_argument, 0, 't'}, 68 {"nprocs", required_argument, 0, 'p'}, 69 {"bufsize", required_argument, 0, 'b'}, 70 {"userthread", no_argument , 0, 'u'}, 72 {"duration", required_argument, 0, 'd'}, 73 {"nthreads", required_argument, 0, 't'}, 74 {"nprocs", required_argument, 0, 'p'}, 75 {"bufsize", required_argument, 0, 'b'}, 71 76 {0, 0, 0, 0} 72 77 }; 73 78 74 79 int idx = 0; 75 int opt = getopt_long(argc, argv, "d:t:p:b: u", options, &idx);80 int opt = getopt_long(argc, argv, "d:t:p:b:", options, &idx); 76 81 77 82 const char * arg = optarg ? optarg : ""; … … 110 115 } 111 116 break; 112 case 'u':113 flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;114 break;115 117 // Other cases 116 118 default: /* ? 
*/ … … 133 135 } 134 136 135 printf("Running %lu threads , reading %lu bytes each, over %lu processors for %lf seconds\n", buflen, nthreads, nprocs, duration);137 printf("Running %lu threads over %lu processors for %lf seconds\n", nthreads, nprocs, duration); 136 138 137 139 { 138 140 Time start, end; 139 cluster cl = { "IO Cluster" , flags};141 cluster cl = { "IO Cluster" }; 140 142 the_cluster = &cl; 141 143 #if !defined(__CFA_NO_STATISTICS__) … … 159 161 } 160 162 } 161 printf("Took % 'ld ms\n", (end - start)`ms);163 printf("Took %ld ms\n", (end - start)`ms); 162 164 printf("Total reads: %'zu\n", count); 163 printf("Reads per second: %'.2lf\n", ((double)count) / (end - start)`s); 164 printf("Total read size: %'zu\n", buflen * count); 165 printf("Bytes per second: %'.2lf\n", ((double)count * buflen) / (end - start)`s); 165 printf("Reads per second: %'lf\n", ((double)count) / (end - start)`s); 166 166 } 167 167 -
libcfa/prelude/defines.hfa.in
r4385e8b r08a994e 19 19 #undef HAVE_PWRITEV2 20 20 21 #undef __CFA_NO_STATISTICS__ 21 // #define __CFA_IO_POLLING_USER__ 22 // #define __CFA_IO_POLLING_KERNEL__ -
libcfa/src/concurrency/io.cfa
r4385e8b r08a994e 20 20 21 21 #if !defined(HAVE_LINUX_IO_URING_H) 22 void __kernel_io_startup( cluster &, int,bool ) {22 void __kernel_io_startup( cluster &, bool ) { 23 23 // Nothing to do without io_uring 24 24 } … … 86 86 #endif 87 87 88 // Fast poller user-thread 89 // Not using the "thread" keyword because we want to control 90 // more carefully when to start/stop it 91 struct __io_poller_fast { 92 struct __io_data * ring; 93 bool waiting; 94 $thread thrd; 95 }; 96 97 void ?{}( __io_poller_fast & this, struct cluster & cltr ) { 98 this.ring = cltr.io; 99 this.waiting = true; 100 (this.thrd){ "Fast I/O Poller", cltr }; 101 } 102 void ^?{}( __io_poller_fast & mutex this ); 103 void main( __io_poller_fast & this ); 104 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; } 105 void ^?{}( __io_poller_fast & mutex this ) {} 106 107 struct __submition_data { 108 // Head and tail of the ring (associated with array) 109 volatile uint32_t * head; 110 volatile uint32_t * tail; 111 112 // The actual kernel ring which uses head/tail 113 // indexes into the sqes arrays 114 uint32_t * array; 115 116 // number of entries and mask to go with it 117 const uint32_t * num; 118 const uint32_t * mask; 119 120 // Submission flags (Not sure what for) 121 uint32_t * flags; 122 123 // number of sqes not submitted (whatever that means) 124 uint32_t * dropped; 125 126 // Like head/tail but not seen by the kernel 127 volatile uint32_t alloc; 128 volatile uint32_t ready; 129 130 __spinlock_t lock; 131 132 // A buffer of sqes (not the actual ring) 133 struct io_uring_sqe * sqes; 134 135 // The location and size of the mmaped area 136 void * ring_ptr; 137 size_t ring_sz; 138 139 // Statistics 140 #if !defined(__CFA_NO_STATISTICS__) 141 struct { 142 struct { 143 volatile unsigned long long int val; 144 volatile unsigned long long int cnt; 145 volatile unsigned long long int block; 146 } submit_avg; 147 } stats; 148 #endif 149 }; 150 151 struct __completion_data 
{ 152 // Head and tail of the ring 153 volatile uint32_t * head; 154 volatile uint32_t * tail; 155 156 // number of entries and mask to go with it 157 const uint32_t * mask; 158 const uint32_t * num; 159 160 // number of cqes not submitted (whatever that means) 161 uint32_t * overflow; 162 163 // the kernel ring 164 struct io_uring_cqe * cqes; 165 166 // The location and size of the mmaped area 167 void * ring_ptr; 168 size_t ring_sz; 169 170 // Statistics 171 #if !defined(__CFA_NO_STATISTICS__) 172 struct { 173 struct { 174 unsigned long long int val; 175 unsigned long long int slow_cnt; 176 unsigned long long int fast_cnt; 177 } completed_avg; 178 } stats; 179 #endif 180 }; 181 182 struct __io_data { 183 struct __submition_data submit_q; 184 struct __completion_data completion_q; 185 uint32_t ring_flags; 186 int cltr_flags; 187 int fd; 188 semaphore submit; 189 volatile bool done; 190 struct { 191 struct { 192 void * stack; 193 pthread_t kthrd; 194 } slow; 195 __io_poller_fast fast; 196 __bin_sem_t sem; 197 } poller; 198 }; 88 #if defined(__CFA_IO_POLLING_USER__) 89 void ?{}( __io_poller_fast & this, struct cluster & cltr ) { 90 this.ring = &cltr.io; 91 (this.thrd){ "Fast I/O Poller", cltr }; 92 } 93 void ^?{}( __io_poller_fast & mutex this ); 94 void main( __io_poller_fast & this ); 95 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; } 96 void ^?{}( __io_poller_fast & mutex this ) {} 97 #endif 199 98 200 99 //============================================================================================= 201 100 // I/O Startup / Shutdown logic 202 101 //============================================================================================= 203 void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) { 204 this.io = malloc(); 205 102 void __kernel_io_startup( cluster & this, bool main_cluster ) { 206 103 // Step 1 : call to setup 207 104 struct io_uring_params params; … … 216 113 217 114 // Step 2 : mmap 
result 218 memset( this.io, 0, sizeof(struct __io_data));219 struct __submition_data & sq = this.io->submit_q;220 struct __completion_data & cq = this.io->completion_q;115 memset(&this.io, 0, sizeof(struct io_ring)); 116 struct io_uring_sq & sq = this.io.submit_q; 117 struct io_uring_cq & cq = this.io.completion_q; 221 118 222 119 // calculate the right ring size … … 296 193 297 194 // Update the global ring info 298 this.io->ring_flags = params.flags; 299 this.io->cltr_flags = io_flags; 300 this.io->fd = fd; 301 this.io->done = false; 302 (this.io->submit){ min(*sq.num, *cq.num) }; 195 this.io.flags = params.flags; 196 this.io.fd = fd; 197 this.io.done = false; 198 (this.io.submit){ min(*sq.num, *cq.num) }; 303 199 304 200 // Initialize statistics 305 201 #if !defined(__CFA_NO_STATISTICS__) 306 this.io->submit_q.stats.submit_avg.val = 0; 307 this.io->submit_q.stats.submit_avg.cnt = 0; 308 this.io->submit_q.stats.submit_avg.block = 0; 309 this.io->completion_q.stats.completed_avg.val = 0; 310 this.io->completion_q.stats.completed_avg.slow_cnt = 0; 311 this.io->completion_q.stats.completed_avg.fast_cnt = 0; 202 this.io.submit_q.stats.submit_avg.val = 0; 203 this.io.submit_q.stats.submit_avg.cnt = 0; 204 this.io.completion_q.stats.completed_avg.val = 0; 205 this.io.completion_q.stats.completed_avg.cnt = 0; 312 206 #endif 313 207 … … 318 212 319 213 void __kernel_io_finish_start( cluster & this ) { 320 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {214 #if defined(__CFA_IO_POLLING_USER__) 321 215 __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this); 322 (this.io ->poller.fast){ this };323 __thrd_start( this.io ->poller.fast, main );324 }216 (this.io.poller.fast){ this }; 217 __thrd_start( this.io.poller.fast, main ); 218 #endif 325 219 326 220 // Create the poller thread 327 221 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this); 328 this.io ->poller.slow.stack = 
__create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );222 this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this ); 329 223 } 330 224 … … 332 226 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this); 333 227 // Notify the poller thread of the shutdown 334 __atomic_store_n(&this.io ->done, true, __ATOMIC_SEQ_CST);228 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST); 335 229 336 230 // Stop the IO Poller 337 231 sigval val = { 1 }; 338 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val ); 339 post( this.io->poller.sem ); 232 pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val ); 233 #if defined(__CFA_IO_POLLING_USER__) 234 post( this.io.poller.sem ); 235 #endif 340 236 341 237 // Wait for the poller thread to finish 342 pthread_join( this.io ->poller.slow.kthrd, 0p );343 free( this.io ->poller.slow.stack );238 pthread_join( this.io.poller.slow.kthrd, 0p ); 239 free( this.io.poller.slow.stack ); 344 240 345 241 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this); 346 242 347 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 348 with( this.io->poller.fast ) { 349 /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call 350 /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster ); 351 /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster ); 352 353 // We need to adjust the clean-up based on where the thread is 354 if( thrd.preempted != __NO_PREEMPTION ) { 355 356 // This is the tricky case 357 // The thread was preempted and now it is on the ready queue 358 /* paranoid */ verify( thrd.state == Active ); // The thread better be in this state 359 /* paranoid */ verify( thrd.next == 1p ); // The thread should be the last on the list 360 /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list 361 362 // Remove the 
thread from the ready queue of this cluster 363 this.ready_queue.head = 1p; 364 thrd.next = 0p; 365 366 // Fixup the thread state 367 thrd.state = Blocked; 368 thrd.preempted = __NO_PREEMPTION; 369 370 // Pretend like the thread was blocked all along 371 } 372 // !!! This is not an else if !!! 373 if( thrd.state == Blocked ) { 374 375 // This is the "easy case" 376 // The thread is parked and can easily be moved to active cluster 377 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 378 thrd.curr_cluster = active_cluster(); 379 243 #if defined(__CFA_IO_POLLING_USER__) 380 244 // unpark the fast io_poller 381 unpark( &thrd __cfaabi_dbg_ctx2 ); 382 } 383 else { 384 385 // The thread is in a weird state 386 // I don't know what to do here 387 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n"); 388 } 389 390 } 391 392 ^(this.io->poller.fast){}; 245 unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 ); 246 247 ^(this.io.poller.fast){}; 393 248 394 249 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this); 395 }250 #endif 396 251 } 397 252 … … 404 259 #if !defined(__CFA_NO_STATISTICS__) 405 260 if(this.print_stats) { 406 with(this.io->submit_q.stats, this.io->completion_q.stats) { 407 __cfaabi_bits_print_safe( STDERR_FILENO, 408 "----- I/O uRing Stats -----\n" 409 "- total submit calls : %'llu\n" 410 "- avg submit : %'.2lf\n" 411 "- pre-submit block %% : %'.2lf\n" 412 "- total wait calls : %'llu (%'llu slow, %'llu fast)\n" 413 "- avg completion/wait : %'.2lf\n", 414 submit_avg.cnt, 415 ((double)submit_avg.val) / submit_avg.cnt, 416 (100.0 * submit_avg.block) / submit_avg.cnt, 417 completed_avg.slow_cnt + completed_avg.fast_cnt, 418 completed_avg.slow_cnt, completed_avg.fast_cnt, 419 ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt) 420 ); 421 } 261 __cfaabi_bits_print_safe( STDERR_FILENO, 262 "----- I/O uRing Stats -----\n" 263 "- total submit 
calls : %llu\n" 264 "- avg submit : %lf\n" 265 "- total wait calls : %llu\n" 266 "- avg completion/wait : %lf\n", 267 this.io.submit_q.stats.submit_avg.cnt, 268 ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt, 269 this.io.completion_q.stats.completed_avg.cnt, 270 ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt 271 ); 422 272 } 423 273 #endif 424 274 425 275 // Shutdown the io rings 426 struct __submition_data & sq = this.io->submit_q;427 struct __completion_data & cq = this.io->completion_q;276 struct io_uring_sq & sq = this.io.submit_q; 277 struct io_uring_cq & cq = this.io.completion_q; 428 278 429 279 // unmap the submit queue entries … … 439 289 440 290 // close the file descriptor 441 close(this.io->fd); 442 443 free( this.io ); 291 close(this.io.fd); 444 292 } 445 293 … … 454 302 // Process a single completion message from the io_uring 455 303 // This is NOT thread-safe 456 static int __drain_io( struct __io_data& ring, sigset_t * mask, int waitcnt, bool in_kernel ) {304 static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) { 457 305 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8); 458 306 if( ret < 0 ) { … … 472 320 // Nothing was new return 0 473 321 if (head == tail) { 322 #if !defined(__CFA_NO_STATISTICS__) 323 ring.completion_q.stats.completed_avg.cnt += 1; 324 #endif 474 325 return 0; 475 326 } … … 497 348 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 498 349 350 // Update statistics 351 #if !defined(__CFA_NO_STATISTICS__) 352 ring.completion_q.stats.completed_avg.val += count; 353 ring.completion_q.stats.completed_avg.cnt += 1; 354 #endif 355 499 356 return count; 500 357 } … … 502 359 static void * __io_poller_slow( void * arg ) { 503 360 cluster * cltr = (cluster *)arg; 504 struct __io_data & ring = *cltr->io;361 struct io_ring & ring = 
cltr->io; 505 362 506 363 sigset_t mask; … … 515 372 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) ); 516 373 517 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring); 518 519 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) { 520 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 374 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 375 #if defined(__CFA_IO_POLLING_USER__) 376 521 377 // In the user-thread approach drain and if anything was drained, 522 378 // batton pass to the user-thread 523 379 int count = __drain_io( ring, &mask, 1, true ); 524 525 // Update statistics526 #if !defined(__CFA_NO_STATISTICS__)527 ring.completion_q.stats.completed_avg.val += count;528 ring.completion_q.stats.completed_avg.slow_cnt += 1;529 #endif530 531 380 if(count > 0) { 532 381 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring); … … 534 383 wait( ring.poller.sem ); 535 384 } 385 386 #else 387 388 //In the naive approach, just poll the io completion queue directly 389 __drain_io( ring, &mask, 1, true ); 390 391 #endif 392 } 393 394 return 0p; 395 } 396 397 #if defined(__CFA_IO_POLLING_USER__) 398 void main( __io_poller_fast & this ) { 399 // Start parked 400 park( __cfaabi_dbg_ctx ); 401 402 // Then loop until we need to start 403 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) { 404 // Drain the io 405 if(0 > __drain_io( *this.ring, 0p, 0, false )) { 406 // If we got something, just yield and check again 407 yield(); 408 } 409 else { 410 // We didn't get anything baton pass to the slow poller 411 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring); 412 post( this.ring->poller.sem ); 413 park( __cfaabi_dbg_ctx ); 414 } 536 415 } 537 416 } 538 else { 539 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 540 //In the naive approach, just poll the io completion queue directly 541 int count = __drain_io( ring, &mask, 1, 
true ); 542 543 // Update statistics 544 #if !defined(__CFA_NO_STATISTICS__) 545 ring.completion_q.stats.completed_avg.val += count; 546 ring.completion_q.stats.completed_avg.slow_cnt += 1; 547 #endif 548 } 549 } 550 551 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring); 552 553 return 0p; 554 } 555 556 void main( __io_poller_fast & this ) { 557 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ); 558 559 // Start parked 560 park( __cfaabi_dbg_ctx ); 561 562 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring); 563 564 int reset = 0; 565 566 // Then loop until we need to start 567 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) { 568 // Drain the io 569 this.waiting = false; 570 int count = __drain_io( *this.ring, 0p, 0, false ); 571 reset += count > 0 ? 1 : 0; 572 573 // Update statistics 574 #if !defined(__CFA_NO_STATISTICS__) 575 this.ring->completion_q.stats.completed_avg.val += count; 576 this.ring->completion_q.stats.completed_avg.fast_cnt += 1; 577 #endif 578 579 this.waiting = true; 580 if(reset < 5) { 581 // If we got something, just yield and check again 582 yield(); 583 } 584 else { 585 // We didn't get anything baton pass to the slow poller 586 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring); 587 post( this.ring->poller.sem ); 588 park( __cfaabi_dbg_ctx ); 589 reset = 0; 590 } 591 } 592 593 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring); 594 } 417 #endif 595 418 596 419 //============================================================================================= … … 622 445 // 623 446 624 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data& ring ) {447 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) { 625 448 // Wait for a spot to be available 626 __attribute__((unused)) bool blocked = 
P(ring.submit); 627 #if !defined(__CFA_NO_STATISTICS__) 628 __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED ); 629 #endif 449 P(ring.submit); 630 450 631 451 // Allocate the sqe … … 643 463 } 644 464 645 static inline void __submit( struct __io_data& ring, uint32_t idx ) {465 static inline void __submit( struct io_ring & ring, uint32_t idx ) { 646 466 // get mutual exclusion 647 467 lock(ring.submit_q.lock __cfaabi_dbg_ctx2); … … 704 524 705 525 #define __submit_prelude \ 706 struct __io_data & ring = *active_cluster()->io; \526 struct io_ring & ring = active_cluster()->io; \ 707 527 struct io_uring_sqe * sqe; \ 708 528 uint32_t idx; \ -
libcfa/src/concurrency/kernel.cfa
r4385e8b r08a994e 254 254 } 255 255 256 void ?{}(cluster & this, const char name[], Duration preemption_rate , int io_flags) with( this ) {256 void ?{}(cluster & this, const char name[], Duration preemption_rate) with( this ) { 257 257 this.name = name; 258 258 this.preemption_rate = preemption_rate; … … 268 268 threads{ __get }; 269 269 270 __kernel_io_startup( this, io_flags,&this == mainCluster );270 __kernel_io_startup( this, &this == mainCluster ); 271 271 272 272 doregister(this); … … 987 987 void ^?{}(semaphore & this) {} 988 988 989 boolP(semaphore & this) with( this ){989 void P(semaphore & this) with( this ){ 990 990 lock( lock __cfaabi_dbg_ctx2 ); 991 991 count -= 1; … … 997 997 unlock( lock ); 998 998 park( __cfaabi_dbg_ctx ); 999 return true;1000 999 } 1001 1000 else { 1002 1001 unlock( lock ); 1003 return false;1004 1002 } 1005 1003 } -
libcfa/src/concurrency/kernel.hfa
r4385e8b r08a994e 38 38 void ?{}(semaphore & this, int count = 1); 39 39 void ^?{}(semaphore & this); 40 boolP (semaphore & this);40 void P (semaphore & this); 41 41 bool V (semaphore & this); 42 42 bool V (semaphore & this, unsigned count); … … 114 114 //----------------------------------------------------------------------------- 115 115 // I/O 116 struct __io_data; 117 118 #define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0 119 // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1 116 #if defined(HAVE_LINUX_IO_URING_H) 117 struct io_uring_sq { 118 // Head and tail of the ring (associated with array) 119 volatile uint32_t * head; 120 volatile uint32_t * tail; 121 122 // The actual kernel ring which uses head/tail 123 // indexes into the sqes arrays 124 uint32_t * array; 125 126 // number of entries and mask to go with it 127 const uint32_t * num; 128 const uint32_t * mask; 129 130 // Submission flags (Not sure what for) 131 uint32_t * flags; 132 133 // number of sqes not submitted (whatever that means) 134 uint32_t * dropped; 135 136 // Like head/tail but not seen by the kernel 137 volatile uint32_t alloc; 138 volatile uint32_t ready; 139 140 __spinlock_t lock; 141 142 // A buffer of sqes (not the actual ring) 143 struct io_uring_sqe * sqes; 144 145 // The location and size of the mmaped area 146 void * ring_ptr; 147 size_t ring_sz; 148 149 // Statistics 150 #if !defined(__CFA_NO_STATISTICS__) 151 struct { 152 struct { 153 unsigned long long int val; 154 unsigned long long int cnt; 155 } submit_avg; 156 } stats; 157 #endif 158 }; 159 160 struct io_uring_cq { 161 // Head and tail of the ring 162 volatile uint32_t * head; 163 volatile uint32_t * tail; 164 165 // number of entries and mask to go with it 166 const uint32_t * mask; 167 const uint32_t * num; 168 169 // number of cqes not submitted (whatever that means) 170 uint32_t * overflow; 171 172 // the kernel ring 173 struct io_uring_cqe * cqes; 174 175 // The location and size of the mmaped area 176 void * ring_ptr; 
177 size_t ring_sz; 178 179 // Statistics 180 #if !defined(__CFA_NO_STATISTICS__) 181 struct { 182 struct { 183 unsigned long long int val; 184 unsigned long long int cnt; 185 } completed_avg; 186 } stats; 187 #endif 188 }; 189 190 #if defined(__CFA_IO_POLLING_USER__) 191 struct __io_poller_fast { 192 struct io_ring * ring; 193 $thread thrd; 194 }; 195 #endif 196 197 struct io_ring { 198 struct io_uring_sq submit_q; 199 struct io_uring_cq completion_q; 200 uint32_t flags; 201 int fd; 202 semaphore submit; 203 volatile bool done; 204 struct { 205 struct { 206 void * stack; 207 pthread_t kthrd; 208 } slow; 209 #if defined(__CFA_IO_POLLING_USER__) 210 __io_poller_fast fast; 211 __bin_sem_t sem; 212 #endif 213 } poller; 214 }; 215 #endif 120 216 121 217 //----------------------------------------------------------------------------- … … 151 247 } node; 152 248 153 struct __io_data * io; 249 #if defined(HAVE_LINUX_IO_URING_H) 250 struct io_ring io; 251 #endif 154 252 155 253 #if !defined(__CFA_NO_STATISTICS__) … … 159 257 extern Duration default_preemption(); 160 258 161 void ?{} (cluster & this, const char name[], Duration preemption_rate , int flags);259 void ?{} (cluster & this, const char name[], Duration preemption_rate); 162 260 void ^?{}(cluster & this); 163 261 164 static inline void ?{} (cluster & this) { this{"Anonymous Cluster", default_preemption(), 0}; } 165 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate, 0}; } 166 static inline void ?{} (cluster & this, const char name[]) { this{name, default_preemption(), 0}; } 167 static inline void ?{} (cluster & this, int flags) { this{"Anonymous Cluster", default_preemption(), flags}; } 168 static inline void ?{} (cluster & this, Duration preemption_rate, int flags) { this{"Anonymous Cluster", preemption_rate, flags}; } 169 static inline void ?{} (cluster & this, const char name[], int flags) { this{name, default_preemption(), flags}; } 262 static inline 
void ?{} (cluster & this) { this{"Anonymous Cluster", default_preemption()}; } 263 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate}; } 264 static inline void ?{} (cluster & this, const char name[]) { this{name, default_preemption()}; } 170 265 171 266 static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; } -
libcfa/src/concurrency/kernel_private.hfa
r4385e8b r08a994e 59 59 extern volatile thread_local __cfa_kernel_preemption_state_t preemption_state __attribute__ ((tls_model ( "initial-exec" ))); 60 60 61 extern cluster * mainCluster;62 63 61 //----------------------------------------------------------------------------- 64 62 // Threads … … 77 75 //----------------------------------------------------------------------------- 78 76 // I/O 79 void __kernel_io_startup ( cluster &, int,bool );77 void __kernel_io_startup ( cluster &, bool ); 80 78 void __kernel_io_finish_start( cluster & ); 81 79 void __kernel_io_prepare_stop( cluster & ); -
libcfa/src/startup.cfa
r4385e8b r08a994e 14 14 // 15 15 16 #include <time.h> // tzset 17 #include <locale.h> // setlocale 16 #include <time.h> // tzset 18 17 #include "startup.hfa" 19 18 … … 22 21 void __cfaabi_appready_startup( void ) { 23 22 tzset(); // initialize time global variables 24 setlocale(LC_NUMERIC, "");25 23 #ifdef __CFA_DEBUG__ 26 24 extern void heapAppStart();
Note: See TracChangeset for help on using the changeset viewer.