source: libcfa/src/concurrency/io.cfa@ 04b5cef

Last change on this file since 04b5cef was 8834751, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Moved statistics to stats.cfa to combine ready Q stats and IO stats

1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16// #define __CFA_DEBUG_PRINT_IO__
17// #define __CFA_DEBUG_PRINT_IO_CORE__
18
19#include "kernel.hfa"
20#include "bitmanip.hfa"
21
22#if !defined(HAVE_LINUX_IO_URING_H)
23 void __kernel_io_startup( cluster &, unsigned, bool ) {
24 // Nothing to do without io_uring
25 }
26
27 void __kernel_io_finish_start( cluster & ) {
28 // Nothing to do without io_uring
29 }
30
31 void __kernel_io_prepare_stop( cluster & ) {
32 // Nothing to do without io_uring
33 }
34
35 void __kernel_io_shutdown( cluster &, bool ) {
36 // Nothing to do without io_uring
37 }
38
39#else
40 extern "C" {
41 #define _GNU_SOURCE /* See feature_test_macros(7) */
42 #include <errno.h>
43 #include <stdint.h>
44 #include <string.h>
45 #include <unistd.h>
46 #include <sys/mman.h>
47 #include <sys/syscall.h>
48
49 #include <linux/io_uring.h>
50 }
51
52 #include "bits/signal.hfa"
53 #include "kernel_private.hfa"
54 #include "thread.hfa"
55
56 uint32_t entries_per_cluster() {
57 return 256;
58 }
59
60 static void * __io_poller_slow( void * arg );
61
62 // Weirdly, some systems that do support io_uring don't actually define these
63 #ifdef __alpha__
64 /*
65 * alpha is the only exception, all other architectures
66 * have common numbers for new system calls.
67 */
68 #ifndef __NR_io_uring_setup
69 #define __NR_io_uring_setup 535
70 #endif
71 #ifndef __NR_io_uring_enter
72 #define __NR_io_uring_enter 536
73 #endif
74 #ifndef __NR_io_uring_register
75 #define __NR_io_uring_register 537
76 #endif
77 #else /* !__alpha__ */
78 #ifndef __NR_io_uring_setup
79 #define __NR_io_uring_setup 425
80 #endif
81 #ifndef __NR_io_uring_enter
82 #define __NR_io_uring_enter 426
83 #endif
84 #ifndef __NR_io_uring_register
85 #define __NR_io_uring_register 427
86 #endif
87 #endif
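 // For reference, a minimal sketch of how these numbers are used: the ring is created and
 // driven through raw syscall(2) invocations, mirroring __kernel_io_startup and __drain_io below.
 //
 //    int fd  = syscall( __NR_io_uring_setup, nentries, &params );
 //    int ret = syscall( __NR_io_uring_enter, fd, to_submit, min_complete,
 //                       IORING_ENTER_GETEVENTS, &sigmask, _NSIG / 8 );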
88
89 // Fast poller user-thread
90 // Not using the "thread" keyword because we want to control
91 // more carefully when to start/stop it
92 struct __io_poller_fast {
93 struct __io_data * ring;
94 $thread thrd;
95 };
96
97 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
98 this.ring = cltr.io;
99 (this.thrd){ "Fast I/O Poller", cltr };
100 }
101 void ^?{}( __io_poller_fast & mutex this );
102 void main( __io_poller_fast & this );
103 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
104 void ^?{}( __io_poller_fast & mutex this ) {}
105
106 struct __submition_data {
107 // Head and tail of the ring (associated with array)
108 volatile uint32_t * head;
109 volatile uint32_t * tail;
110
111 // The actual kernel ring which uses head/tail
112 // indexes into the sqes arrays
113 uint32_t * array;
114
115 // number of entries and mask to go with it
116 const uint32_t * num;
117 const uint32_t * mask;
118
119 // Submission ring flags, written by the kernel (e.g. IORING_SQ_NEED_WAKEUP when using SQPOLL)
120 uint32_t * flags;
121
122 // number of sqes the kernel dropped because they were invalid
123 uint32_t * dropped;
124
125 // Like head/tail but not seen by the kernel
126 volatile uint32_t * ready;
127 uint32_t ready_cnt;
128
129 __spinlock_t lock;
130
131 // A buffer of sqes (not the actual ring)
132 struct io_uring_sqe * sqes;
133
134 // The location and size of the mmaped area
135 void * ring_ptr;
136 size_t ring_sz;
137 };
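 // Minimal sketch of how the fields above cooperate to publish one filled sqe, mirroring
 // __submit() further down. The helper name is a placeholder and synchronization between
 // competing submitters is elided.
 static inline void __doc_publish_one( struct __submition_data & sq, uint32_t idx ) {
 	const uint32_t mask = *sq.mask;
 	sq.array[ (*sq.tail) & mask ] = idx & mask;              // make the filled sqe visible in the ring
 	__atomic_fetch_add( sq.tail, 1ul32, __ATOMIC_SEQ_CST );  // then move the tail the kernel reads
 }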
138
139 struct __completion_data {
140 // Head and tail of the ring
141 volatile uint32_t * head;
142 volatile uint32_t * tail;
143
144 // number of entries and mask to go with it
145 const uint32_t * mask;
146 const uint32_t * num;
147
148 // number of completion events dropped because the completion queue overflowed
149 uint32_t * overflow;
150
151 // the kernel ring
152 struct io_uring_cqe * cqes;
153
154 // The location and size of the mmaped area
155 void * ring_ptr;
156 size_t ring_sz;
157 };
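 // Minimal sketch of how the completion fields above are walked, mirroring __drain_io() below.
 // The helper name is a placeholder and waking the submitting thread is elided.
 static inline unsigned __doc_drain_once( struct __completion_data & cq ) {
 	const uint32_t mask = *cq.mask;
 	uint32_t head = *cq.head;
 	uint32_t count = *cq.tail - head;
 	for( uint32_t i = 0; i < count; i++ ) {
 		// read one completion; a real consumer hands cqe.user_data / cqe.res back to the submitter
 		__attribute((unused)) struct io_uring_cqe & cqe = cq.cqes[ (head + i) & mask ];
 	}
 	__atomic_fetch_add( cq.head, count, __ATOMIC_RELAXED );  // tell the kernel the cqes were consumed
 	return count;
 }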
158
159 struct __io_data {
160 struct __submition_data submit_q;
161 struct __completion_data completion_q;
162 uint32_t ring_flags;
163 int cltr_flags;
164 int fd;
165 semaphore submit;
166 volatile bool done;
167 struct {
168 struct {
169 void * stack;
170 pthread_t kthrd;
171 volatile bool blocked;
172 } slow;
173 __io_poller_fast fast;
174 __bin_sem_t sem;
175 } poller;
176 };
177
178//=============================================================================================
179// I/O Startup / Shutdown logic
180//=============================================================================================
181 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
182 this.io = malloc();
183
184 // Step 1 : call to setup
185 struct io_uring_params params;
186 memset(&params, 0, sizeof(params));
187
188 uint32_t nentries = entries_per_cluster();
189
190 int fd = syscall(__NR_io_uring_setup, nentries, &params );
191 if(fd < 0) {
192 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
193 }
194
195 // Step 2 : mmap result
196 memset( this.io, 0, sizeof(struct __io_data) );
197 struct __submition_data & sq = this.io->submit_q;
198 struct __completion_data & cq = this.io->completion_q;
199
200 // calculate the right ring size
201 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
202 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));
203
204 // Requires features
205 #if defined(IORING_FEAT_SINGLE_MMAP)
206 // adjust the size according to the parameters
207 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
208 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
209 }
210 #endif
211
212 // mmap the Submit Queue into existence
213 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
214 if (sq.ring_ptr == (void*)MAP_FAILED) {
215 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
216 }
217
218 // Requires features
219 #if defined(IORING_FEAT_SINGLE_MMAP)
220 // mmap the Completion Queue into existence (may or may not be needed)
221 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
222 cq.ring_ptr = sq.ring_ptr;
223 }
224 else
225 #endif
226 {
227 // We need multiple calls to mmap
228 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
229 if (cq.ring_ptr == (void*)MAP_FAILED) {
230 munmap(sq.ring_ptr, sq.ring_sz);
231 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
232 }
233 }
234
235 // mmap the submit queue entries
236 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
237 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
238 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
239 munmap(sq.ring_ptr, sq.ring_sz);
240 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
241 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
242 }
243
244 // Get the pointers from the kernel to fill the structure
245 // submit queue
246 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
247 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
248 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
249 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
250 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
251 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
252 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
253
254 {
255 const uint32_t num = *sq.num;
256 for( i; num ) {
257 sq.sqes[i].user_data = 0ul64;
258 }
259 }
260
261 if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
262 /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
263 sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
264 sq.ready = alloc_align( 64, sq.ready_cnt );
265 for(i; sq.ready_cnt) {
266 sq.ready[i] = -1ul32;
267 }
268 }
269 else {
270 sq.ready_cnt = 0;
271 sq.ready = 0p;
272 }
273
274 // completion queue
275 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
276 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
277 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
278 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
279 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
280 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
281
282 // some paranoid checks
283 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
284 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
285 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
286 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
287
288 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
289 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
290 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
291 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
292
293 // Update the global ring info
294 this.io->ring_flags = params.flags;
295 this.io->cltr_flags = io_flags;
296 this.io->fd = fd;
297 this.io->done = false;
298 (this.io->submit){ min(*sq.num, *cq.num) };
299
300 if(!main_cluster) {
301 __kernel_io_finish_start( this );
302 }
303 }
304
305 void __kernel_io_finish_start( cluster & this ) {
306 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
307 __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluster %p\n", &this);
308 (this.io->poller.fast){ this };
309 __thrd_start( this.io->poller.fast, main );
310 }
311
312 // Create the poller thread
313 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
314 this.io->poller.slow.blocked = false;
315 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
316 }
317
318 void __kernel_io_prepare_stop( cluster & this ) {
319 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster %p\n", &this);
320 // Notify the poller thread of the shutdown
321 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
322
323 // Stop the IO Poller
324 sigval val = { 1 };
325 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
326 post( this.io->poller.sem );
327
328 // Wait for the poller thread to finish
329 pthread_join( this.io->poller.slow.kthrd, 0p );
330 free( this.io->poller.slow.stack );
331
332 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster %p\n", &this);
333
334 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
335 with( this.io->poller.fast ) {
336 /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
337 /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
338
339 // We need to adjust the clean-up based on where the thread is
340 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
341
342 // This is the tricky case
343 // The thread was preempted and now it is on the ready queue
344
345 /* paranoid */ verify( thrd.next != 0p ); // The thread should be the last on the list
346 /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
347
348 // Remove the thread from the ready queue of this cluster
349 this.ready_queue.head = 1p;
350 thrd.next = 0p;
351 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
352
353 // Fixup the thread state
354 thrd.state = Blocked;
355 thrd.preempted = __NO_PREEMPTION;
356
357 // Pretend like the thread was blocked all along
358 }
359 // !!! This is not an else if !!!
360 if( thrd.state == Blocked ) {
361
362 // This is the "easy case"
363 // The thread is parked and can easily be moved to active cluster
364 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
365 thrd.curr_cluster = active_cluster();
366
367 // unpark the fast io_poller
368 unpark( &thrd __cfaabi_dbg_ctx2 );
369 }
370 else {
371
372 // The thread is in a weird state
373 // I don't know what to do here
374 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
375 }
376
377 }
378
379 ^(this.io->poller.fast){};
380
381 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster %p\n", &this);
382 }
383 }
384
385 void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
386 if(!main_cluster) {
387 __kernel_io_prepare_stop( this );
388 }
389
390 // Shutdown the io rings
391 struct __submition_data & sq = this.io->submit_q;
392 struct __completion_data & cq = this.io->completion_q;
393
394 // unmap the submit queue entries
395 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
396
397 // unmap the Submit Queue ring
398 munmap(sq.ring_ptr, sq.ring_sz);
399
400 // unmap the Completion Queue ring, if it is different
401 if (cq.ring_ptr != sq.ring_ptr) {
402 munmap(cq.ring_ptr, cq.ring_sz);
403 }
404
405 // close the file descriptor
406 close(this.io->fd);
407
408 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
409 free( this.io );
410 }
411
412//=============================================================================================
413// I/O Polling
414//=============================================================================================
415 struct io_user_data {
416 int32_t result;
417 $thread * thrd;
418 };
419
420 // Process pending completion messages from the io_uring (and submit ready entries when the poller submits)
421 // This is NOT thread-safe
422 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
423 unsigned to_submit = 0;
424 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
425
426 // If the poller thread also submits, then we need to aggregate the submissions which are ready
427 uint32_t tail = *ring.submit_q.tail;
428 const uint32_t mask = *ring.submit_q.mask;
429
430 // Go through the list of ready submissions
431 for( i; ring.submit_q.ready_cnt ) {
432 // replace any submission with the sentinel, to consume it.
433 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
434
435 // If it was already the sentinel, then we are done
436 if( idx == -1ul32 ) continue;
437
438 // If we got a real submission, append it to the list
439 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
440 to_submit++;
441 }
442
443 // Increment the tail based on how many we are ready to submit
444 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
445 }
446
447 const uint32_t smask = *ring.submit_q.mask;
448 uint32_t shead = *ring.submit_q.head;
449 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
450 if( ret < 0 ) {
451 switch((int)errno) {
452 case EAGAIN:
453 case EINTR:
454 return -EAGAIN;
455 default:
456 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
457 }
458 }
459
460 verify( (shead + ret) == *ring.submit_q.head );
461
462 // Release the consumed SQEs
463 for( i; ret ) {
464 uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
465 ring.submit_q.sqes[ idx ].user_data = 0;
466 }
467
468 uint32_t avail = 0;
469 uint32_t sqe_num = *ring.submit_q.num;
470 for(i; sqe_num) {
471 if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
472 }
473
474 // update statistics
475 #if !defined(__CFA_NO_STATISTICS__)
476 __tls_stats()->io.submit_q.stats.submit_avg.rdy += to_submit;
477 __tls_stats()->io.submit_q.stats.submit_avg.csm += ret;
478 __tls_stats()->io.submit_q.stats.submit_avg.avl += avail;
479 __tls_stats()->io.submit_q.stats.submit_avg.cnt += 1;
480 #endif
481
482 // Drain the queue
483 unsigned head = *ring.completion_q.head;
484 unsigned tail = *ring.completion_q.tail;
485 const uint32_t mask = *ring.completion_q.mask;
486
487 // Memory barrier
488 __atomic_thread_fence( __ATOMIC_SEQ_CST );
489
490 // Nothing new, return 0
491 if (head == tail) {
492 return 0;
493 }
494
495 uint32_t count = tail - head;
496 for(i; count) {
497 unsigned idx = (head + i) & mask;
498 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
499
500 /* paranoid */ verify(&cqe);
501
502 struct io_user_data * data = (struct io_user_data *)cqe.user_data;
503 __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
504
505 data->result = cqe.res;
506 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
507 else { __unpark( data->thrd __cfaabi_dbg_ctx2 ); }
508 }
509
510 // Allow new submissions to happen
511 // V(ring.submit, count);
512
513 // Mark to the kernel that the cqe has been seen
514 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
515 __atomic_thread_fence( __ATOMIC_SEQ_CST );
516 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
517
518 return [count, count > 0 || to_submit > 0];
519 }
520
521 static void * __io_poller_slow( void * arg ) {
522 cluster * cltr = (cluster *)arg;
523 struct __io_data & ring = *cltr->io;
524
525 sigset_t mask;
526 sigfillset(&mask);
527 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
528 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
529 }
530
531 sigdelset( &mask, SIGUSR1 );
532
533 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
534 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
535
536 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
537
538 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
539 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
540
541 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
542
543 // In the user-thread approach, drain and, if anything was drained,
544 // baton pass to the user-thread
545 int count;
546 bool again;
547 [count, again] = __drain_io( ring, &mask, 1, true );
548
549 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
550
551 // Update statistics
552 #if !defined(__CFA_NO_STATISTICS__)
553 __tls_stats()->io.complete_q.stats.completed_avg.val += count;
554 __tls_stats()->io.complete_q.stats.completed_avg.slow_cnt += 1;
555 #endif
556
557 if(again) {
558 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
559 __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
560 wait( ring.poller.sem );
561 }
562 }
563 }
564 else {
565 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
566 //In the naive approach, just poll the io completion queue directly
567 int count;
568 bool again;
569 [count, again] = __drain_io( ring, &mask, 1, true );
570
571 // Update statistics
572 #if !defined(__CFA_NO_STATISTICS__)
573 __tls_stats()->io.complete_q.stats.completed_avg.val += count;
574 __tls_stats()->io.complete_q.stats.completed_avg.slow_cnt += 1;
575 #endif
576 }
577 }
578
579 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
580
581 return 0p;
582 }
583
584 void main( __io_poller_fast & this ) {
585 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
586
587 // Start parked
588 park( __cfaabi_dbg_ctx );
589
590 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
591
592 int reset = 0;
593
594 // Then loop until we need to stop
595 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
596
597 // Drain the io
598 int count;
599 bool again;
600 [count, again] = __drain_io( *this.ring, 0p, 0, false );
601
602 if(!again) reset++;
603
604 // Update statistics
605 #if !defined(__CFA_NO_STATISTICS__)
606 __tls_stats()->io.complete_q.stats.completed_avg.val += count;
607 __tls_stats()->io.complete_q.stats.completed_avg.fast_cnt += 1;
608 #endif
609
610 // If we got something, just yield and check again
611 if(reset < 5) {
612 yield();
613 }
614 // We didn't get anything, baton pass to the slow poller
615 else {
616 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
617 reset = 0;
618
619 // wake up the slow poller
620 post( this.ring->poller.sem );
621
622 // park this thread
623 park( __cfaabi_dbg_ctx );
624 }
625 }
626
627 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
628 }
629
630 static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
631 static inline void __wake_poller( struct __io_data & ring ) {
632 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
633
634 sigval val = { 1 };
635 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
636 }
637
638//=============================================================================================
639// I/O Submissions
640//=============================================================================================
641
642// Submission steps :
643// 1 - We need to make sure we don't overflow any of the buffers, P(ring.submit) to make sure
644// entries are available. The semaphore makes sure that there are no more operations in
645// progress than the number of entries in the buffer. This probably limits concurrency
646// more than necessary since submitted but not completed operations don't need any
647// entries in user space. However, I don't know what happens if we overflow the buffers
648// because too many requests completed at once. This is a safe approach in all cases.
649// Furthermore, with hundreds of entries, this may be okay.
650//
651// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
652// listed in sq.array are visible to the kernel. For those not listed, the kernel does not
653// offer any assurance that an entry is not being filled by multiple threads. Therefore, we
654// need to write an allocator that allows allocating concurrently.
655//
656// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
657//
658// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
659// needs to reach two consensus points at the same time:
660// A - The order in which entries are listed in the array: no two threads must pick the
661// same index for their entries
662// B - When the tail can be updated for the kernel: EVERY entry in the array between
663// head and tail must be fully filled and shouldn't ever be touched again.
664//
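// As a concrete illustration of steps 2-4, a wrapper such as cfa_fsync() below boils down to
// the following (simplified sketch, error handling elided):
//
//    io_user_data data = { 0, active_thread() };
//    struct io_uring_sqe * sqe; uint32_t idx;
//    [sqe, idx] = __submit_alloc( ring, (uint64_t)&data );  // step 2 : claim a free sqe
//    (*sqe){ IORING_OP_FSYNC, fd };                         // step 3 : fill it
//    __submit( ring, idx );                                 // step 4 : publish the index and tail
//    park( __cfaabi_dbg_ctx );                              // sleep until the completion unparks us
//    return data.result;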
665
666 static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
667 verify( data != 0 );
668
669 // Prepare the data we need
670 __attribute((unused)) int len = 0;
671 __attribute((unused)) int block = 0;
672 uint32_t cnt = *ring.submit_q.num;
673 uint32_t mask = *ring.submit_q.mask;
674 uint32_t off = __tls_rand();
675
676 // Loop around looking for an available spot
677 LOOKING: for() {
678 // Look through the list starting at some offset
679 for(i; cnt) {
680 uint64_t expected = 0;
681 uint32_t idx = (i + off) & mask;
682 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
683 volatile uint64_t * udata = &sqe->user_data;
684
685 if( *udata == expected &&
686 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
687 {
688 // update statistics
689 #if !defined(__CFA_NO_STATISTICS__)
690 __tls_stats()->io.submit_q.stats.alloc_avg.val += len;
691 __tls_stats()->io.submit_q.stats.alloc_avg.block += block;
692 __tls_stats()->io.submit_q.stats.alloc_avg.cnt += 1;
693 #endif
694
695 // Success return the data
696 return [sqe, idx];
697 }
698 verify(expected != data);
699
700 len ++;
701 }
702
703 block++;
704 yield();
705 }
706 }
707
708 static inline void __submit( struct __io_data & ring, uint32_t idx ) {
709 // Get the data we definitely need now
710 uint32_t * const tail = ring.submit_q.tail;
711 const uint32_t mask = *ring.submit_q.mask;
712
713 // There are 2 submission schemes, check which one we are using
714 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
715 // If the poller thread submits, then we just need to add this to the ready array
716
717 /* paranoid */ verify( idx <= mask );
718 /* paranoid */ verify( idx != -1ul32 );
719
720 // We need to find a spot in the ready array
721 __attribute((unused)) int len = 0;
722 __attribute((unused)) int block = 0;
723 uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
724 uint32_t off = __tls_rand();
725 LOOKING: for() {
726 for(i; ring.submit_q.ready_cnt) {
727 uint32_t ii = (i + off) & ready_mask;
728 uint32_t expected = -1ul32;
729 if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
730 break LOOKING;
731 }
732 verify(expected != idx);
733
734 len ++;
735 }
736
737 block++;
738 yield();
739 }
740
741 __wake_poller( ring );
742
743 // update statistics
744 #if !defined(__CFA_NO_STATISTICS__)
745 __tls_stats()->io.submit_q.stats.look_avg.val += len;
746 __tls_stats()->io.submit_q.stats.look_avg.block += block;
747 __tls_stats()->io.submit_q.stats.look_avg.cnt += 1;
748 #endif
749
750 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
751 }
752 else {
753 // get mutual exclusion
754 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
755
756 // Append to the list of ready entries
757
758 /* paranoid */ verify( idx <= mask );
759
760 ring.submit_q.array[ (*tail) & mask ] = idx & mask;
761 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
762
763 // Submit however many entries need to be submitted
764 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
765 if( ret < 0 ) {
766 switch((int)errno) {
767 default:
768 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
769 }
770 }
771
772 // update statistics
773 #if !defined(__CFA_NO_STATISTICS__)
774 __tls_stats()->io.submit_q.stats.submit_avg.csm += 1;
775 __tls_stats()->io.submit_q.stats.submit_avg.cnt += 1;
776 #endif
777
778 unlock(ring.submit_q.lock);
779
780 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
781 }
782 }
783
784 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
785 this.opcode = opcode;
786 #if !defined(IOSQE_ASYNC)
787 this.flags = 0;
788 #else
789 this.flags = IOSQE_ASYNC;
790 #endif
791 this.ioprio = 0;
792 this.fd = fd;
793 this.off = 0;
794 this.addr = 0;
795 this.len = 0;
796 this.rw_flags = 0;
797 this.__pad2[0] = this.__pad2[1] = this.__pad2[2] = 0;
798 }
799
800 static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
801 (this){ opcode, fd };
802 this.off = off;
803 this.addr = (uint64_t)addr;
804 this.len = len;
805 }
806
807
808//=============================================================================================
809// I/O Interface
810//=============================================================================================
811
812 #define __submit_prelude \
813 io_user_data data = { 0, active_thread() }; \
814 struct __io_data & ring = *data.thrd->curr_cluster->io; \
815 struct io_uring_sqe * sqe; \
816 uint32_t idx; \
817 [sqe, idx] = __submit_alloc( ring, (uint64_t)&data );
818
819 #define __submit_wait \
820 /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
821 verify( sqe->user_data == (uint64_t)&data ); \
822 __submit( ring, idx ); \
823 park( __cfaabi_dbg_ctx ); \
824 return data.result;
825#endif
826
827// Some forward declarations
828extern "C" {
829 #include <unistd.h>
830 #include <sys/types.h>
831 #include <sys/socket.h>
832 #include <sys/syscall.h>
833
834#if defined(HAVE_PREADV2)
835 struct iovec;
836 extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
837#endif
838#if defined(HAVE_PWRITEV2)
839 struct iovec;
840 extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
841#endif
842
843 extern int fsync(int fd);
844 extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
845
846 struct msghdr;
847 struct sockaddr;
848 extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
849 extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
850 extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
851 extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
852 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
853 extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
854
855 extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
856 extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
857 extern int madvise(void *addr, size_t length, int advice);
858
859 extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
860 extern int close(int fd);
861
862 extern ssize_t read (int fd, void *buf, size_t count);
863}
864
865//-----------------------------------------------------------------------------
866// Asynchronous operations
867#if defined(HAVE_PREADV2)
868 ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
869 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
870 return preadv2(fd, iov, iovcnt, offset, flags);
871 #else
872 __submit_prelude
873
874 (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
875
876 __submit_wait
877 #endif
878 }
879#endif
880
881#if defined(HAVE_PWRITEV2)
882 ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
883 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITEV)
884 return pwritev2(fd, iov, iovcnt, offset, flags);
885 #else
886 __submit_prelude
887
888 (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
889
890 __submit_wait
891 #endif
892 }
893#endif
894
895int cfa_fsync(int fd) {
896 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FSYNC)
897 return fsync(fd);
898 #else
899 __submit_prelude
900
901 (*sqe){ IORING_OP_FSYNC, fd };
902
903 __submit_wait
904 #endif
905}
906
907int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
908 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SYNC_FILE_RANGE)
909 return sync_file_range(fd, offset, nbytes, flags);
910 #else
911 __submit_prelude
912
913 (*sqe){ IORING_OP_SYNC_FILE_RANGE, fd };
914 sqe->off = offset;
915 sqe->len = nbytes;
916 sqe->sync_range_flags = flags;
917
918 __submit_wait
919 #endif
920}
921
922
923ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
924 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SENDMSG)
925 return sendmsg(sockfd, msg, flags);
926 #else
927 __submit_prelude
928
929 (*sqe){ IORING_OP_SENDMSG, sockfd, msg, 1, 0 };
930 sqe->msg_flags = flags;
931
932 __submit_wait
933 #endif
934}
935
936ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
937 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECVMSG)
938 return recvmsg(sockfd, msg, flags);
939 #else
940 __submit_prelude
941
942 (*sqe){ IORING_OP_RECVMSG, sockfd, msg, 1, 0 };
943 sqe->msg_flags = flags;
944
945 __submit_wait
946 #endif
947}
948
949ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
950 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SEND)
951 return send( sockfd, buf, len, flags );
952 #else
953 __submit_prelude
954
955 (*sqe){ IORING_OP_SEND, sockfd };
956 sqe->addr = (uint64_t)buf;
957 sqe->len = len;
958 sqe->msg_flags = flags;
959
960 __submit_wait
961 #endif
962}
963
964ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
965 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_RECV)
966 return recv( sockfd, buf, len, flags );
967 #else
968 __submit_prelude
969
970 (*sqe){ IORING_OP_RECV, sockfd };
971 sqe->addr = (uint64_t)buf;
972 sqe->len = len;
973 sqe->msg_flags = flags;
974
975 __submit_wait
976 #endif
977}
978
979int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
980 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT)
981 return accept4( sockfd, addr, addrlen, flags );
982 #else
983 __submit_prelude
984
985 (*sqe){ IORING_OP_ACCEPT, sockfd };
986 sqe->addr = (uint64_t)addr;
987 sqe->addr2 = (uint64_t)addrlen;
988 sqe->accept_flags = flags;
989
990 __submit_wait
991 #endif
992}
993
994int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
995 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT)
996 return connect( sockfd, addr, addrlen );
997 #else
998 __submit_prelude
999
1000 (*sqe){ IORING_OP_CONNECT, sockfd };
1001 sqe->addr = (uint64_t)addr;
1002 sqe->off = addrlen;
1003
1004 __submit_wait
1005 #endif
1006}
1007
1008int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
1009 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FALLOCATE)
1010 return fallocate( fd, mode, offset, len );
1011 #else
1012 __submit_prelude
1013
1014 (*sqe){ IORING_OP_FALLOCATE, fd };
1015 sqe->off = offset;
1016 sqe->len = len;
1017 sqe->mode = mode;
1018
1019 __submit_wait
1020 #endif
1021}
1022
1023int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
1024 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_FADVISE)
1025 return posix_fadvise( fd, offset, len, advice );
1026 #else
1027 __submit_prelude
1028
1029 (*sqe){ IORING_OP_FADVISE, fd };
1030 sqe->off = (uint64_t)offset;
1031 sqe->len = len;
1032 sqe->fadvise_advice = advice;
1033
1034 __submit_wait
1035 #endif
1036}
1037
1038int cfa_madvise(void *addr, size_t length, int advice) {
1039 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_MADVISE)
1040 return madvise( addr, length, advice );
1041 #else
1042 __submit_prelude
1043
1044 (*sqe){ IORING_OP_MADVISE, 0 };
1045 sqe->addr = (uint64_t)addr;
1046 sqe->len = length;
1047 sqe->fadvise_advice = advice;
1048
1049 __submit_wait
1050 #endif
1051}
1052
1053int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
1054 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_OPENAT)
1055 return openat( dirfd, pathname, flags, mode );
1056 #else
1057 __submit_prelude
1058
1059 (*sqe){ IORING_OP_OPENAT, dirfd };
1060 sqe->addr = (uint64_t)pathname;
1061 sqe->open_flags = flags;
1062 sqe->mode = mode;
1063
1064 __submit_wait
1065 #endif
1066}
1067
1068int cfa_close(int fd) {
1069 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CLOSE)
1070 return close( fd );
1071 #else
1072 __submit_prelude
1073
1074 (*sqe){ IORING_OP_CLOSE, fd };
1075
1076 __submit_wait
1077 #endif
1078}
1079
1080
1081ssize_t cfa_read(int fd, void *buf, size_t count) {
1082 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
1083 return read( fd, buf, count );
1084 #else
1085 __submit_prelude
1086
1087 (*sqe){ IORING_OP_READ, fd, buf, count, 0 };
1088
1089 __submit_wait
1090 #endif
1091}
1092
1093ssize_t cfa_write(int fd, void *buf, size_t count) {
1094 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_WRITE)
1095 return write( fd, buf, count );
1096 #else
1097 __submit_prelude
1098
1099 (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
1100
1101 __submit_wait
1102 #endif
1103}
1104
1105//-----------------------------------------------------------------------------
1106// Check if a function is asynchronous
1107
1108// Macro magic to reduce the size of the following switch case
1109#define IS_DEFINED_APPLY(f, ...) f(__VA_ARGS__)
1110#define IS_DEFINED_SECOND(first, second, ...) second
1111#define IS_DEFINED_TEST(expansion) _CFA_IO_FEATURE_##expansion
1112#define IS_DEFINED(macro) IS_DEFINED_APPLY( IS_DEFINED_SECOND,IS_DEFINED_TEST(macro) false, true)
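// How the trick above works, e.g. for IORING_OP_READV (values shown are illustrative; it relies
// on the opcodes being preprocessor macros, which the #if !defined(IORING_OP_*) checks above
// already assume). Each test site defines a sentinel "_CFA_IO_FEATURE_<opcode>" that expands to
// a lone comma, then evaluates IS_DEFINED(<opcode>).
//  - If the opcode is defined by <linux/io_uring.h>, it expands to its value before the token
//    paste, so the paste yields an undefined name and no comma appears:
//       IS_DEFINED(IORING_OP_READV) -> IS_DEFINED_SECOND( _CFA_IO_FEATURE_1 false, true ) -> true
//  - If the opcode is not defined, the paste yields the sentinel, whose comma shifts "false"
//    into the selected argument slot:
//       IS_DEFINED(IORING_OP_READV) -> IS_DEFINED_SECOND( , false, true ) -> false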
1113
1114bool has_user_level_blocking( fptr_t func ) {
1115 #if defined(HAVE_LINUX_IO_URING_H)
1116 #if defined(HAVE_PREADV2)
1117 if( /*func == (fptr_t)preadv2 || */
1118 func == (fptr_t)cfa_preadv2 )
1119 #define _CFA_IO_FEATURE_IORING_OP_READV ,
1120 return IS_DEFINED(IORING_OP_READV);
1121 #endif
1122
1123 #if defined(HAVE_PWRITEV2)
1124 if( /*func == (fptr_t)pwritev2 || */
1125 func == (fptr_t)cfa_pwritev2 )
1126 #define _CFA_IO_FEATURE_IORING_OP_WRITEV ,
1127 return IS_DEFINED(IORING_OP_WRITEV);
1128 #endif
1129
1130 if( /*func == (fptr_t)fsync || */
1131 func == (fptr_t)cfa_fsync )
1132 #define _CFA_IO_FEATURE_IORING_OP_FSYNC ,
1133 return IS_DEFINED(IORING_OP_FSYNC);
1134
1135 if( /*func == (fptr_t)sync_file_range || */
1136 func == (fptr_t)cfa_sync_file_range )
1137 #define _CFA_IO_FEATURE_IORING_OP_SYNC_FILE_RANGE ,
1138 return IS_DEFINED(IORING_OP_SYNC_FILE_RANGE);
1139
1140 if( /*func == (fptr_t)sendmsg || */
1141 func == (fptr_t)cfa_sendmsg )
1142 #define _CFA_IO_FEATURE_IORING_OP_SENDMSG ,
1143 return IS_DEFINED(IORING_OP_SENDMSG);
1144
1145 if( /*func == (fptr_t)recvmsg || */
1146 func == (fptr_t)cfa_recvmsg )
1147 #define _CFA_IO_FEATURE_IORING_OP_RECVMSG ,
1148 return IS_DEFINED(IORING_OP_RECVMSG);
1149
1150 if( /*func == (fptr_t)send || */
1151 func == (fptr_t)cfa_send )
1152 #define _CFA_IO_FEATURE_IORING_OP_SEND ,
1153 return IS_DEFINED(IORING_OP_SEND);
1154
1155 if( /*func == (fptr_t)recv || */
1156 func == (fptr_t)cfa_recv )
1157 #define _CFA_IO_FEATURE_IORING_OP_RECV ,
1158 return IS_DEFINED(IORING_OP_RECV);
1159
1160 if( /*func == (fptr_t)accept4 || */
1161 func == (fptr_t)cfa_accept4 )
1162 #define _CFA_IO_FEATURE_IORING_OP_ACCEPT ,
1163 return IS_DEFINED(IORING_OP_ACCEPT);
1164
1165 if( /*func == (fptr_t)connect || */
1166 func == (fptr_t)cfa_connect )
1167 #define _CFA_IO_FEATURE_IORING_OP_CONNECT ,
1168 return IS_DEFINED(IORING_OP_CONNECT);
1169
1170 if( /*func == (fptr_t)fallocate || */
1171 func == (fptr_t)cfa_fallocate )
1172 #define _CFA_IO_FEATURE_IORING_OP_FALLOCATE ,
1173 return IS_DEFINED(IORING_OP_FALLOCATE);
1174
1175 if( /*func == (fptr_t)posix_fadvise || */
1176 func == (fptr_t)cfa_fadvise )
1177 #define _CFA_IO_FEATURE_IORING_OP_FADVISE ,
1178 return IS_DEFINED(IORING_OP_FADVISE);
1179
1180 if( /*func == (fptr_t)madvise || */
1181 func == (fptr_t)cfa_madvise )
1182 #define _CFA_IO_FEATURE_IORING_OP_MADVISE ,
1183 return IS_DEFINED(IORING_OP_MADVISE);
1184
1185 if( /*func == (fptr_t)openat || */
1186 func == (fptr_t)cfa_openat )
1187 #define _CFA_IO_FEATURE_IORING_OP_OPENAT ,
1188 return IS_DEFINED(IORING_OP_OPENAT);
1189
1190 if( /*func == (fptr_t)close || */
1191 func == (fptr_t)cfa_close )
1192 #define _CFA_IO_FEATURE_IORING_OP_CLOSE ,
1193 return IS_DEFINED(IORING_OP_CLOSE);
1194
1195 if( /*func == (fptr_t)read || */
1196 func == (fptr_t)cfa_read )
1197 #define _CFA_IO_FEATURE_IORING_OP_READ ,
1198 return IS_DEFINED(IORING_OP_READ);
1199
1200 if( /*func == (fptr_t)write || */
1201 func == (fptr_t)cfa_write )
1202 #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
1203 return IS_DEFINED(IORING_OP_WRITE);
1204 #endif
1205
1206 return false;
1207}