source: libcfa/src/concurrency/io.cfa@ e46c753

Last change on this file: e46c753, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added new io algorithm that eagerly submits while still helping

1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16// #define __CFA_DEBUG_PRINT_IO__
17// #define __CFA_DEBUG_PRINT_IO_CORE__
18
19#include "kernel.hfa"
20#include "bitmanip.hfa"
21
22#if !defined(HAVE_LINUX_IO_URING_H)
23 void __kernel_io_startup( cluster &, unsigned, bool ) {
24 // Nothing to do without io_uring
25 }
26
27 void __kernel_io_finish_start( cluster & ) {
28 // Nothing to do without io_uring
29 }
30
31 void __kernel_io_prepare_stop( cluster & ) {
32 // Nothing to do without io_uring
33 }
34
35 void __kernel_io_shutdown( cluster &, bool ) {
36 // Nothing to do without io_uring
37 }
38
39#else
40 #define _GNU_SOURCE /* See feature_test_macros(7) */
41 #include <errno.h>
42 #include <stdint.h>
43 #include <string.h>
44 #include <unistd.h>
45 #include <sys/mman.h>
46
47 extern "C" {
48 #include <sys/syscall.h>
49
50 #include <linux/io_uring.h>
51 }
52
53 #include "bits/signal.hfa"
54 #include "kernel_private.hfa"
55 #include "thread.hfa"
56
57 uint32_t entries_per_cluster() {
58 return 256;
59 }
60
61 static void * __io_poller_slow( void * arg );
62
63 // Weirdly, some systems that do support io_uring don't actually define these
64 #ifdef __alpha__
65 /*
66 * alpha is the only exception, all other architectures
67 * have common numbers for new system calls.
68 */
69 #ifndef __NR_io_uring_setup
70 #define __NR_io_uring_setup 535
71 #endif
72 #ifndef __NR_io_uring_enter
73 #define __NR_io_uring_enter 536
74 #endif
75 #ifndef __NR_io_uring_register
76 #define __NR_io_uring_register 537
77 #endif
78 #else /* !__alpha__ */
79 #ifndef __NR_io_uring_setup
80 #define __NR_io_uring_setup 425
81 #endif
82 #ifndef __NR_io_uring_enter
83 #define __NR_io_uring_enter 426
84 #endif
85 #ifndef __NR_io_uring_register
86 #define __NR_io_uring_register 427
87 #endif
88 #endif
89
90 // Fast poller user-thread
91 // Not using the "thread" keyword because we want to control
92 // more carefully when to start/stop it
93 struct __io_poller_fast {
94 struct __io_data * ring;
95 $thread thrd;
96 };
97
98 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
99 this.ring = cltr.io;
100 (this.thrd){ "Fast I/O Poller", cltr };
101 }
102 void ^?{}( __io_poller_fast & mutex this );
103 void main( __io_poller_fast & this );
104 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
105 void ^?{}( __io_poller_fast & mutex this ) {}
106
107 struct __submition_data {
108 // Head and tail of the ring (associated with array)
109 volatile uint32_t * head;
110 volatile uint32_t * tail;
111
112 // The actual kernel ring which uses head/tail
113 // indexes into the sqes arrays
114 uint32_t * array;
115
116 // number of entries and mask to go with it
117 const uint32_t * num;
118 const uint32_t * mask;
119
121 // Submission ring flags (e.g. IORING_SQ_NEED_WAKEUP when a kernel submission thread is used)
121 uint32_t * flags;
122
124 // number of invalid sqes dropped by the kernel instead of being submitted
124 uint32_t * dropped;
125
126 // Like head/tail but not seen by the kernel
127 volatile uint32_t * ready;
128 uint32_t ready_cnt;
129
130 __spinlock_t lock;
131
132 // A buffer of sqes (not the actual ring)
133 struct io_uring_sqe * sqes;
134
135 // The location and size of the mmaped area
136 void * ring_ptr;
137 size_t ring_sz;
138 };
139
140 struct __completion_data {
141 // Head and tail of the ring
142 volatile uint32_t * head;
143 volatile uint32_t * tail;
144
145 // number of entries and mask to go with it
146 const uint32_t * mask;
147 const uint32_t * num;
148
150 // number of completion events dropped because the completion queue was full (overflow counter)
150 uint32_t * overflow;
151
152 // the kernel ring
153 struct io_uring_cqe * cqes;
154
155 // The location and size of the mmaped area
156 void * ring_ptr;
157 size_t ring_sz;
158 };
159
160 struct __io_data {
161 struct __submition_data submit_q;
162 struct __completion_data completion_q;
163 uint32_t ring_flags;
164 int cltr_flags;
165 int fd;
166 semaphore submit;
167 volatile bool done;
168 struct {
169 struct {
170 __processor_id_t id;
171 void * stack;
172 pthread_t kthrd;
173 volatile bool blocked;
174 } slow;
175 __io_poller_fast fast;
176 __bin_sem_t sem;
177 } poller;
178 };
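// Design note (summarizing the code below): each cluster pairs two pollers for its ring.
// The "slow" poller is a raw pthread that blocks inside io_uring_enter waiting for
// completions, while the "fast" poller is a regular user-level thread that polls without
// blocking and yields between attempts. The two hand the ring back and forth (see
// __io_poller_slow and main( __io_poller_fast & )) so completions are serviced with low
// latency while the cluster is busy, without keeping a kernel thread spinning when it is idle.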
179
180//=============================================================================================
181// I/O Startup / Shutdown logic
182//=============================================================================================
183 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
184 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
185 abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
186 }
187
188 this.io = malloc();
189
190 // Step 1 : call to setup
191 struct io_uring_params params;
192 memset(&params, 0, sizeof(params));
193
194 uint32_t nentries = entries_per_cluster();
195
196 int fd = syscall(__NR_io_uring_setup, nentries, &params );
197 if(fd < 0) {
198 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
199 }
200
201 // Step 2 : mmap result
202 memset( this.io, 0, sizeof(struct __io_data) );
203 struct __submition_data & sq = this.io->submit_q;
204 struct __completion_data & cq = this.io->completion_q;
205
206 // calculate the right ring size
207 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
208 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));
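// Note: the SQ ring mmap covers the ring header plus the array of sqe indexes
// (params.sq_off.array is the offset of that array), while the CQ ring mmap covers the
// ring header plus the cqes themselves, hence the two size formulas above.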
209
210 // Requires features
211 #if defined(IORING_FEAT_SINGLE_MMAP)
212 // adjust the size according to the parameters
213 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
214 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
215 }
216 #endif
217
218 // mmap the Submit Queue into existence
219 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
220 if (sq.ring_ptr == (void*)MAP_FAILED) {
221 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
222 }
223
224 // Requires features
225 #if defined(IORING_FEAT_SINGLE_MMAP)
226 // mmap the Completion Queue into existence (may or may not be needed)
227 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
228 cq.ring_ptr = sq.ring_ptr;
229 }
230 else
231 #endif
232 {
233 // We need multiple calls to mmap
234 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
235 if (cq.ring_ptr == (void*)MAP_FAILED) {
236 munmap(sq.ring_ptr, sq.ring_sz);
237 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
238 }
239 }
240
241 // mmap the submit queue entries
242 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
243 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
244 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
245 munmap(sq.ring_ptr, sq.ring_sz);
246 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
247 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
248 }
249
250 // Get the pointers from the kernel to fill the structure
251 // submit queue
252 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
253 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
254 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
255 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
256 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
257 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
258 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
259
260 {
261 const uint32_t num = *sq.num;
262 for( i; num ) {
263 sq.sqes[i].user_data = 0ul64;
264 }
265 }
266
267 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
268 /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
269 sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
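// Note: ready_cnt always ends up a power of two here: the verify above only accepts a
// power of two or a value below 8, and max(..., 8) rounds the latter up to 8. This is
// what lets __submit_to_ready_array use (ready_cnt - 1) as an index mask.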
270 sq.ready = alloc_align( 64, sq.ready_cnt );
271 for(i; sq.ready_cnt) {
272 sq.ready[i] = -1ul32;
273 }
274 }
275 else {
276 sq.ready_cnt = 0;
277 sq.ready = 0p;
278 }
279
280 // completion queue
281 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
282 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
283 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
284 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
285 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
286 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
287
288 // some paranoid checks
289 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
290 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
291 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
292 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
293
294 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
295 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
296 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
297 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
298
299 // Update the global ring info
300 this.io->ring_flags = params.flags;
301 this.io->cltr_flags = io_flags;
302 this.io->fd = fd;
303 this.io->done = false;
304 (this.io->submit){ min(*sq.num, *cq.num) };
305
306 if(!main_cluster) {
307 __kernel_io_finish_start( this );
308 }
309 }
310
311 void __kernel_io_finish_start( cluster & this ) {
312 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
313 __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluster %p\n", &this);
314 (this.io->poller.fast){ this };
315 __thrd_start( this.io->poller.fast, main );
316 }
317
318 // Create the poller thread
319 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
320 this.io->poller.slow.blocked = false;
321 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
322 }
323
324 void __kernel_io_prepare_stop( cluster & this ) {
325 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster %p\n", &this);
326 // Notify the poller thread of the shutdown
327 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
328
329 // Stop the IO Poller
330 sigval val = { 1 };
331 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
332 post( this.io->poller.sem );
333
334 // Wait for the poller thread to finish
335 pthread_join( this.io->poller.slow.kthrd, 0p );
336 free( this.io->poller.slow.stack );
337
338 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster %p\n", &this);
339
340 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
341 with( this.io->poller.fast ) {
342 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
343 /* paranoid */ verify( !ready_mutate_islocked() );
344
345 // We need to adjust the clean-up based on where the thread is
346 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
347
348 ready_schedule_lock( (struct __processor_id_t *)active_processor() );
349
350 // This is the tricky case
351 // The thread was preempted and now it is on the ready queue
352 // The thread should be the last on the list
353 /* paranoid */ verify( thrd.link.next != 0p );
354
355 // Remove the thread from the ready queue of this cluster
356 __attribute__((unused)) bool removed = remove_head( &this, &thrd );
357 /* paranoid */ verify( removed );
358 thrd.link.next = 0p;
359 thrd.link.prev = 0p;
360 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
361
362 // Fixup the thread state
363 thrd.state = Blocked;
364 thrd.ticket = 0;
365 thrd.preempted = __NO_PREEMPTION;
366
367 ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
368
369 // Pretend like the thread was blocked all along
370 }
371 // !!! This is not an else if !!!
372 if( thrd.state == Blocked ) {
373
374 // This is the "easy case"
375 // The thread is parked and can easily be moved to active cluster
376 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
377 thrd.curr_cluster = active_cluster();
378
379 // unpark the fast io_poller
380 unpark( &thrd __cfaabi_dbg_ctx2 );
381 }
382 else {
383
384 // The thread is in a weird state
385 // I don't know what to do here
386 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
387 }
388
389 }
390
391 ^(this.io->poller.fast){};
392
393 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster %p\n", &this);
394 }
395 }
396
397 void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
398 if(!main_cluster) {
399 __kernel_io_prepare_stop( this );
400 }
401
402 // Shutdown the io rings
403 struct __submition_data & sq = this.io->submit_q;
404 struct __completion_data & cq = this.io->completion_q;
405
406 // unmap the submit queue entries
407 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
408
409 // unmap the Submit Queue ring
410 munmap(sq.ring_ptr, sq.ring_sz);
411
412 // unmap the Completion Queue ring, if it is different
413 if (cq.ring_ptr != sq.ring_ptr) {
414 munmap(cq.ring_ptr, cq.ring_sz);
415 }
416
417 // close the file descriptor
418 close(this.io->fd);
419
420 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
421 free( this.io );
422 }
423
424//=============================================================================================
425// I/O Polling
426//=============================================================================================
427 // Process ready completion events from the io_uring instance
428 // This is NOT thread-safe
429 static unsigned __collect_submitions( struct __io_data & ring );
430 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
431 /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
432
433 unsigned to_submit = 0;
434 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
435 // If the poller thread also submits, then we need to aggregate the submissions which are ready
436 to_submit = __collect_submitions( ring );
437 }
438
439 const uint32_t smask = *ring.submit_q.mask;
440 uint32_t shead = *ring.submit_q.head;
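// io_uring_enter submits up to 'to_submit' sqes and, because of IORING_ENTER_GETEVENTS,
// waits for at least 'waitcnt' completions; 'mask' (if non-null) is installed as the
// signal mask for the duration of the wait, which is how __wake_poller can interrupt it.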
441 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
442 if( ret < 0 ) {
443 switch((int)errno) {
444 case EAGAIN:
445 case EINTR:
446 return [0, true];
447 default:
448 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
449 }
450 }
451
452 // Release the consumed SQEs
453 for( i; ret ) {
454 uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
455 ring.submit_q.sqes[ idx ].user_data = 0;
456 }
457
458 // update statistics
459 #if !defined(__CFA_NO_STATISTICS__)
460 if( to_submit > 0 ) {
461 __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
462 __tls_stats()->io.submit_q.submit_avg.csm += ret;
463 __tls_stats()->io.submit_q.submit_avg.cnt += 1;
464 }
465 #endif
466
467 // Drain the queue
468 unsigned head = *ring.completion_q.head;
469 unsigned tail = *ring.completion_q.tail;
470 const uint32_t mask = *ring.completion_q.mask;
471
472 // Memory barrier
473 __atomic_thread_fence( __ATOMIC_SEQ_CST );
474
475 // Nothing new, return 0
476 if (head == tail) {
477 return [0, to_submit > 0];
478 }
479
480 uint32_t count = tail - head;
481 for(i; count) {
482 unsigned idx = (head + i) & mask;
483 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
484
485 /* paranoid */ verify(&cqe);
486
487 struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
488 __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
489
490 data->result = cqe.res;
491 if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
492 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
493 }
494
495 // Allow new submissions to happen
496 // V(ring.submit, count);
497
498 // Mark to the kernel that the cqe has been seen
499 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
500 __atomic_thread_fence( __ATOMIC_SEQ_CST );
501 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
502
503 return [count, count > 0 || to_submit > 0];
504 }
505
506 static void * __io_poller_slow( void * arg ) {
507 #if !defined( __CFA_NO_STATISTICS__ )
508 __stats_t local_stats;
509 __init_stats( &local_stats );
510 kernelTLS.this_stats = &local_stats;
511 #endif
512
513 cluster * cltr = (cluster *)arg;
514 struct __io_data & ring = *cltr->io;
515
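// Register this pthread with the scheduler so it has a valid processor id;
// __unpark (used below and in __drain_io) needs one to wake user threads from
// outside a regular processor.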
516 ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
517
518 sigset_t mask;
519 sigfillset(&mask);
520 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
521 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
522 }
523
524 sigdelset( &mask, SIGUSR1 );
525
526 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
527 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
528
529 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
530
531 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
532 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
533
534 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
535
536 // In the user-thread approach, drain and, if anything was drained,
537 // baton pass to the user-thread
538 int count;
539 bool again;
540 [count, again] = __drain_io( ring, &mask, 1, true );
541
542 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
543
544 // Update statistics
545 #if !defined(__CFA_NO_STATISTICS__)
546 __tls_stats()->io.complete_q.completed_avg.val += count;
547 __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
548 #endif
549
550 if(again) {
551 __cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to fast poller\n", &ring);
552 __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
553 wait( ring.poller.sem );
554 }
555 }
556 }
557 else {
558 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
559 // In the naive approach, just poll the io completion queue directly
560 int count;
561 bool again;
562 [count, again] = __drain_io( ring, &mask, 1, true );
563
564 // Update statistics
565 #if !defined(__CFA_NO_STATISTICS__)
566 __tls_stats()->io.complete_q.completed_avg.val += count;
567 __tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
568 #endif
569 }
570 }
571
572 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
573
574 unregister( &ring.poller.slow.id );
575
576 #if !defined(__CFA_NO_STATISTICS__)
577 __tally_stats(cltr->stats, &local_stats);
578 #endif
579
580 return 0p;
581 }
582
583 void main( __io_poller_fast & this ) {
584 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
585
586 // Start parked
587 park( __cfaabi_dbg_ctx );
588
589 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
590
591 int reset = 0;
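// 'reset' counts polls that found no work since the last handoff; once it reaches 5
// the fast poller wakes the slow poller, parks itself, and resets the count.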
592
593 // Then loop until we are told to stop
594 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
595
596 // Drain the io
597 int count;
598 bool again;
599 disable_interrupts();
600 [count, again] = __drain_io( *this.ring, 0p, 0, false );
601
602 if(!again) reset++;
603
604 // Update statistics
605 #if !defined(__CFA_NO_STATISTICS__)
606 __tls_stats()->io.complete_q.completed_avg.val += count;
607 __tls_stats()->io.complete_q.completed_avg.fast_cnt += 1;
608 #endif
609 enable_interrupts( __cfaabi_dbg_ctx );
610
611 // If we have not come up empty too many times yet, just yield and check again
612 if(reset < 5) {
613 yield();
614 }
615 // We didn't get anything, baton pass to the slow poller
616 else {
617 __cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to slow poller\n", &this.ring);
618 reset = 0;
619
620 // wake up the slow poller
621 post( this.ring->poller.sem );
622
623 // park this thread
624 park( __cfaabi_dbg_ctx );
625 }
626 }
627
628 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
629 }
630
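// Wake the slow poller if it is currently blocked in the kernel. The slow poller passes
// io_uring_enter a signal mask with SIGUSR1 unblocked (see __io_poller_slow), so queueing
// SIGUSR1 here interrupts the wait and makes __drain_io return early with EINTR.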
631 static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
632 static inline void __wake_poller( struct __io_data & ring ) {
633 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
634
635 sigval val = { 1 };
636 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
637 }
638
639//=============================================================================================
640// I/O Submissions
641//=============================================================================================
642
643// Submission steps :
644// 1 - Allocate a queue entry. The ring already has memory for all entries, but only the ones
645// listed in sq.array are visible to the kernel. For those not listed, the kernel offers
646// no assurance that an entry is not being filled by multiple threads. Therefore, we
647// need to write an allocator that allows allocating concurrently.
648//
649// 2 - Actually fill the submit entry; this is the only simple and straightforward step.
650//
651// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
652// needs to reach two consensus decisions at the same time:
653// A - The order in which entries are listed in the array: no two threads may pick the
654// same index for their entries.
655// B - When the tail can be updated for the kernel: EVERY entry in the array between
656// head and tail must be fully filled and should never be touched again.
657//
658
659 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
660 /* paranoid */ verify( data != 0 );
661
662 // Prepare the data we need
663 __attribute((unused)) int len = 0;
664 __attribute((unused)) int block = 0;
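// len counts the slots probed and block the full passes over the ring;
// both are only used to feed the statistics below.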
665 uint32_t cnt = *ring.submit_q.num;
666 uint32_t mask = *ring.submit_q.mask;
667
668 disable_interrupts();
669 uint32_t off = __tls_rand();
670 enable_interrupts( __cfaabi_dbg_ctx );
671
672 // Loop around looking for an available spot
673 for() {
674 // Look through the list starting at some offset
675 for(i; cnt) {
676 uint64_t expected = 0;
677 uint32_t idx = (i + off) & mask;
678 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
679 volatile uint64_t * udata = &sqe->user_data;
680
681 if( *udata == expected &&
682 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
683 {
684 // update statistics
685 #if !defined(__CFA_NO_STATISTICS__)
686 disable_interrupts();
687 __tls_stats()->io.submit_q.alloc_avg.val += len;
688 __tls_stats()->io.submit_q.alloc_avg.block += block;
689 __tls_stats()->io.submit_q.alloc_avg.cnt += 1;
690 enable_interrupts( __cfaabi_dbg_ctx );
691 #endif
692
693
694 // Success, return the data
695 return [sqe, idx];
696 }
697 verify(expected != data);
698
699 len ++;
700 }
701
702 block++;
703 yield();
704 }
705 }
706
707 static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
708 /* paranoid */ verify( idx <= mask );
709 /* paranoid */ verify( idx != -1ul32 );
710
711 // We need to find a spot in the ready array
712 __attribute((unused)) int len = 0;
713 __attribute((unused)) int block = 0;
714 uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
715
716 disable_interrupts();
717 uint32_t off = __tls_rand();
718 enable_interrupts( __cfaabi_dbg_ctx );
719
720 uint32_t picked;
721 LOOKING: for() {
722 for(i; ring.submit_q.ready_cnt) {
723 picked = (i + off) & ready_mask;
724 uint32_t expected = -1ul32;
725 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
726 break LOOKING;
727 }
728 verify(expected != idx);
729
730 len ++;
731 }
732
733 block++;
734 yield();
735 }
736
737 // update statistics
738 #if !defined(__CFA_NO_STATISTICS__)
739 disable_interrupts();
740 __tls_stats()->io.submit_q.look_avg.val += len;
741 __tls_stats()->io.submit_q.look_avg.block += block;
742 __tls_stats()->io.submit_q.look_avg.cnt += 1;
743 enable_interrupts( __cfaabi_dbg_ctx );
744 #endif
745
746 return picked;
747 }
748
749 void __submit( struct __io_data & ring, uint32_t idx ) {
750 // Get the data we definitely need up front
751 uint32_t * const tail = ring.submit_q.tail;
752 const uint32_t mask = *ring.submit_q.mask;
753
754 // There are 3 submission schemes, check which one we are using
755 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
756 // If the poller thread submits, then we just need to add this to the ready array
757 __submit_to_ready_array( ring, idx, mask );
758
759 __wake_poller( ring );
760
761 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
762 }
763 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
764 uint32_t picked = __submit_to_ready_array( ring, idx, mask );
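// Eager scheme: the index is now published in the ready array. Either another submitter
// acting as leader collects it for us (the "helped" case below), or we take the lock
// ourselves, become the leader, and submit everything currently in the ready array.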
765
766 for() {
767 yield();
768
769 // If someone else collected our index, we are done
770 if( ring.submit_q.ready[picked] != idx ) {
771 #if !defined(__CFA_NO_STATISTICS__)
772 disable_interrupts();
773 __tls_stats()->io.submit_q.helped += 1;
774 enable_interrupts( __cfaabi_dbg_ctx );
775 #endif
776 return;
777 }
778
779 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
780 #if !defined(__CFA_NO_STATISTICS__)
781 __tls_stats()->io.submit_q.leader += 1;
782 #endif
783 break;
784 }
785 }
786
787 // We got the lock
788 unsigned to_submit = __collect_submitions( ring );
789 // /* paranoid */ verify( to_submit > 0 );
790 if( to_submit == 0 ) abort();
791
792 const uint32_t smask = *ring.submit_q.mask;
793 uint32_t shead = *ring.submit_q.head;
794 int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
795 if( ret < 0 ) {
796 switch((int)errno) {
797 case EAGAIN:
798 case EINTR:
799 unlock(ring.submit_q.lock);
800 return;
801 default:
802 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
803 }
804 }
805
806 /* paranoid */ verify( ret > 0 );
807
808 // Release the consumed SQEs
809 for( i; ret ) {
810 uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
811 ring.submit_q.sqes[ idx ].user_data = 0;
812 }
813
814 // update statistics
815 #if !defined(__CFA_NO_STATISTICS__)
816 __tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
817 __tls_stats()->io.submit_q.submit_avg.csm += ret;
818 __tls_stats()->io.submit_q.submit_avg.cnt += 1;
819 #endif
820
821 unlock(ring.submit_q.lock);
822 }
823 else {
824 // get mutual exclusion
825 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
826
827 // Append to the list of ready entries
828
829 /* paranoid */ verify( idx <= mask );
830
831 ring.submit_q.array[ (*tail) & mask ] = idx & mask;
832 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
833
834 // Submit however many entries need to be submitted
835 int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
836 if( ret < 0 ) {
837 switch((int)errno) {
838 default:
839 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
840 }
841 }
842
843 // update statistics
844 #if !defined(__CFA_NO_STATISTICS__)
845 disable_interrupts();
847 __tls_stats()->io.submit_q.submit_avg.csm += 1;
848 __tls_stats()->io.submit_q.submit_avg.cnt += 1;
849 enable_interrupts( __cfaabi_dbg_ctx );
850 #endif
851
852 ring.submit_q.sqes[ idx & mask ].user_data = 0;
853
854 unlock(ring.submit_q.lock);
855
856 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
857 }
858 }
859
860 static unsigned __collect_submitions( struct __io_data & ring ) {
861 /* paranoid */ verify( ring.submit_q.ready != 0p );
862 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
863
864 unsigned to_submit = 0;
865 uint32_t tail = *ring.submit_q.tail;
866 const uint32_t mask = *ring.submit_q.mask;
867
868 // Go through the list of ready submissions
869 for( i; ring.submit_q.ready_cnt ) {
870 // replace any submission with the sentinel, to consume it.
871 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
872
873 // If it was already the sentinel, this slot is empty; skip it
874 if( idx == -1ul32 ) continue;
875
876 // If we got a real submission, append it to the list
877 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
878 to_submit++;
879 }
880
881 // Increment the tail based on how many we are ready to submit
882 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
883
884 return to_submit;
885 }
886#endif